Try to read all data from udp socket at once

Existing implementation of udp read mechanism was giving up too easily.
Replaced it with version that tries more actively to drain the entire socket buffer.

Signed-off-by: Antti Seppälä <a.seppala@gmail.com>
This commit is contained in:
Antti Seppälä 2012-06-02 14:24:29 +03:00 committed by Rolf Ahrenberg
parent c7cbde301b
commit f4126b7e2c
1 changed files with 67 additions and 63 deletions

130
socket.c
View File

@ -154,70 +154,74 @@ int cIptvUdpSocket::Read(unsigned char* BufferAddr, unsigned int BufferLen)
error("Invalid socket in %s\n", __FUNCTION__);
return -1;
}
socklen_t addrlen = sizeof(sockAddr);
int len = 0;
struct msghdr msgh;
struct cmsghdr *cmsg;
struct iovec iov;
char cbuf[256];
// Initialize iov and msgh structures
memset(&msgh, 0, sizeof(struct msghdr));
iov.iov_base = BufferAddr;
iov.iov_len = BufferLen;
msgh.msg_control = cbuf;
msgh.msg_controllen = sizeof(cbuf);
msgh.msg_name = &sockAddr;
msgh.msg_namelen = addrlen;
msgh.msg_iov = &iov;
msgh.msg_iovlen = 1;
msgh.msg_flags = 0;
// Read data from socket
if (isActive && socketDesc && BufferAddr && (BufferLen > 0))
len = (int)recvmsg(socketDesc, &msgh, MSG_DONTWAIT);
if (len < 0) {
ERROR_IF(errno != EAGAIN, "recvmsg()");
return -1;
}
else if (len > 0) {
// Process auxiliary received data and validate source address
for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
if ((cmsg->cmsg_level == SOL_IP) && (cmsg->cmsg_type == IP_PKTINFO)) {
struct in_pktinfo *i = (struct in_pktinfo *)CMSG_DATA(cmsg);
if ((i->ipi_addr.s_addr == streamAddr) || (INADDR_ANY == streamAddr)) {
if (BufferAddr[0] == TS_SYNC_BYTE)
return len;
else if (len > 3) {
// http://www.networksorcery.com/enp/rfc/rfc2250.txt
// version
unsigned int v = (BufferAddr[0] >> 6) & 0x03;
// extension bit
unsigned int x = (BufferAddr[0] >> 4) & 0x01;
// cscr count
unsigned int cc = BufferAddr[0] & 0x0F;
// payload type: MPEG2 TS = 33
//unsigned int pt = readBuffer[1] & 0x7F;
// header lenght
unsigned int headerlen = (3 + cc) * (unsigned int)sizeof(uint32_t);
// check if extension
if (x) {
// extension header length
unsigned int ehl = (((BufferAddr[headerlen + 2] & 0xFF) << 8) |
(BufferAddr[headerlen + 3] & 0xFF));
// update header length
headerlen += (ehl + 1) * (unsigned int)sizeof(uint32_t);
}
// Check that rtp is version 2 and payload contains multiple of TS packet data
if ((v == 2) && (((len - headerlen) % TS_SIZE) == 0) &&
(BufferAddr[headerlen] == TS_SYNC_BYTE)) {
// Set argument point to payload in read buffer
memmove(BufferAddr, &BufferAddr[headerlen], (len - headerlen));
return (len - headerlen);
}
}
}
}
}
}
// Read data from socket in a loop
do {
socklen_t addrlen = sizeof(sockAddr);
struct msghdr msgh;
struct cmsghdr *cmsg;
struct iovec iov;
char cbuf[256];
len = 0;
// Initialize iov and msgh structures
memset(&msgh, 0, sizeof(struct msghdr));
iov.iov_base = BufferAddr;
iov.iov_len = BufferLen;
msgh.msg_control = cbuf;
msgh.msg_controllen = sizeof(cbuf);
msgh.msg_name = &sockAddr;
msgh.msg_namelen = addrlen;
msgh.msg_iov = &iov;
msgh.msg_iovlen = 1;
msgh.msg_flags = 0;
if (isActive && socketDesc && BufferAddr && (BufferLen > 0))
len = (int)recvmsg(socketDesc, &msgh, MSG_DONTWAIT);
else
break;
if (len > 0) {
// Process auxiliary received data and validate source address
for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
if ((cmsg->cmsg_level == SOL_IP) && (cmsg->cmsg_type == IP_PKTINFO)) {
struct in_pktinfo *i = (struct in_pktinfo *)CMSG_DATA(cmsg);
if ((i->ipi_addr.s_addr == streamAddr) || (INADDR_ANY == streamAddr)) {
if (BufferAddr[0] == TS_SYNC_BYTE)
return len;
else if (len > 3) {
// http://www.networksorcery.com/enp/rfc/rfc2250.txt
// version
unsigned int v = (BufferAddr[0] >> 6) & 0x03;
// extension bit
unsigned int x = (BufferAddr[0] >> 4) & 0x01;
// cscr count
unsigned int cc = BufferAddr[0] & 0x0F;
// payload type: MPEG2 TS = 33
//unsigned int pt = readBuffer[1] & 0x7F;
// header lenght
unsigned int headerlen = (3 + cc) * (unsigned int)sizeof(uint32_t);
// check if extension
if (x) {
// extension header length
unsigned int ehl = (((BufferAddr[headerlen + 2] & 0xFF) << 8) |
(BufferAddr[headerlen + 3] & 0xFF));
// update header length
headerlen += (ehl + 1) * (unsigned int)sizeof(uint32_t);
}
// Check that rtp is version 2 and payload contains multiple of TS packet data
if ((v == 2) && (((len - headerlen) % TS_SIZE) == 0) &&
(BufferAddr[headerlen] == TS_SYNC_BYTE)) {
// Set argument point to payload in read buffer
memmove(BufferAddr, &BufferAddr[headerlen], (len - headerlen));
return (len - headerlen);
}
}
}
}
}
}
} while (len > 0);
ERROR_IF_RET(len < 0 && errno != EAGAIN, "recvmsg()", return -1);
return 0;
}