From 7b58d9f28ffe19d28c7cce258c05e33099be3934 Mon Sep 17 00:00:00 2001 From: Rolf Ahrenberg Date: Sun, 30 Mar 2014 22:20:56 +0300 Subject: [PATCH] Added a check to write new sections only if there is no data in the read socket. --- HISTORY | 2 ++ common.c | 24 +++++++++++++++ common.h | 1 + sectionfilter.c | 81 ++++++++++++++++++++++++++++++++++--------------- sectionfilter.h | 23 ++++++++------ 5 files changed, 98 insertions(+), 33 deletions(-) diff --git a/HISTORY b/HISTORY index f9867f4..0bb60df 100644 --- a/HISTORY +++ b/HISTORY @@ -30,3 +30,5 @@ VDR Plugin 'satip' Revision History - Changed implementation to report about RTP packet errors on 5 minutes interval only. +- Added a check to write new sections only if there + is no data in the read socket. diff --git a/common.c b/common.c index 41dc4c2..bd5ff70 100644 --- a/common.c +++ b/common.c @@ -57,6 +57,30 @@ char *StripTags(char *strP) return NULL; } +int select_single_desc(int descriptorP, const int msP, const bool selectWriteP) +{ + // Wait for data + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = msP * 1000L; + // Use select + fd_set infd; + fd_set outfd; + fd_set errfd; + FD_ZERO(&infd); + FD_ZERO(&outfd); + FD_ZERO(&errfd); + FD_SET(descriptorP, &errfd); + if (selectWriteP) + FD_SET(descriptorP, &outfd); + else + FD_SET(descriptorP, &infd); + int retval = select(descriptorP + 1, &infd, &outfd, &errfd, &tv); + // Check if error + ERROR_IF_RET(retval < 0, "select()", return retval); + return retval; +} + cString ChangeCase(const cString &strP, bool upperP) { cString res(strP); diff --git a/common.h b/common.h index 7c51e91..50cae19 100644 --- a/common.h +++ b/common.h @@ -93,6 +93,7 @@ uint16_t ts_pid(const uint8_t *bufP); uint8_t payload(const uint8_t *bufP); const char *id_pid(const u_short pidP); char *StripTags(char *strP); +int select_single_desc(int descriptorP, const int msP, const bool selectWriteP); cString ChangeCase(const cString &strP, bool upperP); struct section_filter_table_type { diff --git a/sectionfilter.c b/sectionfilter.c index 047d9e3..5c51631 100644 --- a/sectionfilter.c +++ b/sectionfilter.c @@ -100,11 +100,19 @@ int cSatipSectionFilter::Filter(void) return 0; // There is no data in the read socket, more can be written - if ((socketM[0] >= 0) && (socketM[1] >= 0) /*&& !select_single_desc(socketM[0], 0, false)*/) { - ssize_t len = write(socketM[1], secBufM, secLenM); - ERROR_IF(len < 0, "write()"); - // Update statistics - AddSectionStatistic(len, 1); + if ((secLenM > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) { + for (i = 0; i < eWriteMaxRetries; ++i) { + if (select_single_desc(socketM[0], 10, false)) + continue; + ssize_t len = write(socketM[1], secBufM, secLenM); + ERROR_IF(len < 0, "write()"); + // Update statistics + if (len >= 0) + AddSectionStatistic(len, 1); + break; + } + if (i >= eWriteMaxRetries) + debug("Skipped section write (%d bytes)", secLenM); } } return 0; @@ -213,17 +221,20 @@ void cSatipSectionFilter::Process(const uint8_t* dataP) cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP) -: cThread("SAT>IP section handler", true), +: +#ifdef USE_THREADED_SECTIONFILTER + cThread("SAT>IP section handler", true), + ringBufferM(new cRingBufferLinear(bufferLenP, TS_SIZE, false, *cString::sprintf("SAT>IP SECTION HANDLER %d", deviceIndexP))), +#endif mutexM(), - deviceIndexM(deviceIndexP), - processedM(false), - ringBufferM(new cRingBufferLinear(bufferLenP, TS_SIZE, false, *cString::sprintf("SAT>IP SECTION HANDLER %d", deviceIndexP))) + deviceIndexM(deviceIndexP) { debug("cSatipSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); // Initialize filter pointers memset(filtersM, 0, sizeof(filtersM)); +#ifdef USE_THREADED_SECTIONFILTER // Create input buffer if (ringBufferM) { ringBufferM->SetTimeouts(100, 100); @@ -231,16 +242,19 @@ cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigne } else error("Failed to allocate buffer for section filter handler (device=%d): ", deviceIndexM); - Start(); +#endif } cSatipSectionFilterHandler::~cSatipSectionFilterHandler() { debug("cSatipSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); - Stop(); - +#ifdef USE_THREADED_SECTIONFILTER + // Stop thread + if (Running()) + Cancel(3); DELETE_POINTER(ringBufferM); +#endif // Destroy all filters cMutexLock MutexLock(&mutexM); @@ -248,26 +262,19 @@ cSatipSectionFilterHandler::~cSatipSectionFilterHandler() Delete(i); } -bool cSatipSectionFilterHandler::Stop(void) -{ - debug("cSatipSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM); - // Stop thread - if (Running()) - Cancel(3); - return true; -} - +#ifdef USE_THREADED_SECTIONFILTER void cSatipSectionFilterHandler::Action(void) { debug("cSatipSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM); + bool processed = false; // Do the thread loop while (Running()) { // Read one TS packet if (ringBufferM) { int len = 0; - if (processedM) { + if (processed) { ringBufferM->Del(TS_SIZE); - processedM = false; + processed = false; } uchar *p = ringBufferM->Get(len); if (p && (len >= TS_SIZE)) { @@ -289,7 +296,7 @@ void cSatipSectionFilterHandler::Action(void) filtersM[i]->Process(p); } mutexM.Unlock(); - processedM = true; + processed = true; continue; } } @@ -297,6 +304,7 @@ void cSatipSectionFilterHandler::Action(void) } debug("cSatipSectionFilterHandler::%s(%d): exiting", __FUNCTION__, deviceIndexM); } +#endif cString cSatipSectionFilterHandler::GetInformation(void) { @@ -397,10 +405,35 @@ int cSatipSectionFilterHandler::GetPid(int handleP) void cSatipSectionFilterHandler::Write(uchar *bufferP, int lengthP) { //debug("cSatipSectionFilterHandler::%s(%d): length=%d", __FUNCTION__, deviceIndexM, lengthP); +#ifdef USE_THREADED_SECTIONFILTER // Fill up the buffer if (ringBufferM) { int len = ringBufferM->Put(bufferP, lengthP); if (len != lengthP) ringBufferM->ReportOverflow(lengthP - len); } +#else + // Lock + cMutexLock MutexLock(&mutexM); + uchar *p = bufferP; + int len = lengthP; + // Process TS packets through all filters + while (p && (len >= TS_SIZE)) { + if (*p != TS_SYNC_BYTE) { + for (int i = 1; i < len; ++i) { + if (p[i] == TS_SYNC_BYTE) { + p += i; + len -= i; + break; + } + } + } + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (filtersM[i]) + filtersM[i]->Process(p); + } + p += TS_SIZE; + len -= TS_SIZE; + } +#endif } diff --git a/sectionfilter.h b/sectionfilter.h index 1ae6dd8..7ebb116 100644 --- a/sectionfilter.h +++ b/sectionfilter.h @@ -16,11 +16,14 @@ #include "common.h" #include "statistics.h" +#define USE_THREADED_SECTIONFILTER + class cSatipSectionFilter : public cSatipSectionStatistics { private: - enum dmx_limits { - eDmxMaxFilterSize = 18, - eDmxMaxSectionSize = 4096, + enum { + eWriteMaxRetries = 20, + eDmxMaxFilterSize = 18, + eDmxMaxSectionSize = 4096, eDmxMaxSectionFeedSize = (eDmxMaxSectionSize + TS_SIZE) }; @@ -60,27 +63,29 @@ public: uint16_t GetPid(void) const { return pidM; } }; +#ifdef USE_THREADED_SECTIONFILTER class cSatipSectionFilterHandler : public cThread { +protected: + virtual void Action(void); +private: + cRingBufferLinear *ringBufferM; +#else +class cSatipSectionFilterHandler { +#endif private: enum { eMaxSecFilterCount = 32 }; cMutex mutexM; int deviceIndexM; - bool processedM; - cRingBufferLinear *ringBufferM; cSatipSectionFilter *filtersM[eMaxSecFilterCount]; bool Delete(unsigned int indexP); bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const; -protected: - virtual void Action(void); - public: cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP); virtual ~cSatipSectionFilterHandler(); - bool Stop(void); cString GetInformation(void); int Open(u_short pidP, u_char tidP, u_char maskP); void Close(int handleP);