diff --git a/HISTORY b/HISTORY index e5a5291..12f5d72 100644 --- a/HISTORY +++ b/HISTORY @@ -204,3 +204,5 @@ VDR Plugin 'iptv' Revision History - Enabled I/O throttling and tweaked buffer timeouts. - Fixed a nasty network byte order bug. +- Fixed and refactored the section filtering code. +- Fixed a possible crash in the file protocol. diff --git a/Makefile b/Makefile index b282cfd..7f147b8 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,7 @@ PKGCFG = $(if $(VDRDIR),$(shell pkg-config --variable=$(1) $(VDRDIR)/vdr.pc),$(s LIBDIR = $(call PKGCFG,libdir) LOCDIR = $(call PKGCFG,locdir) PLGCFG = $(call PKGCFG,plgcfg) +CFGDIR = $(call PKGCFG,configdir) # TMPDIR ?= /tmp @@ -144,7 +145,11 @@ endif install-lib: $(SOFILE) install -D $^ $(DESTDIR)$(LIBDIR)/$^.$(APIVERSION) -install: install-lib install-i18n +install-conf: + @mkdir -p $(DESTDIR)$(CFGDIR)/plugins/$(PLUGIN) + @cp -pn $(PLUGIN)/* $(DESTDIR)$(CFGDIR)/plugins/$(PLUGIN)/ + +install: install-lib install-i18n install-conf dist: $(I18Npo) clean @-rm -rf $(TMPDIR)/$(ARCHIVE) diff --git a/README b/README index 855ad7c..4d85a63 100644 --- a/README +++ b/README @@ -41,14 +41,8 @@ MP3 radio streams, mms video streams and so on. Installation: -cd /put/your/path/here/VDR/PLUGINS/src tar -xzf /put/your/path/here/vdr-iptv-X.Y.Z.tgz -ln -s iptv-X.Y.Z iptv -cd /put/your/path/here/VDR -cp -R PLUGINS/src/iptv/iptv /path/to/vdrresource/plugins/ -make -make plugins -./vdr -P iptv +make -C iptv-X.Y.Z install Setup menu: diff --git a/device.c b/device.c index 4aa52ee..bbbfbd4 100644 --- a/device.c +++ b/device.c @@ -26,20 +26,21 @@ cIptvDevice::cIptvDevice(unsigned int indexP) bufsize -= (bufsize % TS_SIZE); isyslog("creating IPTV device %d (CardIndex=%d)", deviceIndexM, CardIndex()); tsBufferM = new cRingBufferLinear(bufsize + 1, TS_SIZE, false, - *cString::sprintf("IPTV %d", deviceIndexM)); - tsBufferM->SetTimeouts(100, 100); - tsBufferM->SetIoThrottle(); + *cString::sprintf("IPTV TS %d", deviceIndexM)); + if (tsBufferM) { + tsBufferM->SetTimeouts(100, 100); + tsBufferM->SetIoThrottle(); + pIptvStreamerM = new cIptvStreamer(*this, tsBufferM->Free()); + } ResetBuffering(); pUdpProtocolM = new cIptvProtocolUdp(); pCurlProtocolM = new cIptvProtocolCurl(); pHttpProtocolM = new cIptvProtocolHttp(); pFileProtocolM = new cIptvProtocolFile(); pExtProtocolM = new cIptvProtocolExt(); - pIptvStreamerM = new cIptvStreamer(tsBufferM, (100 * TS_SIZE)); pPidScannerM = new cPidScanner(); - // Initialize filter pointers - memset(secFiltersM, 0, sizeof(secFiltersM)); // Start section handler for iptv device + pIptvSectionM = new cIptvSectionFilterHandler(deviceIndexM, bufsize + 1); StartSectionHandler(); // Sid scanner must be created after the section handler AttachFilter(pSidScannerM = new cSidScanner()); @@ -68,10 +69,7 @@ cIptvDevice::~cIptvDevice() DELETE_POINTER(pSidScannerM); // Stop section handler of iptv device StopSectionHandler(); - // Destroy all filters - cMutexLock MutexLock(&mutexM); - for (int i = 0; i < eMaxSecFilterCount; ++i) - DeleteFilter(i); + DELETE_POINTER(pIptvSectionM); // Close dvr fifo if (dvrFdM >= 0) { int fd = dvrFdM; @@ -136,20 +134,7 @@ cString cIptvDevice::GetPidsInformation(void) cString cIptvDevice::GetFiltersInformation(void) { //debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM); - unsigned int count = 0; - cString s("Active section filters:\n"); - // loop through active section filters - cMutexLock MutexLock(&mutexM); - for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { - if (secFiltersM[i]) { - s = cString::sprintf("%sFilter %d: %s Pid=0x%02X (%s)\n", *s, i, - *secFiltersM[i]->GetSectionStatistic(), secFiltersM[i]->GetPid(), - id_pid(secFiltersM[i]->GetPid())); - if (++count > IPTV_STATS_ACTIVE_FILTERS_COUNT) - break; - } - } - return s; + return cString::sprintf("Active section filters:\n%s", pIptvSectionM ? *pIptvSectionM->GetInformation() : ""); } cString cIptvDevice::GetInformation(unsigned int pageP) @@ -184,7 +169,7 @@ cString cIptvDevice::GetInformation(unsigned int pageP) cString cIptvDevice::DeviceType(void) const { - debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM); + //debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM); return "IPTV"; } @@ -300,80 +285,19 @@ bool cIptvDevice::SetPid(cPidHandle *handleP, int typeP, bool onP) return true; } -bool cIptvDevice::DeleteFilter(unsigned int indexP) -{ - if ((indexP < eMaxSecFilterCount) && secFiltersM[indexP]) { - //debug("cIptvDevice::%s(%d): index=%d", __FUNCTION__, deviceIndexM, indexP); - cIptvSectionFilter *tmp = secFiltersM[indexP]; - secFiltersM[indexP] = NULL; - delete tmp; - return true; - } - return false; -} - -bool cIptvDevice::IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const -{ - //debug("cIptvDevice::%s(%d): pid=%d tid=%02X mask=%02X", __FUNCTION__, deviceIndexM, pidP, tidP, maskP); - // loop through section filter table - for (int i = 0; i < SECTION_FILTER_TABLE_SIZE; ++i) { - int index = IptvConfig.GetDisabledFilters(i); - // Check if matches - if ((index >= 0) && (index < SECTION_FILTER_TABLE_SIZE) && - (section_filter_table[index].pid == pidP) && (section_filter_table[index].tid == tidP) && - (section_filter_table[index].mask == maskP)) { - //debug("cIptvDevice::%s(%d): found %s", __FUNCTION__, deviceIndexM, section_filter_table[index].description); - return true; - } - } - return false; -} - int cIptvDevice::OpenFilter(u_short pidP, u_char tidP, u_char maskP) { - // Check if disabled by user - if (!IptvConfig.GetSectionFiltering()) - return -1; - // Lock - cMutexLock MutexLock(&mutexM); - // Blacklist check, refuse certain filters - if (IsBlackListed(pidP, tidP, maskP)) - return -1; - // Search the next free filter slot - for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { - if (!secFiltersM[i]) { - //debug("cIptvDevice::%s(%d): pid=%d tid=%02X mask=%02X index=%d", __FUNCTION__, deviceIndexM, pidP, tidP, maskP, i); - secFiltersM[i] = new cIptvSectionFilter(deviceIndexM, pidP, tidP, maskP); - if (secFiltersM[i]) - return i; - break; - } - } - // No free filter slot found + debug("cIptvDevice::%s(%d): pid=%d tid=%d mask=%d", __FUNCTION__, deviceIndexM, pidP, tidP, maskP); + if (pIptvSectionM && IptvConfig.GetSectionFiltering()) + return pIptvSectionM->Open(pidP, tidP, maskP); return -1; } -int cIptvDevice::ReadFilter(int handleP, void *bufferP, size_t lengthP) -{ - // Lock - cMutexLock MutexLock(&mutexM); - // ... and load - if (secFiltersM[handleP]) { - return secFiltersM[handleP]->Read(bufferP, lengthP); - //debug("cIptvDevice::%s(%d): handle=%d length=%d", __FUNCTION__, deviceIndexM, handleP, lengthP); - } - return 0; -} - void cIptvDevice::CloseFilter(int handleP) { - // Lock - cMutexLock MutexLock(&mutexM); - // ... and load - if (secFiltersM[handleP]) { - //debug("cIptvDevice::%s(%d): handle=%d", __FUNCTION__, deviceIndexM, handleP); - DeleteFilter(handleP); - } + debug("cIptvDevice::%s(%d): handle=%d", __FUNCTION__, deviceIndexM, handleP); + if (pIptvSectionM) + pIptvSectionM->Close(handleP); } bool cIptvDevice::OpenDvr(void) @@ -431,6 +355,32 @@ bool cIptvDevice::IsBuffering(void) const return false; } +void cIptvDevice::WriteData(uchar *bufferP, int lengthP) +{ + //debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM); + int len; + // Send data to dvr fifo + if (dvrFdM >= 0) + len = write(dvrFdM, bufferP, lengthP); + // Fill up TS buffer + if (tsBufferM) { + len = tsBufferM->Put(bufferP, lengthP); + if (len != lengthP) + tsBufferM->ReportOverflow(lengthP - len); + } + // Filter the sections + if (pIptvSectionM && IptvConfig.GetSectionFiltering()) + pIptvSectionM->Write(bufferP, lengthP); +} + +unsigned int cIptvDevice::CheckData(void) +{ + //debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM); + if (tsBufferM) + return (unsigned int)tsBufferM->Free(); + return 0; +} + bool cIptvDevice::GetTSPacket(uchar *&Data) { //debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM); @@ -452,26 +402,13 @@ bool cIptvDevice::GetTSPacket(uchar *&Data) } } tsBufferM->Del(Count); - error("Skipped %d bytes to sync on TS packet\n", Count); + error("Skipped %d bytes to sync on TS packet", Count); return false; } isPacketDeliveredM = true; Data = p; // Update pid statistics AddPidStatistic(ts_pid(p), payload(p)); - // Send data also to dvr fifo - if (dvrFdM >= 0) - Count = (int)write(dvrFdM, p, TS_SIZE); - // Analyze incomplete streams with built-in pid analyzer - if (pidScanEnabledM && pPidScannerM) - pPidScannerM->Process(p); - // Lock - cMutexLock MutexLock(&mutexM); - // Run the data through all filters - for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { - if (secFiltersM[i]) - secFiltersM[i]->Process(p); - } return true; } } diff --git a/device.h b/device.h index d247330..705f131 100644 --- a/device.h +++ b/device.h @@ -10,6 +10,7 @@ #include #include "common.h" +#include "deviceif.h" #include "protocoludp.h" #include "protocolcurl.h" #include "protocolhttp.h" @@ -21,7 +22,7 @@ #include "sidscanner.h" #include "statistics.h" -class cIptvDevice : public cDevice, public cIptvPidStatistics, public cIptvBufferStatistics { +class cIptvDevice : public cDevice, public cIptvPidStatistics, public cIptvBufferStatistics, public cIptvDeviceIf { // static ones public: static unsigned int deviceCount; @@ -31,9 +32,6 @@ public: // private parts private: - enum { - eMaxSecFilterCount = 32 - }; unsigned int deviceIndexM; int dvrFdM; bool isPacketDeliveredM; @@ -49,10 +47,10 @@ private: cIptvProtocolFile *pFileProtocolM; cIptvProtocolExt *pExtProtocolM; cIptvStreamer *pIptvStreamerM; + cIptvSectionFilterHandler *pIptvSectionM; cPidScanner *pPidScannerM; cSidScanner *pSidScannerM; cMutex mutexM; - cIptvSectionFilter *secFiltersM[eMaxSecFilterCount]; // constructor & destructor public: @@ -74,8 +72,6 @@ private: private: void ResetBuffering(void); bool IsBuffering(void) const; - bool DeleteFilter(unsigned int indexP); - bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const; // for channel info public: @@ -104,7 +100,6 @@ protected: // for section filtering public: virtual int OpenFilter(u_short pidP, u_char tidP, u_char maskP); - virtual int ReadFilter(int handleP, void *bufferP, size_t lengthP); virtual void CloseFilter(int handleP); // for transponder lock @@ -114,6 +109,11 @@ public: // for common interface public: virtual bool HasInternalCam(void); + + // for internal device interface +public: + virtual void WriteData(u_char *bufferP, int lengthP); + virtual unsigned int CheckData(void); }; #endif // __IPTV_DEVICE_H diff --git a/deviceif.h b/deviceif.h new file mode 100644 index 0000000..8612aa7 --- /dev/null +++ b/deviceif.h @@ -0,0 +1,23 @@ +/* + * deviceif.h: IPTV plugin for the Video Disk Recorder + * + * See the README file for copyright information and how to reach the author. + * + */ + +#ifndef __IPTV_DEVICEIF_H +#define __IPTV_DEVICEIF_H + +class cIptvDeviceIf { +public: + cIptvDeviceIf() {} + virtual ~cIptvDeviceIf() {} + virtual void WriteData(u_char *bufferP, int lengthP) = 0; + virtual unsigned int CheckData(void) = 0; + +private: + cIptvDeviceIf(const cIptvDeviceIf&); + cIptvDeviceIf& operator=(const cIptvDeviceIf&); +}; + +#endif // __IPTV_DEVICEIF_H diff --git a/pidscanner.c b/pidscanner.c index 6c380ca..fcc22c7 100644 --- a/pidscanner.c +++ b/pidscanner.c @@ -56,7 +56,7 @@ void cPidScanner::Process(const uint8_t* bufP) // Verify TS packet if (bufP[0] != 0x47) { - error("Not TS packet: 0x%X\n", bufP[0]); + error("Not TS packet: 0x%02X", bufP[0]); return; } diff --git a/protocolcurl.c b/protocolcurl.c index c17fe2e..b1ac20b 100644 --- a/protocolcurl.c +++ b/protocolcurl.c @@ -9,14 +9,18 @@ #include "config.h" #include "protocolcurl.h" +#ifdef CURLOPT_RTSPHEADER +#define USE_RTSP +#endif + #define iptv_curl_easy_setopt(X, Y, Z) \ if ((res = curl_easy_setopt((X), (Y), (Z))) != CURLE_OK) { \ - error("curl_easy_setopt(%s, %s, %s) failed: %d\n", #X, #Y, #Z, res); \ + error("curl_easy_setopt(%s, %s, %s) failed: %d", #X, #Y, #Z, res); \ } #define iptv_curl_easy_perform(X) \ if ((res = curl_easy_perform((X))) != CURLE_OK) { \ - error("curl_easy_perform(%s) failed: %d\n", #X, res); \ + error("curl_easy_perform(%s) failed: %d", #X, res); \ } cIptvProtocolCurl::cIptvProtocolCurl() @@ -210,7 +214,7 @@ unsigned char *cIptvProtocolCurl::GetData(int &lenP) break; } } - error("IPTV skipped %d bytes to sync on TS packet\n", count); + error("IPTV skipped %d bytes to sync on TS packet", count); ringBufferM->Del(count); lenP = 0; return NULL; @@ -274,6 +278,7 @@ bool cIptvProtocolCurl::Connect() // Protocol specific initializations switch (modeM) { +#ifdef USE_RTSP case eModeRtsp: { cString uri, control, transport, range; @@ -317,7 +322,7 @@ bool cIptvProtocolCurl::Connect() iptv_curl_easy_perform(handleM); } break; - +#endif case eModeHttp: case eModeHttps: { @@ -366,6 +371,7 @@ bool cIptvProtocolCurl::Disconnect() if (handleM && multiM) { // Mode specific tricks switch (modeM) { +#ifdef USE_RTSP case eModeRtsp: { CURLcode res = CURLE_OK; @@ -377,7 +383,7 @@ bool cIptvProtocolCurl::Disconnect() rtspControlM = ""; } break; - +#endif case eModeHttp: case eModeHttps: case eModeFile: @@ -424,6 +430,7 @@ int cIptvProtocolCurl::Read(unsigned char* bufferAddrP, unsigned int bufferLenP) // Fill up the buffer if (handleM && multiM) { switch (modeM) { +#ifdef USE_RTSP case eModeRtsp: { cMutexLock MutexLock(&mutexM); @@ -433,7 +440,7 @@ int cIptvProtocolCurl::Read(unsigned char* bufferAddrP, unsigned int bufferLenP) // @todo - How to detect eof? } break; - +#endif case eModeFile: case eModeHttp: case eModeHttps: diff --git a/protocolfile.c b/protocolfile.c index 5faf941..885d137 100644 --- a/protocolfile.c +++ b/protocolfile.c @@ -60,7 +60,7 @@ int cIptvProtocolFile::Read(unsigned char* bufferAddrP, unsigned int bufferLenP) { //debug("cIptvProtocolFile::%s()", __FUNCTION__); // Check errors - if (ferror(fileStreamM)) { + if (!fileStreamM || ferror(fileStreamM)) { debug("cIptvProtocolFile::%s(): stream error", __FUNCTION__); return -1; } diff --git a/protocolhttp.c b/protocolhttp.c index 995109f..5d85348 100644 --- a/protocolhttp.c +++ b/protocolhttp.c @@ -113,13 +113,13 @@ bool cIptvProtocolHttp::GetHeaderLine(char* destP, unsigned int destLenP, ++bufptr; // Check that buffer won't be exceeded if (recvLenP >= destLenP) { - error("Header wouldn't fit into buffer\n"); + error("Header wouldn't fit into buffer"); recvLenP = 0; return false; } } else { - error("No HTTP response received in 500ms\n"); + error("No HTTP response received in 500ms"); return false; } } @@ -143,14 +143,14 @@ bool cIptvProtocolHttp::ProcessHeaders(void) if (!GetHeaderLine(buf, sizeof(buf), lineLength)) return false; if (!responseFound && sscanf(buf, fmt, &version, &response) != 2) { - error("Expected HTTP header not found\n"); + error("Expected HTTP header not found"); continue; } else responseFound = true; // Allow only 'OK' and 'Partial Content' if ((response != 200) && (response != 206)) { - error("Invalid HTTP response (%d): %s\n", response, buf); + error("Invalid HTTP response (%d): %s", response, buf); return false; } } diff --git a/sectionfilter.c b/sectionfilter.c index 012e092..e6048a0 100644 --- a/sectionfilter.c +++ b/sectionfilter.c @@ -17,9 +17,9 @@ cIptvSectionFilter::cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t secLenM(0), tsFeedpM(0), pidM(pidP), - devIdM(deviceIndexP) + deviceIndexM(deviceIndexP) { - //debug("cIptvSectionFilter::%s(%d, %d)", __FUNCTION__, devIdM, pidM); + //debug("cIptvSectionFilter::%s(%d, %d)", __FUNCTION__, deviceIndexM, pidM); int i; memset(secBufBaseM, 0, sizeof(secBufBaseM)); @@ -46,32 +46,32 @@ cIptvSectionFilter::cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t } doneqM = doneq ? 1 : 0; - // Create filtering buffer - ringbufferM = new cRingBufferLinear(KILOBYTE(128), 0, false, *cString::sprintf("IPTV SECTION %d/%d", devIdM, pidM)); - if (ringbufferM) - ringbufferM->SetTimeouts(10, 10); - else - error("Failed to allocate buffer for section filter (device=%d pid=%d): ", devIdM, pidM); + // Create sockets + socketM[0] = socketM[1] = -1; + if (socketpair(AF_UNIX, SOCK_DGRAM, 0, socketM) != 0) { + char tmp[64]; + error("Opening section filter sockets failed (device=%d pid=%d): %s", deviceIndexM, pidM, strerror_r(errno, tmp, sizeof(tmp))); + } + else if ((fcntl(socketM[0], F_SETFL, O_NONBLOCK) != 0) || (fcntl(socketM[1], F_SETFL, O_NONBLOCK) != 0)) { + char tmp[64]; + error("Setting section filter socket to non-blocking mode failed (device=%d pid=%d): %s", deviceIndexM, pidM, strerror_r(errno, tmp, sizeof(tmp))); + } } cIptvSectionFilter::~cIptvSectionFilter() { - //debug("cIptvSectionFilter::%s(%d, %d)", __FUNCTION__, devIdM, pidM); - DELETE_POINTER(ringbufferM); + //debug("cIptvSectionFilter::%s(%d, %d)", __FUNCTION__, deviceIndexM, pidM); + int tmp = socketM[1]; + socketM[1] = -1; + if (tmp >= 0) + close(tmp); + tmp = socketM[0]; + socketM[0] = -1; + if (tmp >= 0) + close(tmp); secBufM = NULL; } -int cIptvSectionFilter::Read(void *Data, size_t Length) -{ - int count = 0; - uchar *p = ringbufferM->Get(count); - if (p && count > 0) { - memcpy(Data, p, count); - ringbufferM->Del(count); - } - return count; -} - inline uint16_t cIptvSectionFilter::GetLength(const uint8_t *dataP) { return (uint16_t)(3 + ((dataP[1] & 0x0f) << 8) + dataP[2]); @@ -99,10 +99,10 @@ int cIptvSectionFilter::Filter(void) if (doneqM && !neq) return 0; - if (ringbufferM) { - int len = ringbufferM->Put(secBufM, secLenM); - if (len != secLenM) - ringbufferM->ReportOverflow(secLenM - len); + // There is no data in the read socket, more can be written + if ((socketM[0] >= 0) && (socketM[1] >= 0) /*&& !select_single_desc(socketM[0], 0, false)*/) { + ssize_t len = write(socketM[1], secBufM, secLenM); + ERROR_IF(len < 0, "write()"); // Update statistics AddSectionStatistic(len, 1); } @@ -210,3 +210,175 @@ void cIptvSectionFilter::Process(const uint8_t* dataP) CopyDump(&dataP[p], count); } } + + +cIptvSectionFilterHandler::cIptvSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP) +: cThread("IPTV section handler", true), + mutexM(), + deviceIndexM(deviceIndexP), + processedM(false), + ringBufferM(new cRingBufferLinear(bufferLenP, TS_SIZE, false, *cString::sprintf("IPTV SECTION HANDLER %d", deviceIndexP))) +{ + debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); + + // Initialize filter pointers + memset(filtersM, 0, sizeof(filtersM)); + + // Create input buffer + if (ringBufferM) { + ringBufferM->SetTimeouts(100, 100); + ringBufferM->SetIoThrottle(); + } + else + error("Failed to allocate buffer for section filter handler (device=%d): ", deviceIndexM); + + Start(); +} + +cIptvSectionFilterHandler::~cIptvSectionFilterHandler() +{ + debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); + // Stop thread + if (Running()) + Cancel(3); + + // Destroy all filters + cMutexLock MutexLock(&mutexM); + for (int i = 0; i < eMaxSecFilterCount; ++i) + Delete(i); +} + +void cIptvSectionFilterHandler::Action(void) +{ + debug("cIptvSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM); + // Do the thread loop + while (Running()) { + // Read one TS packet + if (ringBufferM) { + int len = 0; + if (processedM) { + ringBufferM->Del(TS_SIZE); + processedM = false; + } + uchar *p = ringBufferM->Get(len); + if (p && (len >= TS_SIZE)) { + if (*p != TS_SYNC_BYTE) { + for (int i = 1; i < len; ++i) { + if (p[i] == TS_SYNC_BYTE) { + len = i; + break; + } + } + ringBufferM->Del(len); + debug("cIptvSectionFilterHandler::%s(%d): Skipped %d bytes to sync on TS packet", __FUNCTION__, deviceIndexM, len); + continue; + } + // Process TS packet through all filters + mutexM.Lock(); + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (filtersM[i]) + filtersM[i]->Process(p); + } + mutexM.Unlock(); + processedM = true; + continue; + } + } + cCondWait::SleepMs(10); // to avoid busy loop and reduce cpu load + } + debug("cIptvSectionFilterHandler::%s(%d): exiting", __FUNCTION__, deviceIndexM); +} + +cString cIptvSectionFilterHandler::GetInformation(void) +{ + //debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); + // loop through active section filters + cMutexLock MutexLock(&mutexM); + cString s = ""; + unsigned int count = 0; + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (filtersM[i]) { + s = cString::sprintf("%sFilter %d: %s Pid=0x%02X (%s)\n", *s, i, + *filtersM[i]->GetSectionStatistic(), filtersM[i]->GetPid(), + id_pid(filtersM[i]->GetPid())); + if (++count > IPTV_STATS_ACTIVE_FILTERS_COUNT) + break; + } + } + return s; +} + +bool cIptvSectionFilterHandler::Delete(unsigned int indexP) +{ + //debug("cIptvSectionFilterHandler::%s(%d): index=%d", __FUNCTION__, deviceIndexM, indexP); + if ((indexP < eMaxSecFilterCount) && filtersM[indexP]) { + //debug("cIptvSectionFilterHandler::%s(%d): found %d", __FUNCTION__, deviceIndexM, indexP); + cIptvSectionFilter *tmp = filtersM[indexP]; + filtersM[indexP] = NULL; + delete tmp; + return true; + } + return false; +} + +bool cIptvSectionFilterHandler::IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const +{ + //debug("cIptvSectionFilterHandler::%s(%d): pid=%d tid=%02X mask=%02X", __FUNCTION__, deviceIndexM, pidP, tidP, maskP); + // loop through section filter table + for (int i = 0; i < SECTION_FILTER_TABLE_SIZE; ++i) { + int index = IptvConfig.GetDisabledFilters(i); + // Check if matches + if ((index >= 0) && (index < SECTION_FILTER_TABLE_SIZE) && + (section_filter_table[index].pid == pidP) && (section_filter_table[index].tid == tidP) && + (section_filter_table[index].mask == maskP)) { + //debug("cIptvSectionFilterHandler::%s(%d): found %s", __FUNCTION__, deviceIndexM, section_filter_table[index].description); + return true; + } + } + return false; +} + +int cIptvSectionFilterHandler::Open(u_short pidP, u_char tidP, u_char maskP) +{ + // Lock + cMutexLock MutexLock(&mutexM); + // Blacklist check, refuse certain filters + if (IsBlackListed(pidP, tidP, maskP)) + return -1; + // Search the next free filter slot + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (!filtersM[i]) { + //debug("cIptvSectionFilterHandler::%s(%d): pid=%d tid=%02X mask=%02X index=%d", __FUNCTION__, deviceIndexM, pidP, tidP, maskP, i); + filtersM[i] = new cIptvSectionFilter(deviceIndexM, pidP, tidP, maskP); + return filtersM[i]->GetFd(); + } + } + // No free filter slot found + return -1; +} + +void cIptvSectionFilterHandler::Close(int handleP) +{ + // Lock + cMutexLock MutexLock(&mutexM); + // Search the filter for deletion + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (filtersM[i] && (handleP == filtersM[i]->GetFd())) { + //debug(""cIptvSectionFilterHandler::%s(%d): handle=%d", __FUNCTION__, deviceIndex, handleP); + Delete(i); + break; + } + } +} + +void cIptvSectionFilterHandler::Write(uchar *bufferP, int lengthP) +{ + //debug("cIptvSectionFilterHandler::%s(%d): length=%d", __FUNCTION__, deviceIndexM, lengthP); + // Fill up the buffer + if (ringBufferM) { + int len = ringBufferM->Put(bufferP, lengthP); + if (len != lengthP) + ringBufferM->ReportOverflow(lengthP - len); + } +} + diff --git a/sectionfilter.h b/sectionfilter.h index da8e389..935ed5e 100644 --- a/sectionfilter.h +++ b/sectionfilter.h @@ -35,7 +35,8 @@ private: uint16_t tsFeedpM; uint16_t pidM; - int devIdM; + int deviceIndexM; + int socketM[2]; uint8_t filterValueM[DMX_MAX_FILTER_SIZE]; uint8_t filterMaskM[DMX_MAX_FILTER_SIZE]; @@ -44,8 +45,6 @@ private: uint8_t maskAndModeM[DMX_MAX_FILTER_SIZE]; uint8_t maskAndNotModeM[DMX_MAX_FILTER_SIZE]; - cRingBufferLinear *ringbufferM; - inline uint16_t GetLength(const uint8_t *dataP); void New(void); int Filter(void); @@ -57,8 +56,34 @@ public: cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP); virtual ~cIptvSectionFilter(); void Process(const uint8_t* dataP); - int Read(void *bufferP, size_t lengthP); + int GetFd(void) { return socketM[0]; } uint16_t GetPid(void) const { return pidM; } }; +class cIptvSectionFilterHandler : public cThread { +private: + enum { + eMaxSecFilterCount = 32 + }; + cMutex mutexM; + int deviceIndexM; + bool processedM; + cRingBufferLinear *ringBufferM; + cIptvSectionFilter *filtersM[eMaxSecFilterCount]; + + bool Delete(unsigned int indexP); + bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const; + +protected: + virtual void Action(void); + +public: + cIptvSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP); + virtual ~cIptvSectionFilterHandler(); + cString GetInformation(void); + int Open(u_short pidP, u_char tidP, u_char maskP); + void Close(int handleP); + void Write(u_char *bufferP, int lengthP); +}; + #endif // __IPTV_SECTIONFILTER_H diff --git a/sidscanner.c b/sidscanner.c index 0601c7a..27f3285 100644 --- a/sidscanner.c +++ b/sidscanner.c @@ -76,10 +76,10 @@ void cSidScanner::Process(u_short pidP, u_char tidP, const u_char *dataP, int le break; // default to the first one } if (nit.getNetworkId() != channelIdM.Nid()) { - debug("cSidScanner::%s(): nid=%d\n", __FUNCTION__, ts.getTransportStreamId()); + debug("cSidScanner::%s(): nid=%d", __FUNCTION__, ts.getTransportStreamId()); newNid = nit.getNetworkId(); } - nidFoundM = true; + nidFoundM = true; } } if ((newSid >= 0) || (newNid >= 0) || (newTid >= 0)) { diff --git a/socket.c b/socket.c index 8cd29fa..afbf28a 100644 --- a/socket.c +++ b/socket.c @@ -226,7 +226,7 @@ int cIptvUdpSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP) //debug("cIptvUdpSocket::%s()", __FUNCTION__); // Error out if socket not initialized if (socketDescM <= 0) { - error("Invalid socket in cIptvUdpSocket::%s()\n", __FUNCTION__); + error("Invalid socket in cIptvUdpSocket::%s()", __FUNCTION__); return -1; } int len = 0; @@ -359,7 +359,7 @@ int cIptvTcpSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP) //debug("cIptvTcpSocket::%s()", __FUNCTION__); // Error out if socket not initialized if (socketDescM <= 0) { - error("Invalid socket in cIptvTcpSocket::%s()\n", __FUNCTION__); + error("Invalid socket in cIptvTcpSocket::%s()", __FUNCTION__); return -1; } int len = 0; @@ -376,7 +376,7 @@ bool cIptvTcpSocket::ReadChar(char *bufferAddrP, unsigned int timeoutMsP) //debug("cIptvTcpSocket::%s()", __FUNCTION__); // Error out if socket not initialized if (socketDescM <= 0) { - error("Invalid socket in cIptvTcpSocket::%s()\n", __FUNCTION__); + error("Invalid socket in cIptvTcpSocket::%s()", __FUNCTION__); return false; } socklen_t addrlen = sizeof(sockAddrM); @@ -401,7 +401,7 @@ bool cIptvTcpSocket::Write(const char *bufferAddrP, unsigned int bufferLenP) //debug("cIptvTcpSocket::%s()", __FUNCTION__); // Error out if socket not initialized if (socketDescM <= 0) { - error("Invalid socket in cIptvTcpSocket::%s()\n", __FUNCTION__); + error("Invalid socket in cIptvTcpSocket::%s()", __FUNCTION__); return false; } ERROR_IF_RET(send(socketDescM, bufferAddrP, bufferLenP, 0) < 0, "send()", return false); diff --git a/source.c b/source.c index def10fd..cf078e2 100644 --- a/source.c +++ b/source.c @@ -120,7 +120,7 @@ bool cIptvTransponderParameters::Parse(const char *strP) if (found_s && found_p && found_f && found_u && found_a) result = true; else - error("Invalid channel parameters: %s\n", str); + error("Invalid channel parameters: %s", str); free(str); } diff --git a/statistics.c b/statistics.c index ff21b78..2d3a6bd 100644 --- a/statistics.c +++ b/statistics.c @@ -44,7 +44,7 @@ cString cIptvSectionStatistics::GetSectionStatistic() void cIptvSectionStatistics::AddSectionStatistic(long bytesP, long callsP) { - //debug("cIptvSectionStatistics::%s(%ld, %ld)", __FUNCTION__, bytesP, callsP); + //debug("cIptvSectionStatistics::%s(%ld, %ld)", __FUNCTION__, bytesP, callsP); cMutexLock MutexLock(&mutexM); filteredDataM += bytesP; numberOfCallsM += callsP; diff --git a/streamer.c b/streamer.c index 0c13c5e..0434336 100644 --- a/streamer.c +++ b/streamer.c @@ -5,16 +5,13 @@ * */ -#include -#include - #include "common.h" #include "streamer.h" -cIptvStreamer::cIptvStreamer(cRingBufferLinear* ringBufferP, unsigned int packetLenP) +cIptvStreamer::cIptvStreamer(cIptvDeviceIf &deviceP, unsigned int packetLenP) : cThread("IPTV streamer"), - ringBufferM(ringBufferP), sleepM(), + deviceM(&deviceP), packetBufferLenM(packetLenP), protocolM(NULL) { @@ -33,7 +30,6 @@ cIptvStreamer::~cIptvStreamer() // Close the protocol Close(); protocolM = NULL; - ringBufferM = NULL; // Free allocated memory free(packetBufferM); } @@ -46,16 +42,12 @@ void cIptvStreamer::Action(void) // Do the thread loop while (packetBufferM && Running()) { int length = -1; - unsigned int size = min((unsigned int)ringBufferM->Free(), packetBufferLenM); + unsigned int size = min(deviceM->CheckData(), packetBufferLenM); if (protocolM && (size > 0)) length = protocolM->Read(packetBufferM, size); if (length > 0) { AddStreamerStatistic(length); - if (ringBufferM) { - int p = ringBufferM->Put(packetBufferM, length); - if (p != length) - ringBufferM->ReportOverflow(length - p); - } + deviceM->WriteData(packetBufferM, length); } else sleepM.Wait(10); // to avoid busy loop and reduce cpu load diff --git a/streamer.h b/streamer.h index 9de2226..d43232b 100644 --- a/streamer.h +++ b/streamer.h @@ -11,15 +11,15 @@ #include #include -#include +#include "deviceif.h" #include "protocolif.h" #include "statistics.h" class cIptvStreamer : public cThread, public cIptvStreamerStatistics { private: - cRingBufferLinear* ringBufferM; cCondWait sleepM; + cIptvDeviceIf* deviceM; unsigned char* packetBufferM; unsigned int packetBufferLenM; cIptvProtocolIf* protocolM; @@ -28,7 +28,7 @@ protected: virtual void Action(void); public: - cIptvStreamer(cRingBufferLinear* ringBufferP, unsigned int packetLenP); + cIptvStreamer(cIptvDeviceIf &deviceP, unsigned int packetLenP); virtual ~cIptvStreamer(); bool Set(const char* locationP, const int parameterP, const int indexP, cIptvProtocolIf* protocolP); bool Open(void);