diff --git a/HISTORY b/HISTORY index 234e291..77ba200 100644 --- a/HISTORY +++ b/HISTORY @@ -84,3 +84,7 @@ VDR Plugin 'iptv' Revision History - Fixed blacklisting of PAT section filter. - Set max IPTV device count to VDR's max devices. - Fixed a possible crash in sid and pid scanners. + +2009-xx-xx: Version 0.2.4 + +- Optimized TS packet data flow. diff --git a/config.c b/config.c index 258965b..11b0956 100644 --- a/config.c +++ b/config.c @@ -10,8 +10,7 @@ cIptvConfig IptvConfig; cIptvConfig::cIptvConfig(void) -: readBufferTsCount(48), - tsBufferSize(2), +: tsBufferSize(2), tsBufferPrefillRatio(0), extProtocolBasePort(4321), useBytes(1), diff --git a/config.h b/config.h index b0b0cd9..92e362f 100644 --- a/config.h +++ b/config.h @@ -14,7 +14,6 @@ class cIptvConfig { private: - unsigned int readBufferTsCount; unsigned int tsBufferSize; unsigned int tsBufferPrefillRatio; unsigned int extProtocolBasePort; @@ -25,7 +24,6 @@ private: public: cIptvConfig(); - unsigned int GetReadBufferTsCount(void) const { return readBufferTsCount; } unsigned int GetTsBufferSize(void) const { return tsBufferSize; } unsigned int GetTsBufferPrefillRatio(void) const { return tsBufferPrefillRatio; } unsigned int GetExtProtocolBasePort(void) const { return extProtocolBasePort; } diff --git a/device.c b/device.c index 477a06f..b8ffa99 100644 --- a/device.c +++ b/device.c @@ -20,21 +20,20 @@ cIptvDevice::cIptvDevice(unsigned int Index) isPacketDelivered(false), isOpenDvr(false), sidScanEnabled(false), - pidScanEnabled(false), - mutex() + pidScanEnabled(false) { - //debug("cIptvDevice::cIptvDevice(%d)\n", deviceIndex); + unsigned int bufsize = MEGABYTE(IptvConfig.GetTsBufferSize()); + bufsize -= (bufsize % TS_SIZE); isyslog("creating IPTV device %d (CardIndex=%d)", deviceIndex, CardIndex()); - tsBuffer = new cRingBufferLinear(MEGABYTE(IptvConfig.GetTsBufferSize()), - (TS_SIZE * IptvConfig.GetReadBufferTsCount()), - false, "IPTV"); - tsBuffer->SetTimeouts(100, 100); + tsBuffer = new cRingBufferLinear(bufsize + 1, TS_SIZE, false, + *cString::sprintf("IPTV %d", deviceIndex)); + tsBuffer->SetTimeouts(10, 10); ResetBuffering(); pUdpProtocol = new cIptvProtocolUdp(); pHttpProtocol = new cIptvProtocolHttp(); pFileProtocol = new cIptvProtocolFile(); pExtProtocol = new cIptvProtocolExt(); - pIptvStreamer = new cIptvStreamer(tsBuffer, &mutex); + pIptvStreamer = new cIptvStreamer(tsBuffer, (100 * TS_SIZE)); pPidScanner = new cPidScanner; // Initialize filter pointers memset(secfilters, '\0', sizeof(secfilters)); @@ -356,12 +355,11 @@ void cIptvDevice::CloseFilter(int Handle) bool cIptvDevice::OpenDvr(void) { debug("cIptvDevice::OpenDvr(%d)\n", deviceIndex); - mutex.Lock(); isPacketDelivered = false; tsBuffer->Clear(); - mutex.Unlock(); ResetBuffering(); - pIptvStreamer->Open(); + if (pIptvStreamer) + pIptvStreamer->Open(); if (sidScanEnabled && pSidScanner && IptvConfig.GetSectionFiltering()) pSidScanner->SetStatus(true); isOpenDvr = true; @@ -407,7 +405,7 @@ bool cIptvDevice::GetTSPacket(uchar *&Data) { int Count = 0; //debug("cIptvDevice::GetTSPacket(%d)\n", deviceIndex); - if (!IsBuffering()) { + if (tsBuffer && !IsBuffering()) { if (isPacketDelivered) { tsBuffer->Del(TS_SIZE); isPacketDelivered = false; @@ -432,8 +430,8 @@ bool cIptvDevice::GetTSPacket(uchar *&Data) // Update pid statistics AddPidStatistic(ts_pid(p), payload(p)); // Send data also to dvr fifo - if ((dvrFd >= 0) && (write(dvrFd, p, TS_SIZE) != TS_SIZE)) - error("ERROR: write failed to FIFO\n"); + if (dvrFd >= 0) + Count = write(dvrFd, p, TS_SIZE); // Analyze incomplete streams with built-in pid analyzer if (pidScanEnabled && pPidScanner) pPidScanner->Process(p); @@ -446,7 +444,7 @@ bool cIptvDevice::GetTSPacket(uchar *&Data) } } // Reduce cpu load by preventing busylooping - cCondWait::SleepMs(100); + cCondWait::SleepMs(10); Data = NULL; return true; } diff --git a/iptv.c b/iptv.c index 1d16c13..8726f35 100644 --- a/iptv.c +++ b/iptv.c @@ -20,7 +20,7 @@ #error "VDR-1.6.0 API version or greater is required!" #endif -static const char VERSION[] = "0.2.4"; +static const char VERSION[] = "0.2.5"; static const char DESCRIPTION[] = trNOOP("Experience the IPTV"); class cPluginIptv : public cPlugin { diff --git a/protocolext.c b/protocolext.c index 73d8f32..29239f4 100644 --- a/protocolext.c +++ b/protocolext.c @@ -127,9 +127,9 @@ bool cIptvProtocolExt::Close(void) return true; } -int cIptvProtocolExt::Read(unsigned char* *BufferAddr) +int cIptvProtocolExt::Read(unsigned char* BufferAddr, unsigned int BufferLen) { - return cIptvUdpSocket::Read(BufferAddr); + return cIptvUdpSocket::Read(BufferAddr, BufferLen); } bool cIptvProtocolExt::Set(const char* Location, const int Parameter, const int Index) diff --git a/protocolext.h b/protocolext.h index 2f2ef57..e8bebdc 100644 --- a/protocolext.h +++ b/protocolext.h @@ -25,7 +25,7 @@ private: public: cIptvProtocolExt(); virtual ~cIptvProtocolExt(); - int Read(unsigned char* *BufferAddr); + int Read(unsigned char* BufferAddr, unsigned int BufferLen); bool Set(const char* Location, const int Parameter, const int Index); bool Open(void); bool Close(void); diff --git a/protocolfile.c b/protocolfile.c index 25889e4..a61d5aa 100644 --- a/protocolfile.c +++ b/protocolfile.c @@ -16,15 +16,10 @@ cIptvProtocolFile::cIptvProtocolFile() : fileDelay(0), - readBufferLen(TS_SIZE * IptvConfig.GetReadBufferTsCount()), isActive(false) { debug("cIptvProtocolFile::cIptvProtocolFile()\n"); fileLocation = strdup(""); - // Allocate receive buffer - readBuffer = MALLOC(unsigned char, readBufferLen); - if (!readBuffer) - error("ERROR: MALLOC() failed in ProtocolFile()"); } cIptvProtocolFile::~cIptvProtocolFile() @@ -34,7 +29,6 @@ cIptvProtocolFile::~cIptvProtocolFile() cIptvProtocolFile::Close(); // Free allocated memory free(fileLocation); - free(readBuffer); } bool cIptvProtocolFile::OpenFile(void) @@ -61,10 +55,9 @@ void cIptvProtocolFile::CloseFile(void) } } -int cIptvProtocolFile::Read(unsigned char* *BufferAddr) +int cIptvProtocolFile::Read(unsigned char* BufferAddr, unsigned int BufferLen) { //debug("cIptvProtocolFile::Read()\n"); - *BufferAddr = readBuffer; // Check errors if (ferror(fileStream)) { debug("Read error\n"); @@ -81,7 +74,7 @@ int cIptvProtocolFile::Read(unsigned char* *BufferAddr) // during the sleep and buffers are disposed. Check here that the plugin is // still active before accessing the buffers if (isActive) - return fread(readBuffer, sizeof(unsigned char), readBufferLen, fileStream); + return fread(BufferAddr, sizeof(unsigned char), BufferLen, fileStream); return -1; } diff --git a/protocolfile.h b/protocolfile.h index a820e7c..4ac923d 100644 --- a/protocolfile.h +++ b/protocolfile.h @@ -16,8 +16,6 @@ private: char* fileLocation; int fileDelay; FILE* fileStream; - unsigned char* readBuffer; - unsigned int readBufferLen; bool isActive; private: @@ -27,7 +25,7 @@ private: public: cIptvProtocolFile(); virtual ~cIptvProtocolFile(); - int Read(unsigned char* *BufferAddr); + int Read(unsigned char* BufferAddr, unsigned int BufferLen); bool Set(const char* Location, const int Parameter, const int Index); bool Open(void); bool Close(void); diff --git a/protocolhttp.c b/protocolhttp.c index 0f9b9bf..1d91cbc 100644 --- a/protocolhttp.c +++ b/protocolhttp.c @@ -45,7 +45,7 @@ bool cIptvProtocolHttp::Connect(void) // First try only the IP address sockAddr.sin_addr.s_addr = inet_addr(streamAddr); - + if (sockAddr.sin_addr.s_addr == INADDR_NONE) { debug("Cannot convert %s directly to internet address\n", streamAddr); @@ -214,9 +214,9 @@ bool cIptvProtocolHttp::Close(void) return true; } -int cIptvProtocolHttp::Read(unsigned char* *BufferAddr) +int cIptvProtocolHttp::Read(unsigned char* BufferAddr, unsigned int BufferLen) { - return cIptvTcpSocket::Read(BufferAddr); + return cIptvTcpSocket::Read(BufferAddr, BufferLen); } bool cIptvProtocolHttp::Set(const char* Location, const int Parameter, const int Index) diff --git a/protocolhttp.h b/protocolhttp.h index 5539eb7..8410f92 100644 --- a/protocolhttp.h +++ b/protocolhttp.h @@ -27,7 +27,7 @@ private: public: cIptvProtocolHttp(); virtual ~cIptvProtocolHttp(); - int Read(unsigned char* *BufferAddr); + int Read(unsigned char* BufferAddr, unsigned int BufferLen); bool Set(const char* Location, const int Parameter, const int Index); bool Open(void); bool Close(void); diff --git a/protocolif.h b/protocolif.h index 2606ae5..2c395bd 100644 --- a/protocolif.h +++ b/protocolif.h @@ -12,7 +12,7 @@ class cIptvProtocolIf { public: cIptvProtocolIf() {} virtual ~cIptvProtocolIf() {} - virtual int Read(unsigned char* *BufferAddr) = 0; + virtual int Read(unsigned char* BufferAddr, unsigned int BufferLen) = 0; virtual bool Set(const char* Location, const int Parameter, const int Index) = 0; virtual bool Open(void) = 0; virtual bool Close(void) = 0; diff --git a/protocoludp.c b/protocoludp.c index 9626849..d5377a5 100644 --- a/protocoludp.c +++ b/protocoludp.c @@ -91,9 +91,9 @@ bool cIptvProtocolUdp::Close(void) return true; } -int cIptvProtocolUdp::Read(unsigned char* *BufferAddr) +int cIptvProtocolUdp::Read(unsigned char* BufferAddr, unsigned int BufferLen) { - return cIptvUdpSocket::Read(BufferAddr); + return cIptvUdpSocket::Read(BufferAddr, BufferLen); } bool cIptvProtocolUdp::Set(const char* Location, const int Parameter, const int Index) diff --git a/protocoludp.h b/protocoludp.h index 583d09d..ed33872 100644 --- a/protocoludp.h +++ b/protocoludp.h @@ -23,7 +23,7 @@ private: public: cIptvProtocolUdp(); virtual ~cIptvProtocolUdp(); - int Read(unsigned char* *BufferAddr); + int Read(unsigned char* BufferAddr, unsigned int BufferLen); bool Set(const char* Location, const int Parameter, const int Index); bool Open(void); bool Close(void); diff --git a/socket.c b/socket.c index b850982..a7db28d 100644 --- a/socket.c +++ b/socket.c @@ -20,14 +20,9 @@ cIptvSocket::cIptvSocket() : socketPort(0), socketDesc(-1), - readBufferLen(TS_SIZE * IptvConfig.GetReadBufferTsCount()), isActive(false) { debug("cIptvSocket::cIptvSocket()\n"); - // Allocate receive buffer - readBuffer = MALLOC(unsigned char, readBufferLen); - if (!readBuffer) - error("ERROR: MALLOC() failed in socket()"); } cIptvSocket::~cIptvSocket() @@ -35,8 +30,6 @@ cIptvSocket::~cIptvSocket() debug("cIptvSocket::~cIptvSocket()\n"); // Close the socket CloseSocket(); - // Free allocated memory - free(readBuffer); } bool cIptvSocket::OpenSocket(const int Port, const bool isUdp) @@ -50,7 +43,7 @@ bool cIptvSocket::OpenSocket(const int Port, const bool isUdp) } // Bind to the socket if it is not active already if (socketDesc < 0) { - int yes = 1; + int yes = 1; // Create socket if (isUdp) socketDesc = socket(PF_INET, SOCK_DGRAM, 0); @@ -104,7 +97,7 @@ bool cIptvUdpSocket::OpenSocket(const int Port) return cIptvSocket::OpenSocket(Port, true); } -int cIptvUdpSocket::Read(unsigned char* *BufferAddr) +int cIptvUdpSocket::Read(unsigned char* BufferAddr, unsigned int BufferLen) { //debug("cIptvUdpSocket::Read()\n"); // Error out if socket not initialized @@ -113,51 +106,38 @@ int cIptvUdpSocket::Read(unsigned char* *BufferAddr) return -1; } socklen_t addrlen = sizeof(sockAddr); - // Set argument point to read buffer - *BufferAddr = readBuffer; - // Wait 500ms for data - int retval = select_single_desc(socketDesc, 500000, false); - // Check if error - if (retval < 0) - return retval; - // Check if data available - else if (retval) { - int len = 0; - // Read data from socket - if (isActive) - len = recvfrom(socketDesc, readBuffer, readBufferLen, MSG_DONTWAIT, - (struct sockaddr *)&sockAddr, &addrlen); - ERROR_IF_RET(len < 0, "recvfrom()", return len); - if ((len > 0) && (readBuffer[0] == 0x47)) { - // Set argument point to read buffer - *BufferAddr = &readBuffer[0]; - return len; + int len = 0; + // Read data from socket + if (isActive) + len = recvfrom(socketDesc, BufferAddr, BufferLen, MSG_DONTWAIT, + (struct sockaddr *)&sockAddr, &addrlen); + if ((len > 0) && (BufferAddr[0] == 0x47)) { + return len; + } + else if (len > 3) { + // http://www.networksorcery.com/enp/rfc/rfc2250.txt + // version + unsigned int v = (BufferAddr[0] >> 6) & 0x03; + // extension bit + unsigned int x = (BufferAddr[0] >> 4) & 0x01; + // cscr count + unsigned int cc = BufferAddr[0] & 0x0F; + // payload type: MPEG2 TS = 33 + //unsigned int pt = readBuffer[1] & 0x7F; + // header lenght + unsigned int headerlen = (3 + cc) * sizeof(uint32_t); + // check if extension + if (x) { + // extension header length + unsigned int ehl = (((BufferAddr[headerlen + 2] & 0xFF) << 8) | (BufferAddr[headerlen + 3] & 0xFF)); + // update header length + headerlen += (ehl + 1) * sizeof(uint32_t); } - else if (len > 3) { - // http://www.networksorcery.com/enp/rfc/rfc2250.txt - // version - unsigned int v = (readBuffer[0] >> 6) & 0x03; - // extension bit - unsigned int x = (readBuffer[0] >> 4) & 0x01; - // cscr count - unsigned int cc = readBuffer[0] & 0x0F; - // payload type: MPEG2 TS = 33 - //unsigned int pt = readBuffer[1] & 0x7F; - // header lenght - unsigned int headerlen = (3 + cc) * sizeof(uint32_t); - // check if extension - if (x) { - // extension header length - unsigned int ehl = (((readBuffer[headerlen + 2] & 0xFF) << 8) | (readBuffer[headerlen + 3] & 0xFF)); - // update header length - headerlen += (ehl + 1) * sizeof(uint32_t); - } - // Check that rtp is version 2 and payload contains multiple of TS packet data - if ((v == 2) && (((len - headerlen) % TS_SIZE) == 0) && (readBuffer[headerlen] == 0x47)) { - // Set argument point to payload in read buffer - *BufferAddr = &readBuffer[headerlen]; - return (len - headerlen); - } + // Check that rtp is version 2 and payload contains multiple of TS packet data + if ((v == 2) && (((len - headerlen) % TS_SIZE) == 0) && (BufferAddr[headerlen] == 0x47)) { + // Set argument point to payload in read buffer + memmove(BufferAddr, &BufferAddr[headerlen], (len - headerlen)); + return (len - headerlen); } } return 0; @@ -180,7 +160,7 @@ bool cIptvTcpSocket::OpenSocket(const int Port) return cIptvSocket::OpenSocket(Port, false); } -int cIptvTcpSocket::Read(unsigned char* *BufferAddr) +int cIptvTcpSocket::Read(unsigned char* BufferAddr, unsigned int BufferLen) { //debug("cIptvTcpSocket::Read()\n"); // Error out if socket not initialized @@ -189,19 +169,9 @@ int cIptvTcpSocket::Read(unsigned char* *BufferAddr) return -1; } socklen_t addrlen = sizeof(sockAddr); - // Set argument point to read buffer - *BufferAddr = readBuffer; - // Wait 500ms for data - int retval = select_single_desc(socketDesc, 500000, false); - // Check if error - if (retval < 0) - return retval; - // Check if data available - else if (retval) { - // Read data from socket - if (isActive) - return recvfrom(socketDesc, readBuffer, readBufferLen, MSG_DONTWAIT, - (struct sockaddr *)&sockAddr, &addrlen); - } + // Read data from socket + if (isActive) + return recvfrom(socketDesc, BufferAddr, BufferLen, MSG_DONTWAIT, + (struct sockaddr *)&sockAddr, &addrlen); return 0; } diff --git a/socket.h b/socket.h index 80926ea..5323f34 100644 --- a/socket.h +++ b/socket.h @@ -14,8 +14,6 @@ class cIptvSocket { protected: int socketPort; int socketDesc; - unsigned char* readBuffer; - unsigned int readBufferLen; struct sockaddr_in sockAddr; bool isActive; @@ -32,7 +30,7 @@ class cIptvUdpSocket : public cIptvSocket { public: cIptvUdpSocket(); virtual ~cIptvUdpSocket(); - virtual int Read(unsigned char* *BufferAddr); + virtual int Read(unsigned char* BufferAddr, unsigned int BufferLen); bool OpenSocket(const int Port); }; @@ -40,7 +38,7 @@ class cIptvTcpSocket : public cIptvSocket { public: cIptvTcpSocket(); virtual ~cIptvTcpSocket(); - virtual int Read(unsigned char* *BufferAddr); + virtual int Read(unsigned char* BufferAddr, unsigned int BufferLen); bool OpenSocket(const int Port); }; diff --git a/streamer.c b/streamer.c index cf28660..966c922 100644 --- a/streamer.c +++ b/streamer.c @@ -11,13 +11,19 @@ #include "common.h" #include "streamer.h" -cIptvStreamer::cIptvStreamer(cRingBufferLinear* RingBuffer, cMutex* Mutex) +cIptvStreamer::cIptvStreamer(cRingBufferLinear* RingBuffer, unsigned int PacketLen) : cThread("IPTV streamer"), ringBuffer(RingBuffer), - mutex(Mutex), + packetBufferLen(PacketLen), protocol(NULL) { - debug("cIptvStreamer::cIptvStreamer()\n"); + debug("cIptvStreamer::cIptvStreamer(%d)\n", packetBufferLen); + // Allocate packet buffer + packetBuffer = MALLOC(unsigned char, packetBufferLen); + if (packetBuffer) + memset(packetBuffer, 0, packetBufferLen); + else + error("ERROR: MALLOC() failed for packet buffer"); } cIptvStreamer::~cIptvStreamer() @@ -25,32 +31,31 @@ cIptvStreamer::~cIptvStreamer() debug("cIptvStreamer::~cIptvStreamer()\n"); // Close the protocol Close(); + // Free allocated memory + free(packetBuffer); } void cIptvStreamer::Action(void) { debug("cIptvStreamer::Action(): Entering\n"); + // Increase priority + //SetPriority(-1); // Do the thread loop - while (Running()) { - if (ringBuffer && mutex && protocol && ringBuffer->Free()) { - unsigned char *buffer = NULL; - mutex->Lock(); - int length = protocol->Read(&buffer); - if (length >= 0) { - AddStreamerStatistic(length); - int p = ringBuffer->Put(buffer, length); - if (p != length && Running()) - ringBuffer->ReportOverflow(length - p); - mutex->Unlock(); - } - else { - mutex->Unlock(); - sleep.Wait(100); // to reduce cpu load - } - } - else - sleep.Wait(100); // and avoid busy loop - } + while (packetBuffer && Running()) { + int length = -1; + if (protocol) + length = protocol->Read(packetBuffer, packetBufferLen); + if (length >= 0) { + AddStreamerStatistic(length); + if (ringBuffer) { + int p = ringBuffer->Put(packetBuffer, length); + if (p != length) + ringBuffer->ReportOverflow(length - p); + } + } + else + sleep.Wait(10); // to avoid busy loop and reduce cpu load + } debug("cIptvStreamer::Action(): Exiting\n"); } @@ -72,15 +77,9 @@ bool cIptvStreamer::Close(void) sleep.Signal(); if (Running()) Cancel(3); - // Close the protocol. A mutex should be taken here to avoid a race condition - // where thread Action() may be in the process of accessing the protocol. - // Taking a mutex serializes the Close() and Action() -calls. - if (mutex) - mutex->Lock(); + // Close the protocol if (protocol) protocol->Close(); - if (mutex) - mutex->Unlock(); return true; } diff --git a/streamer.h b/streamer.h index b8bbb34..8c53539 100644 --- a/streamer.h +++ b/streamer.h @@ -19,14 +19,13 @@ class cIptvStreamer : public cThread, public cIptvStreamerStatistics { private: cRingBufferLinear* ringBuffer; - cMutex* mutex; cCondWait sleep; - unsigned char* readBuffer; - unsigned int readBufferLen; + unsigned char* packetBuffer; + unsigned int packetBufferLen; cIptvProtocolIf* protocol; public: - cIptvStreamer(cRingBufferLinear* RingBuffer, cMutex* Mutex); + cIptvStreamer(cRingBufferLinear* RingBuffer, unsigned int PacketLen); virtual ~cIptvStreamer(); virtual void Action(void); bool Set(const char* Location, const int Parameter, const int Index, cIptvProtocolIf* Protocol);