diff --git a/rtcp.c b/rtcp.c index ffadc56..a2dca4d 100644 --- a/rtcp.c +++ b/rtcp.c @@ -10,16 +10,16 @@ #include "log.h" #include "rtcp.h" -cSatipRtcp::cSatipRtcp(cSatipTunerIf &tunerP, unsigned int bufferLenP) +cSatipRtcp::cSatipRtcp(cSatipTunerIf &tunerP) : tunerM(tunerP), - bufferLenM(bufferLenP), + bufferLenM(eApplicationMaxSizeB), bufferM(MALLOC(unsigned char, bufferLenM)) { - debug1("%s (, %u) [device %d]", __PRETTY_FUNCTION__, bufferLenP, tunerM.GetId()); + debug1("%s [device %d]", __PRETTY_FUNCTION__, tunerM.GetId()); if (bufferM) memset(bufferM, 0, bufferLenM); else - error("Cannot create RTCP buffer!"); + error("Cannot create RTCP buffer! [device %d]", tunerM.GetId()); } cSatipRtcp::~cSatipRtcp() diff --git a/rtcp.h b/rtcp.h index 30d5532..953707f 100644 --- a/rtcp.h +++ b/rtcp.h @@ -14,13 +14,16 @@ class cSatipRtcp : public cSatipSocket, public cSatipPollerIf { private: + enum { + eApplicationMaxSizeB = 1500, + }; cSatipTunerIf &tunerM; unsigned int bufferLenM; unsigned char *bufferM; int GetApplicationOffset(int *lenghtP); public: - cSatipRtcp(cSatipTunerIf &tunerP, unsigned int bufferLenP); + cSatipRtcp(cSatipTunerIf &tunerP); virtual ~cSatipRtcp(); // for internal poller interface diff --git a/rtp.c b/rtp.c index 1de109b..ab278e6 100644 --- a/rtp.c +++ b/rtp.c @@ -13,19 +13,17 @@ #include "log.h" #include "rtp.h" -cSatipRtp::cSatipRtp(cSatipTunerIf &tunerP, unsigned int bufferLenP) +cSatipRtp::cSatipRtp(cSatipTunerIf &tunerP) : tunerM(tunerP), - bufferLenM(bufferLenP), + bufferLenM(eRtpPacketReadCount * eMaxUdpPacketSizeB), bufferM(MALLOC(unsigned char, bufferLenM)), lastErrorReportM(0), packetErrorsM(0), sequenceNumberM(-1) { - debug1("%s (, %u) [device %d]", __PRETTY_FUNCTION__, bufferLenP, tunerM.GetId()); - if (bufferM) - memset(bufferM, 0, bufferLenM); - else - error("Cannot create RTP buffer!"); + debug1("%s () [device %d]", __PRETTY_FUNCTION__, tunerM.GetId()); + if (!bufferM) + error("Cannot create RTP buffer! [device %d]", tunerM.GetId()); } cSatipRtp::~cSatipRtp() @@ -53,30 +51,30 @@ void cSatipRtp::Close(void) } } -int cSatipRtp::GetHeaderLenght(unsigned int lengthP) +int cSatipRtp::GetHeaderLenght(unsigned char *bufferP, unsigned int lengthP) { - debug8("%s (%d) [device %d]", __PRETTY_FUNCTION__, lengthP, tunerM.GetId()); + debug8("%s (, %d) [device %d]", __PRETTY_FUNCTION__, lengthP, tunerM.GetId()); unsigned int headerlen = 0; if (lengthP > 0) { - if (bufferM[0] == TS_SYNC_BYTE) + if (bufferP[0] == TS_SYNC_BYTE) return headerlen; else if (lengthP > 3) { // http://tools.ietf.org/html/rfc3550 // http://tools.ietf.org/html/rfc2250 // Version - unsigned int v = (bufferM[0] >> 6) & 0x03; + unsigned int v = (bufferP[0] >> 6) & 0x03; // Extension bit - unsigned int x = (bufferM[0] >> 4) & 0x01; + unsigned int x = (bufferP[0] >> 4) & 0x01; // CSCR count - unsigned int cc = bufferM[0] & 0x0F; + unsigned int cc = bufferP[0] & 0x0F; // Payload type: MPEG2 TS = 33 - unsigned int pt = bufferM[1] & 0x7F; + unsigned int pt = bufferP[1] & 0x7F; if (pt != 33) - debug16("%s (%d) Received invalid RTP payload type %d - v=%d len=%d sync=0x%02X [device %d]", - __PRETTY_FUNCTION__, lengthP, pt, v, headerlen, bufferM[headerlen], tunerM.GetId()); + debug16("%s (%d) Received invalid RTP payload type %d - v=%d [device %d]", + __PRETTY_FUNCTION__, lengthP, pt, v, tunerM.GetId()); // Sequence number - int seq = ((bufferM[2] & 0xFF) << 8) | (bufferM[3] & 0xFF); + int seq = ((bufferP[2] & 0xFF) << 8) | (bufferP[3] & 0xFF); if ((((sequenceNumberM + 1) % 0xFFFF) == 0) && (seq == 0xFFFF)) sequenceNumberM = -1; else if ((sequenceNumberM >= 0) && (((sequenceNumberM + 1) % 0xFFFF) != seq)) { @@ -95,7 +93,7 @@ int cSatipRtp::GetHeaderLenght(unsigned int lengthP) // Check if extension if (x) { // Extension header length - unsigned int ehl = (((bufferM[headerlen + 2] & 0xFF) << 8) | (bufferM[headerlen + 3] & 0xFF)); + unsigned int ehl = (((bufferP[headerlen + 2] & 0xFF) << 8) | (bufferP[headerlen + 3] & 0xFF)); // Update header length headerlen += (ehl + 1) * (unsigned int)sizeof(uint32_t); } @@ -105,14 +103,14 @@ int cSatipRtp::GetHeaderLenght(unsigned int lengthP) headerlen = -1; } // Check that rtp is version 2 and payload contains multiple of TS packet data - else if ((v != 2) || (((lengthP - headerlen) % TS_SIZE) != 0) || (bufferM[headerlen] != TS_SYNC_BYTE)) { + else if ((v != 2) || (((lengthP - headerlen) % TS_SIZE) != 0) || (bufferP[headerlen] != TS_SYNC_BYTE)) { debug16("%s (%d) Received incorrect RTP packet #%d v=%d len=%d sync=0x%02X [device %d]", __PRETTY_FUNCTION__, - lengthP, seq, v, headerlen, bufferM[headerlen], tunerM.GetId()); + lengthP, seq, v, headerlen, bufferP[headerlen], tunerM.GetId()); headerlen = -1; } else debug16("%s (%d) Received RTP packet #%d v=%d len=%d sync=0x%02X [device %d]", __PRETTY_FUNCTION__, - lengthP, seq, v, headerlen, bufferM[headerlen], tunerM.GetId()); + lengthP, seq, v, headerlen, bufferP[headerlen], tunerM.GetId()); } } @@ -123,16 +121,21 @@ void cSatipRtp::Process(void) { debug8("%s [device %d]", __PRETTY_FUNCTION__, tunerM.GetId()); if (bufferM) { + unsigned int lenMsg[eRtpPacketReadCount]; uint64_t elapsed; - int length, count = 0; + int count = 0; cTimeMs processing(0); - while ((length = Read(bufferM, bufferLenM)) > 0) { - int headerlen = GetHeaderLenght(length); - if ((headerlen >= 0) && (headerlen < length)) - tunerM.ProcessVideoData(bufferM + headerlen, length - headerlen); - ++count; + do { + count = ReadMulti(bufferM, lenMsg, eRtpPacketReadCount, eMaxUdpPacketSizeB); + for (int i = 0; i < count; ++i) { + unsigned char *p = &bufferM[i * eMaxUdpPacketSizeB]; + int headerlen = GetHeaderLenght(p, lenMsg[i]); + if ((headerlen >= 0) && (headerlen < (int)lenMsg[i])) + tunerM.ProcessVideoData(p + headerlen, lenMsg[i] - headerlen); } + } while (count >= eRtpPacketReadCount); + if (errno != EAGAIN && errno != EWOULDBLOCK) error("Error %d reading in %s [device %d]", errno, *ToString(), tunerM.GetId()); diff --git a/rtp.h b/rtp.h index 45ebf95..1bd140a 100644 --- a/rtp.h +++ b/rtp.h @@ -15,7 +15,9 @@ class cSatipRtp : public cSatipSocket, public cSatipPollerIf { private: enum { - eReportIntervalS = 300 // in seconds + eRtpPacketReadCount = 50, + eMaxUdpPacketSizeB = TS_SIZE * 7 + 12, + eReportIntervalS = 300 // in seconds }; cSatipTunerIf &tunerM; unsigned int bufferLenM; @@ -23,10 +25,10 @@ private: time_t lastErrorReportM; int packetErrorsM; int sequenceNumberM; - int GetHeaderLenght(unsigned int lengthP); + int GetHeaderLenght(unsigned char *bufferP, unsigned int lengthP); public: - cSatipRtp(cSatipTunerIf &tunerP, unsigned int bufferLenP); + cSatipRtp(cSatipTunerIf &tunerP); virtual ~cSatipRtp(); virtual void Close(void); diff --git a/socket.c b/socket.c index adfe688..6226f43 100644 --- a/socket.c +++ b/socket.c @@ -101,7 +101,7 @@ int cSatipSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP) debug8("%s (, %d)", __PRETTY_FUNCTION__, bufferLenP); // Error out if socket not initialized if (socketDescM <= 0) { - error("Invalid socket in %s()", __PRETTY_FUNCTION__); + error("%s Invalid socket", __PRETTY_FUNCTION__); return -1; } int len = 0; @@ -120,7 +120,7 @@ int cSatipSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP) msgh.msg_controllen = sizeof(cbuf); msgh.msg_name = &sockAddrM; msgh.msg_namelen = addrlen; - msgh.msg_iov = &iov; + msgh.msg_iov = &iov; msgh.msg_iovlen = 1; msgh.msg_flags = 0; @@ -133,6 +133,37 @@ int cSatipSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP) return 0; } +int cSatipSocket::ReadMulti(unsigned char *bufferAddrP, unsigned int *elementRecvSizeP, unsigned int elementCountP, unsigned int elementBufferSizeP) +{ + debug8("%s (, , %d, %d)", __PRETTY_FUNCTION__, elementCountP, elementBufferSizeP); + // Error out if socket not initialized + if (socketDescM <= 0) { + error("%s Invalid socket", __PRETTY_FUNCTION__); + return -1; + } + // Initialize iov and msgh structures + struct mmsghdr mmsgh[elementCountP]; + struct iovec iov[elementCountP]; + memset(mmsgh, 0, sizeof(mmsgh[0]) * elementCountP); + for (unsigned int i = 0; i < elementCountP; ++i) { + iov[i].iov_base = bufferAddrP + i * elementBufferSizeP; + iov[i].iov_len = elementBufferSizeP; + mmsgh[i].msg_hdr.msg_iov = &iov[i]; + mmsgh[i].msg_hdr.msg_iovlen = 1; + } + + // Read data from socket as a set + int count = -1; + if (socketDescM && bufferAddrP && elementRecvSizeP && (elementCountP > 0) && (elementBufferSizeP > 0)) + count = (int)recvmmsg(socketDescM, mmsgh, elementCountP, MSG_DONTWAIT, NULL); + ERROR_IF_RET(count < 0 && errno != EAGAIN && errno != EWOULDBLOCK, "recvmmsg()", return -1); + for (int i = 0; i < count; ++i) + elementRecvSizeP[i] = mmsgh[i].msg_len; + debug8("%s Received %d packets size[0]=%d", __PRETTY_FUNCTION__, count, elementRecvSizeP[0]); + + return count; +} + bool cSatipSocket::Write(const char *addrP, const unsigned char *bufferAddrP, unsigned int bufferLenP) { diff --git a/socket.h b/socket.h index a146551..f9a93d8 100644 --- a/socket.h +++ b/socket.h @@ -26,6 +26,7 @@ public: bool IsOpen(void) { return (socketDescM >= 0); } bool Flush(void); int Read(unsigned char *bufferAddrP, unsigned int bufferLenP); + int ReadMulti(unsigned char *bufferAddrP, unsigned int *elementRecvSizeP, unsigned int elementCountP, unsigned int elementBufferSizeP); bool Write(const char *addrP, const unsigned char *bufferAddrP, unsigned int bufferLenP); }; diff --git a/tuner.c b/tuner.c index 1db5343..486d553 100644 --- a/tuner.c +++ b/tuner.c @@ -21,8 +21,8 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) deviceM(&deviceP), deviceIdM(deviceP.GetId()), rtspM(*this), - rtpM(*this, packetLenP), - rtcpM(*this, eApplicationMaxSizeB), + rtpM(*this), + rtcpM(*this), streamAddrM(""), streamParamM(""), currentServerM(NULL), diff --git a/tuner.h b/tuner.h index f4d331a..cb26f90 100644 --- a/tuner.h +++ b/tuner.h @@ -59,7 +59,6 @@ private: eDummyPid = 100, eDefaultSignalStrength = 15, eDefaultSignalQuality = 224, - eApplicationMaxSizeB = 1500, eSleepTimeoutMs = 500, // in milliseconds eStatusUpdateTimeoutMs = 1000, // in milliseconds ePidUpdateIntervalMs = 250, // in milliseconds