1
0
mirror of https://github.com/rofafor/vdr-plugin-satip.git synced 2023-10-10 13:37:42 +02:00

Performance enhancement via recvmmsg().

Use recvmmsg() in order to read multiple packets with one system call.
This improves performance, especially in have loaded areas when catching up
a log of queued packets.

Original patch tweaked and optimized by Rolf Ahrenberg.
This commit is contained in:
nafets227 2014-12-13 22:15:58 +01:00 committed by Rolf Ahrenberg
parent fecbd3cbd4
commit 382e1dedef
8 changed files with 79 additions and 40 deletions

8
rtcp.c
View File

@ -10,16 +10,16 @@
#include "log.h"
#include "rtcp.h"
cSatipRtcp::cSatipRtcp(cSatipTunerIf &tunerP, unsigned int bufferLenP)
cSatipRtcp::cSatipRtcp(cSatipTunerIf &tunerP)
: tunerM(tunerP),
bufferLenM(bufferLenP),
bufferLenM(eApplicationMaxSizeB),
bufferM(MALLOC(unsigned char, bufferLenM))
{
debug1("%s (, %u) [device %d]", __PRETTY_FUNCTION__, bufferLenP, tunerM.GetId());
debug1("%s [device %d]", __PRETTY_FUNCTION__, tunerM.GetId());
if (bufferM)
memset(bufferM, 0, bufferLenM);
else
error("Cannot create RTCP buffer!");
error("Cannot create RTCP buffer! [device %d]", tunerM.GetId());
}
cSatipRtcp::~cSatipRtcp()

5
rtcp.h
View File

@ -14,13 +14,16 @@
class cSatipRtcp : public cSatipSocket, public cSatipPollerIf {
private:
enum {
eApplicationMaxSizeB = 1500,
};
cSatipTunerIf &tunerM;
unsigned int bufferLenM;
unsigned char *bufferM;
int GetApplicationOffset(int *lenghtP);
public:
cSatipRtcp(cSatipTunerIf &tunerP, unsigned int bufferLenP);
cSatipRtcp(cSatipTunerIf &tunerP);
virtual ~cSatipRtcp();
// for internal poller interface

57
rtp.c
View File

@ -13,19 +13,17 @@
#include "log.h"
#include "rtp.h"
cSatipRtp::cSatipRtp(cSatipTunerIf &tunerP, unsigned int bufferLenP)
cSatipRtp::cSatipRtp(cSatipTunerIf &tunerP)
: tunerM(tunerP),
bufferLenM(bufferLenP),
bufferLenM(eRtpPacketReadCount * eMaxUdpPacketSizeB),
bufferM(MALLOC(unsigned char, bufferLenM)),
lastErrorReportM(0),
packetErrorsM(0),
sequenceNumberM(-1)
{
debug1("%s (, %u) [device %d]", __PRETTY_FUNCTION__, bufferLenP, tunerM.GetId());
if (bufferM)
memset(bufferM, 0, bufferLenM);
else
error("Cannot create RTP buffer!");
debug1("%s () [device %d]", __PRETTY_FUNCTION__, tunerM.GetId());
if (!bufferM)
error("Cannot create RTP buffer! [device %d]", tunerM.GetId());
}
cSatipRtp::~cSatipRtp()
@ -53,30 +51,30 @@ void cSatipRtp::Close(void)
}
}
int cSatipRtp::GetHeaderLenght(unsigned int lengthP)
int cSatipRtp::GetHeaderLenght(unsigned char *bufferP, unsigned int lengthP)
{
debug8("%s (%d) [device %d]", __PRETTY_FUNCTION__, lengthP, tunerM.GetId());
debug8("%s (, %d) [device %d]", __PRETTY_FUNCTION__, lengthP, tunerM.GetId());
unsigned int headerlen = 0;
if (lengthP > 0) {
if (bufferM[0] == TS_SYNC_BYTE)
if (bufferP[0] == TS_SYNC_BYTE)
return headerlen;
else if (lengthP > 3) {
// http://tools.ietf.org/html/rfc3550
// http://tools.ietf.org/html/rfc2250
// Version
unsigned int v = (bufferM[0] >> 6) & 0x03;
unsigned int v = (bufferP[0] >> 6) & 0x03;
// Extension bit
unsigned int x = (bufferM[0] >> 4) & 0x01;
unsigned int x = (bufferP[0] >> 4) & 0x01;
// CSCR count
unsigned int cc = bufferM[0] & 0x0F;
unsigned int cc = bufferP[0] & 0x0F;
// Payload type: MPEG2 TS = 33
unsigned int pt = bufferM[1] & 0x7F;
unsigned int pt = bufferP[1] & 0x7F;
if (pt != 33)
debug16("%s (%d) Received invalid RTP payload type %d - v=%d len=%d sync=0x%02X [device %d]",
__PRETTY_FUNCTION__, lengthP, pt, v, headerlen, bufferM[headerlen], tunerM.GetId());
debug16("%s (%d) Received invalid RTP payload type %d - v=%d [device %d]",
__PRETTY_FUNCTION__, lengthP, pt, v, tunerM.GetId());
// Sequence number
int seq = ((bufferM[2] & 0xFF) << 8) | (bufferM[3] & 0xFF);
int seq = ((bufferP[2] & 0xFF) << 8) | (bufferP[3] & 0xFF);
if ((((sequenceNumberM + 1) % 0xFFFF) == 0) && (seq == 0xFFFF))
sequenceNumberM = -1;
else if ((sequenceNumberM >= 0) && (((sequenceNumberM + 1) % 0xFFFF) != seq)) {
@ -95,7 +93,7 @@ int cSatipRtp::GetHeaderLenght(unsigned int lengthP)
// Check if extension
if (x) {
// Extension header length
unsigned int ehl = (((bufferM[headerlen + 2] & 0xFF) << 8) | (bufferM[headerlen + 3] & 0xFF));
unsigned int ehl = (((bufferP[headerlen + 2] & 0xFF) << 8) | (bufferP[headerlen + 3] & 0xFF));
// Update header length
headerlen += (ehl + 1) * (unsigned int)sizeof(uint32_t);
}
@ -105,14 +103,14 @@ int cSatipRtp::GetHeaderLenght(unsigned int lengthP)
headerlen = -1;
}
// Check that rtp is version 2 and payload contains multiple of TS packet data
else if ((v != 2) || (((lengthP - headerlen) % TS_SIZE) != 0) || (bufferM[headerlen] != TS_SYNC_BYTE)) {
else if ((v != 2) || (((lengthP - headerlen) % TS_SIZE) != 0) || (bufferP[headerlen] != TS_SYNC_BYTE)) {
debug16("%s (%d) Received incorrect RTP packet #%d v=%d len=%d sync=0x%02X [device %d]", __PRETTY_FUNCTION__,
lengthP, seq, v, headerlen, bufferM[headerlen], tunerM.GetId());
lengthP, seq, v, headerlen, bufferP[headerlen], tunerM.GetId());
headerlen = -1;
}
else
debug16("%s (%d) Received RTP packet #%d v=%d len=%d sync=0x%02X [device %d]", __PRETTY_FUNCTION__,
lengthP, seq, v, headerlen, bufferM[headerlen], tunerM.GetId());
lengthP, seq, v, headerlen, bufferP[headerlen], tunerM.GetId());
}
}
@ -123,16 +121,21 @@ void cSatipRtp::Process(void)
{
debug8("%s [device %d]", __PRETTY_FUNCTION__, tunerM.GetId());
if (bufferM) {
unsigned int lenMsg[eRtpPacketReadCount];
uint64_t elapsed;
int length, count = 0;
int count = 0;
cTimeMs processing(0);
while ((length = Read(bufferM, bufferLenM)) > 0) {
int headerlen = GetHeaderLenght(length);
if ((headerlen >= 0) && (headerlen < length))
tunerM.ProcessVideoData(bufferM + headerlen, length - headerlen);
++count;
do {
count = ReadMulti(bufferM, lenMsg, eRtpPacketReadCount, eMaxUdpPacketSizeB);
for (int i = 0; i < count; ++i) {
unsigned char *p = &bufferM[i * eMaxUdpPacketSizeB];
int headerlen = GetHeaderLenght(p, lenMsg[i]);
if ((headerlen >= 0) && (headerlen < (int)lenMsg[i]))
tunerM.ProcessVideoData(p + headerlen, lenMsg[i] - headerlen);
}
} while (count >= eRtpPacketReadCount);
if (errno != EAGAIN && errno != EWOULDBLOCK)
error("Error %d reading in %s [device %d]", errno, *ToString(), tunerM.GetId());

8
rtp.h
View File

@ -15,7 +15,9 @@
class cSatipRtp : public cSatipSocket, public cSatipPollerIf {
private:
enum {
eReportIntervalS = 300 // in seconds
eRtpPacketReadCount = 50,
eMaxUdpPacketSizeB = TS_SIZE * 7 + 12,
eReportIntervalS = 300 // in seconds
};
cSatipTunerIf &tunerM;
unsigned int bufferLenM;
@ -23,10 +25,10 @@ private:
time_t lastErrorReportM;
int packetErrorsM;
int sequenceNumberM;
int GetHeaderLenght(unsigned int lengthP);
int GetHeaderLenght(unsigned char *bufferP, unsigned int lengthP);
public:
cSatipRtp(cSatipTunerIf &tunerP, unsigned int bufferLenP);
cSatipRtp(cSatipTunerIf &tunerP);
virtual ~cSatipRtp();
virtual void Close(void);

View File

@ -101,7 +101,7 @@ int cSatipSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP)
debug8("%s (, %d)", __PRETTY_FUNCTION__, bufferLenP);
// Error out if socket not initialized
if (socketDescM <= 0) {
error("Invalid socket in %s()", __PRETTY_FUNCTION__);
error("%s Invalid socket", __PRETTY_FUNCTION__);
return -1;
}
int len = 0;
@ -120,7 +120,7 @@ int cSatipSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP)
msgh.msg_controllen = sizeof(cbuf);
msgh.msg_name = &sockAddrM;
msgh.msg_namelen = addrlen;
msgh.msg_iov = &iov;
msgh.msg_iov = &iov;
msgh.msg_iovlen = 1;
msgh.msg_flags = 0;
@ -133,6 +133,37 @@ int cSatipSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP)
return 0;
}
int cSatipSocket::ReadMulti(unsigned char *bufferAddrP, unsigned int *elementRecvSizeP, unsigned int elementCountP, unsigned int elementBufferSizeP)
{
debug8("%s (, , %d, %d)", __PRETTY_FUNCTION__, elementCountP, elementBufferSizeP);
// Error out if socket not initialized
if (socketDescM <= 0) {
error("%s Invalid socket", __PRETTY_FUNCTION__);
return -1;
}
// Initialize iov and msgh structures
struct mmsghdr mmsgh[elementCountP];
struct iovec iov[elementCountP];
memset(mmsgh, 0, sizeof(mmsgh[0]) * elementCountP);
for (unsigned int i = 0; i < elementCountP; ++i) {
iov[i].iov_base = bufferAddrP + i * elementBufferSizeP;
iov[i].iov_len = elementBufferSizeP;
mmsgh[i].msg_hdr.msg_iov = &iov[i];
mmsgh[i].msg_hdr.msg_iovlen = 1;
}
// Read data from socket as a set
int count = -1;
if (socketDescM && bufferAddrP && elementRecvSizeP && (elementCountP > 0) && (elementBufferSizeP > 0))
count = (int)recvmmsg(socketDescM, mmsgh, elementCountP, MSG_DONTWAIT, NULL);
ERROR_IF_RET(count < 0 && errno != EAGAIN && errno != EWOULDBLOCK, "recvmmsg()", return -1);
for (int i = 0; i < count; ++i)
elementRecvSizeP[i] = mmsgh[i].msg_len;
debug8("%s Received %d packets size[0]=%d", __PRETTY_FUNCTION__, count, elementRecvSizeP[0]);
return count;
}
bool cSatipSocket::Write(const char *addrP, const unsigned char *bufferAddrP, unsigned int bufferLenP)
{

View File

@ -26,6 +26,7 @@ public:
bool IsOpen(void) { return (socketDescM >= 0); }
bool Flush(void);
int Read(unsigned char *bufferAddrP, unsigned int bufferLenP);
int ReadMulti(unsigned char *bufferAddrP, unsigned int *elementRecvSizeP, unsigned int elementCountP, unsigned int elementBufferSizeP);
bool Write(const char *addrP, const unsigned char *bufferAddrP, unsigned int bufferLenP);
};

View File

@ -21,8 +21,8 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP)
deviceM(&deviceP),
deviceIdM(deviceP.GetId()),
rtspM(*this),
rtpM(*this, packetLenP),
rtcpM(*this, eApplicationMaxSizeB),
rtpM(*this),
rtcpM(*this),
streamAddrM(""),
streamParamM(""),
currentServerM(NULL),

View File

@ -59,7 +59,6 @@ private:
eDummyPid = 100,
eDefaultSignalStrength = 15,
eDefaultSignalQuality = 224,
eApplicationMaxSizeB = 1500,
eSleepTimeoutMs = 500, // in milliseconds
eStatusUpdateTimeoutMs = 1000, // in milliseconds
ePidUpdateIntervalMs = 250, // in milliseconds