Optimized TS packet data flow.

This commit is contained in:
Rolf Ahrenberg 2009-02-26 16:04:12 +02:00
parent 73906ab698
commit 024ee7ba89
18 changed files with 105 additions and 149 deletions

View File

@ -84,3 +84,7 @@ VDR Plugin 'iptv' Revision History
- Fixed blacklisting of PAT section filter.
- Set max IPTV device count to VDR's max devices.
- Fixed a possible crash in sid and pid scanners.
2009-xx-xx: Version 0.2.4
- Optimized TS packet data flow.

View File

@ -10,8 +10,7 @@
cIptvConfig IptvConfig;
cIptvConfig::cIptvConfig(void)
: readBufferTsCount(48),
tsBufferSize(2),
: tsBufferSize(2),
tsBufferPrefillRatio(0),
extProtocolBasePort(4321),
useBytes(1),

View File

@ -14,7 +14,6 @@
class cIptvConfig
{
private:
unsigned int readBufferTsCount;
unsigned int tsBufferSize;
unsigned int tsBufferPrefillRatio;
unsigned int extProtocolBasePort;
@ -25,7 +24,6 @@ private:
public:
cIptvConfig();
unsigned int GetReadBufferTsCount(void) const { return readBufferTsCount; }
unsigned int GetTsBufferSize(void) const { return tsBufferSize; }
unsigned int GetTsBufferPrefillRatio(void) const { return tsBufferPrefillRatio; }
unsigned int GetExtProtocolBasePort(void) const { return extProtocolBasePort; }

View File

@ -20,21 +20,20 @@ cIptvDevice::cIptvDevice(unsigned int Index)
isPacketDelivered(false),
isOpenDvr(false),
sidScanEnabled(false),
pidScanEnabled(false),
mutex()
pidScanEnabled(false)
{
//debug("cIptvDevice::cIptvDevice(%d)\n", deviceIndex);
unsigned int bufsize = MEGABYTE(IptvConfig.GetTsBufferSize());
bufsize -= (bufsize % TS_SIZE);
isyslog("creating IPTV device %d (CardIndex=%d)", deviceIndex, CardIndex());
tsBuffer = new cRingBufferLinear(MEGABYTE(IptvConfig.GetTsBufferSize()),
(TS_SIZE * IptvConfig.GetReadBufferTsCount()),
false, "IPTV");
tsBuffer->SetTimeouts(100, 100);
tsBuffer = new cRingBufferLinear(bufsize + 1, TS_SIZE, false,
*cString::sprintf("IPTV %d", deviceIndex));
tsBuffer->SetTimeouts(10, 10);
ResetBuffering();
pUdpProtocol = new cIptvProtocolUdp();
pHttpProtocol = new cIptvProtocolHttp();
pFileProtocol = new cIptvProtocolFile();
pExtProtocol = new cIptvProtocolExt();
pIptvStreamer = new cIptvStreamer(tsBuffer, &mutex);
pIptvStreamer = new cIptvStreamer(tsBuffer, (100 * TS_SIZE));
pPidScanner = new cPidScanner;
// Initialize filter pointers
memset(secfilters, '\0', sizeof(secfilters));
@ -356,12 +355,11 @@ void cIptvDevice::CloseFilter(int Handle)
bool cIptvDevice::OpenDvr(void)
{
debug("cIptvDevice::OpenDvr(%d)\n", deviceIndex);
mutex.Lock();
isPacketDelivered = false;
tsBuffer->Clear();
mutex.Unlock();
ResetBuffering();
pIptvStreamer->Open();
if (pIptvStreamer)
pIptvStreamer->Open();
if (sidScanEnabled && pSidScanner && IptvConfig.GetSectionFiltering())
pSidScanner->SetStatus(true);
isOpenDvr = true;
@ -407,7 +405,7 @@ bool cIptvDevice::GetTSPacket(uchar *&Data)
{
int Count = 0;
//debug("cIptvDevice::GetTSPacket(%d)\n", deviceIndex);
if (!IsBuffering()) {
if (tsBuffer && !IsBuffering()) {
if (isPacketDelivered) {
tsBuffer->Del(TS_SIZE);
isPacketDelivered = false;
@ -432,8 +430,8 @@ bool cIptvDevice::GetTSPacket(uchar *&Data)
// Update pid statistics
AddPidStatistic(ts_pid(p), payload(p));
// Send data also to dvr fifo
if ((dvrFd >= 0) && (write(dvrFd, p, TS_SIZE) != TS_SIZE))
error("ERROR: write failed to FIFO\n");
if (dvrFd >= 0)
Count = write(dvrFd, p, TS_SIZE);
// Analyze incomplete streams with built-in pid analyzer
if (pidScanEnabled && pPidScanner)
pPidScanner->Process(p);
@ -446,7 +444,7 @@ bool cIptvDevice::GetTSPacket(uchar *&Data)
}
}
// Reduce cpu load by preventing busylooping
cCondWait::SleepMs(100);
cCondWait::SleepMs(10);
Data = NULL;
return true;
}

2
iptv.c
View File

@ -20,7 +20,7 @@
#error "VDR-1.6.0 API version or greater is required!"
#endif
static const char VERSION[] = "0.2.4";
static const char VERSION[] = "0.2.5";
static const char DESCRIPTION[] = trNOOP("Experience the IPTV");
class cPluginIptv : public cPlugin {

View File

@ -127,9 +127,9 @@ bool cIptvProtocolExt::Close(void)
return true;
}
int cIptvProtocolExt::Read(unsigned char* *BufferAddr)
int cIptvProtocolExt::Read(unsigned char* BufferAddr, unsigned int BufferLen)
{
return cIptvUdpSocket::Read(BufferAddr);
return cIptvUdpSocket::Read(BufferAddr, BufferLen);
}
bool cIptvProtocolExt::Set(const char* Location, const int Parameter, const int Index)

View File

@ -25,7 +25,7 @@ private:
public:
cIptvProtocolExt();
virtual ~cIptvProtocolExt();
int Read(unsigned char* *BufferAddr);
int Read(unsigned char* BufferAddr, unsigned int BufferLen);
bool Set(const char* Location, const int Parameter, const int Index);
bool Open(void);
bool Close(void);

View File

@ -16,15 +16,10 @@
cIptvProtocolFile::cIptvProtocolFile()
: fileDelay(0),
readBufferLen(TS_SIZE * IptvConfig.GetReadBufferTsCount()),
isActive(false)
{
debug("cIptvProtocolFile::cIptvProtocolFile()\n");
fileLocation = strdup("");
// Allocate receive buffer
readBuffer = MALLOC(unsigned char, readBufferLen);
if (!readBuffer)
error("ERROR: MALLOC() failed in ProtocolFile()");
}
cIptvProtocolFile::~cIptvProtocolFile()
@ -34,7 +29,6 @@ cIptvProtocolFile::~cIptvProtocolFile()
cIptvProtocolFile::Close();
// Free allocated memory
free(fileLocation);
free(readBuffer);
}
bool cIptvProtocolFile::OpenFile(void)
@ -61,10 +55,9 @@ void cIptvProtocolFile::CloseFile(void)
}
}
int cIptvProtocolFile::Read(unsigned char* *BufferAddr)
int cIptvProtocolFile::Read(unsigned char* BufferAddr, unsigned int BufferLen)
{
//debug("cIptvProtocolFile::Read()\n");
*BufferAddr = readBuffer;
// Check errors
if (ferror(fileStream)) {
debug("Read error\n");
@ -81,7 +74,7 @@ int cIptvProtocolFile::Read(unsigned char* *BufferAddr)
// during the sleep and buffers are disposed. Check here that the plugin is
// still active before accessing the buffers
if (isActive)
return fread(readBuffer, sizeof(unsigned char), readBufferLen, fileStream);
return fread(BufferAddr, sizeof(unsigned char), BufferLen, fileStream);
return -1;
}

View File

@ -16,8 +16,6 @@ private:
char* fileLocation;
int fileDelay;
FILE* fileStream;
unsigned char* readBuffer;
unsigned int readBufferLen;
bool isActive;
private:
@ -27,7 +25,7 @@ private:
public:
cIptvProtocolFile();
virtual ~cIptvProtocolFile();
int Read(unsigned char* *BufferAddr);
int Read(unsigned char* BufferAddr, unsigned int BufferLen);
bool Set(const char* Location, const int Parameter, const int Index);
bool Open(void);
bool Close(void);

View File

@ -45,7 +45,7 @@ bool cIptvProtocolHttp::Connect(void)
// First try only the IP address
sockAddr.sin_addr.s_addr = inet_addr(streamAddr);
if (sockAddr.sin_addr.s_addr == INADDR_NONE) {
debug("Cannot convert %s directly to internet address\n", streamAddr);
@ -214,9 +214,9 @@ bool cIptvProtocolHttp::Close(void)
return true;
}
int cIptvProtocolHttp::Read(unsigned char* *BufferAddr)
int cIptvProtocolHttp::Read(unsigned char* BufferAddr, unsigned int BufferLen)
{
return cIptvTcpSocket::Read(BufferAddr);
return cIptvTcpSocket::Read(BufferAddr, BufferLen);
}
bool cIptvProtocolHttp::Set(const char* Location, const int Parameter, const int Index)

View File

@ -27,7 +27,7 @@ private:
public:
cIptvProtocolHttp();
virtual ~cIptvProtocolHttp();
int Read(unsigned char* *BufferAddr);
int Read(unsigned char* BufferAddr, unsigned int BufferLen);
bool Set(const char* Location, const int Parameter, const int Index);
bool Open(void);
bool Close(void);

View File

@ -12,7 +12,7 @@ class cIptvProtocolIf {
public:
cIptvProtocolIf() {}
virtual ~cIptvProtocolIf() {}
virtual int Read(unsigned char* *BufferAddr) = 0;
virtual int Read(unsigned char* BufferAddr, unsigned int BufferLen) = 0;
virtual bool Set(const char* Location, const int Parameter, const int Index) = 0;
virtual bool Open(void) = 0;
virtual bool Close(void) = 0;

View File

@ -91,9 +91,9 @@ bool cIptvProtocolUdp::Close(void)
return true;
}
int cIptvProtocolUdp::Read(unsigned char* *BufferAddr)
int cIptvProtocolUdp::Read(unsigned char* BufferAddr, unsigned int BufferLen)
{
return cIptvUdpSocket::Read(BufferAddr);
return cIptvUdpSocket::Read(BufferAddr, BufferLen);
}
bool cIptvProtocolUdp::Set(const char* Location, const int Parameter, const int Index)

View File

@ -23,7 +23,7 @@ private:
public:
cIptvProtocolUdp();
virtual ~cIptvProtocolUdp();
int Read(unsigned char* *BufferAddr);
int Read(unsigned char* BufferAddr, unsigned int BufferLen);
bool Set(const char* Location, const int Parameter, const int Index);
bool Open(void);
bool Close(void);

106
socket.c
View File

@ -20,14 +20,9 @@
cIptvSocket::cIptvSocket()
: socketPort(0),
socketDesc(-1),
readBufferLen(TS_SIZE * IptvConfig.GetReadBufferTsCount()),
isActive(false)
{
debug("cIptvSocket::cIptvSocket()\n");
// Allocate receive buffer
readBuffer = MALLOC(unsigned char, readBufferLen);
if (!readBuffer)
error("ERROR: MALLOC() failed in socket()");
}
cIptvSocket::~cIptvSocket()
@ -35,8 +30,6 @@ cIptvSocket::~cIptvSocket()
debug("cIptvSocket::~cIptvSocket()\n");
// Close the socket
CloseSocket();
// Free allocated memory
free(readBuffer);
}
bool cIptvSocket::OpenSocket(const int Port, const bool isUdp)
@ -50,7 +43,7 @@ bool cIptvSocket::OpenSocket(const int Port, const bool isUdp)
}
// Bind to the socket if it is not active already
if (socketDesc < 0) {
int yes = 1;
int yes = 1;
// Create socket
if (isUdp)
socketDesc = socket(PF_INET, SOCK_DGRAM, 0);
@ -104,7 +97,7 @@ bool cIptvUdpSocket::OpenSocket(const int Port)
return cIptvSocket::OpenSocket(Port, true);
}
int cIptvUdpSocket::Read(unsigned char* *BufferAddr)
int cIptvUdpSocket::Read(unsigned char* BufferAddr, unsigned int BufferLen)
{
//debug("cIptvUdpSocket::Read()\n");
// Error out if socket not initialized
@ -113,51 +106,38 @@ int cIptvUdpSocket::Read(unsigned char* *BufferAddr)
return -1;
}
socklen_t addrlen = sizeof(sockAddr);
// Set argument point to read buffer
*BufferAddr = readBuffer;
// Wait 500ms for data
int retval = select_single_desc(socketDesc, 500000, false);
// Check if error
if (retval < 0)
return retval;
// Check if data available
else if (retval) {
int len = 0;
// Read data from socket
if (isActive)
len = recvfrom(socketDesc, readBuffer, readBufferLen, MSG_DONTWAIT,
(struct sockaddr *)&sockAddr, &addrlen);
ERROR_IF_RET(len < 0, "recvfrom()", return len);
if ((len > 0) && (readBuffer[0] == 0x47)) {
// Set argument point to read buffer
*BufferAddr = &readBuffer[0];
return len;
int len = 0;
// Read data from socket
if (isActive)
len = recvfrom(socketDesc, BufferAddr, BufferLen, MSG_DONTWAIT,
(struct sockaddr *)&sockAddr, &addrlen);
if ((len > 0) && (BufferAddr[0] == 0x47)) {
return len;
}
else if (len > 3) {
// http://www.networksorcery.com/enp/rfc/rfc2250.txt
// version
unsigned int v = (BufferAddr[0] >> 6) & 0x03;
// extension bit
unsigned int x = (BufferAddr[0] >> 4) & 0x01;
// cscr count
unsigned int cc = BufferAddr[0] & 0x0F;
// payload type: MPEG2 TS = 33
//unsigned int pt = readBuffer[1] & 0x7F;
// header lenght
unsigned int headerlen = (3 + cc) * sizeof(uint32_t);
// check if extension
if (x) {
// extension header length
unsigned int ehl = (((BufferAddr[headerlen + 2] & 0xFF) << 8) | (BufferAddr[headerlen + 3] & 0xFF));
// update header length
headerlen += (ehl + 1) * sizeof(uint32_t);
}
else if (len > 3) {
// http://www.networksorcery.com/enp/rfc/rfc2250.txt
// version
unsigned int v = (readBuffer[0] >> 6) & 0x03;
// extension bit
unsigned int x = (readBuffer[0] >> 4) & 0x01;
// cscr count
unsigned int cc = readBuffer[0] & 0x0F;
// payload type: MPEG2 TS = 33
//unsigned int pt = readBuffer[1] & 0x7F;
// header lenght
unsigned int headerlen = (3 + cc) * sizeof(uint32_t);
// check if extension
if (x) {
// extension header length
unsigned int ehl = (((readBuffer[headerlen + 2] & 0xFF) << 8) | (readBuffer[headerlen + 3] & 0xFF));
// update header length
headerlen += (ehl + 1) * sizeof(uint32_t);
}
// Check that rtp is version 2 and payload contains multiple of TS packet data
if ((v == 2) && (((len - headerlen) % TS_SIZE) == 0) && (readBuffer[headerlen] == 0x47)) {
// Set argument point to payload in read buffer
*BufferAddr = &readBuffer[headerlen];
return (len - headerlen);
}
// Check that rtp is version 2 and payload contains multiple of TS packet data
if ((v == 2) && (((len - headerlen) % TS_SIZE) == 0) && (BufferAddr[headerlen] == 0x47)) {
// Set argument point to payload in read buffer
memmove(BufferAddr, &BufferAddr[headerlen], (len - headerlen));
return (len - headerlen);
}
}
return 0;
@ -180,7 +160,7 @@ bool cIptvTcpSocket::OpenSocket(const int Port)
return cIptvSocket::OpenSocket(Port, false);
}
int cIptvTcpSocket::Read(unsigned char* *BufferAddr)
int cIptvTcpSocket::Read(unsigned char* BufferAddr, unsigned int BufferLen)
{
//debug("cIptvTcpSocket::Read()\n");
// Error out if socket not initialized
@ -189,19 +169,9 @@ int cIptvTcpSocket::Read(unsigned char* *BufferAddr)
return -1;
}
socklen_t addrlen = sizeof(sockAddr);
// Set argument point to read buffer
*BufferAddr = readBuffer;
// Wait 500ms for data
int retval = select_single_desc(socketDesc, 500000, false);
// Check if error
if (retval < 0)
return retval;
// Check if data available
else if (retval) {
// Read data from socket
if (isActive)
return recvfrom(socketDesc, readBuffer, readBufferLen, MSG_DONTWAIT,
(struct sockaddr *)&sockAddr, &addrlen);
}
// Read data from socket
if (isActive)
return recvfrom(socketDesc, BufferAddr, BufferLen, MSG_DONTWAIT,
(struct sockaddr *)&sockAddr, &addrlen);
return 0;
}

View File

@ -14,8 +14,6 @@ class cIptvSocket {
protected:
int socketPort;
int socketDesc;
unsigned char* readBuffer;
unsigned int readBufferLen;
struct sockaddr_in sockAddr;
bool isActive;
@ -32,7 +30,7 @@ class cIptvUdpSocket : public cIptvSocket {
public:
cIptvUdpSocket();
virtual ~cIptvUdpSocket();
virtual int Read(unsigned char* *BufferAddr);
virtual int Read(unsigned char* BufferAddr, unsigned int BufferLen);
bool OpenSocket(const int Port);
};
@ -40,7 +38,7 @@ class cIptvTcpSocket : public cIptvSocket {
public:
cIptvTcpSocket();
virtual ~cIptvTcpSocket();
virtual int Read(unsigned char* *BufferAddr);
virtual int Read(unsigned char* BufferAddr, unsigned int BufferLen);
bool OpenSocket(const int Port);
};

View File

@ -11,13 +11,19 @@
#include "common.h"
#include "streamer.h"
cIptvStreamer::cIptvStreamer(cRingBufferLinear* RingBuffer, cMutex* Mutex)
cIptvStreamer::cIptvStreamer(cRingBufferLinear* RingBuffer, unsigned int PacketLen)
: cThread("IPTV streamer"),
ringBuffer(RingBuffer),
mutex(Mutex),
packetBufferLen(PacketLen),
protocol(NULL)
{
debug("cIptvStreamer::cIptvStreamer()\n");
debug("cIptvStreamer::cIptvStreamer(%d)\n", packetBufferLen);
// Allocate packet buffer
packetBuffer = MALLOC(unsigned char, packetBufferLen);
if (packetBuffer)
memset(packetBuffer, 0, packetBufferLen);
else
error("ERROR: MALLOC() failed for packet buffer");
}
cIptvStreamer::~cIptvStreamer()
@ -25,32 +31,31 @@ cIptvStreamer::~cIptvStreamer()
debug("cIptvStreamer::~cIptvStreamer()\n");
// Close the protocol
Close();
// Free allocated memory
free(packetBuffer);
}
void cIptvStreamer::Action(void)
{
debug("cIptvStreamer::Action(): Entering\n");
// Increase priority
//SetPriority(-1);
// Do the thread loop
while (Running()) {
if (ringBuffer && mutex && protocol && ringBuffer->Free()) {
unsigned char *buffer = NULL;
mutex->Lock();
int length = protocol->Read(&buffer);
if (length >= 0) {
AddStreamerStatistic(length);
int p = ringBuffer->Put(buffer, length);
if (p != length && Running())
ringBuffer->ReportOverflow(length - p);
mutex->Unlock();
}
else {
mutex->Unlock();
sleep.Wait(100); // to reduce cpu load
}
}
else
sleep.Wait(100); // and avoid busy loop
}
while (packetBuffer && Running()) {
int length = -1;
if (protocol)
length = protocol->Read(packetBuffer, packetBufferLen);
if (length >= 0) {
AddStreamerStatistic(length);
if (ringBuffer) {
int p = ringBuffer->Put(packetBuffer, length);
if (p != length)
ringBuffer->ReportOverflow(length - p);
}
}
else
sleep.Wait(10); // to avoid busy loop and reduce cpu load
}
debug("cIptvStreamer::Action(): Exiting\n");
}
@ -72,15 +77,9 @@ bool cIptvStreamer::Close(void)
sleep.Signal();
if (Running())
Cancel(3);
// Close the protocol. A mutex should be taken here to avoid a race condition
// where thread Action() may be in the process of accessing the protocol.
// Taking a mutex serializes the Close() and Action() -calls.
if (mutex)
mutex->Lock();
// Close the protocol
if (protocol)
protocol->Close();
if (mutex)
mutex->Unlock();
return true;
}

View File

@ -19,14 +19,13 @@
class cIptvStreamer : public cThread, public cIptvStreamerStatistics {
private:
cRingBufferLinear* ringBuffer;
cMutex* mutex;
cCondWait sleep;
unsigned char* readBuffer;
unsigned int readBufferLen;
unsigned char* packetBuffer;
unsigned int packetBufferLen;
cIptvProtocolIf* protocol;
public:
cIptvStreamer(cRingBufferLinear* RingBuffer, cMutex* Mutex);
cIptvStreamer(cRingBufferLinear* RingBuffer, unsigned int PacketLen);
virtual ~cIptvStreamer();
virtual void Action(void);
bool Set(const char* Location, const int Parameter, const int Index, cIptvProtocolIf* Protocol);