From 5b1af5ba299db1eca7782d55ff5054a4c2ab848e Mon Sep 17 00:00:00 2001 From: Rolf Ahrenberg Date: Fri, 4 Apr 2014 00:56:00 +0300 Subject: [PATCH] Refactored the section filtering. --- HISTORY | 5 +++ common.c | 24 ------------- common.h | 1 - po/de_DE.po | 6 ++-- po/fi_FI.po | 6 ++-- satip.c | 2 +- sectionfilter.c | 89 +++++++++++++++++++++---------------------------- sectionfilter.h | 21 ++++-------- 8 files changed, 57 insertions(+), 97 deletions(-) diff --git a/HISTORY b/HISTORY index 7c34a51..ee412cf 100644 --- a/HISTORY +++ b/HISTORY @@ -33,3 +33,8 @@ VDR Plugin 'satip' Revision History - Added a check to write new sections only if there is no data in the read socket. - Fixed keepalive heartbeat again. + +2014-04-05: Version 0.2.2 + +- Fixed the default keepalive interval. +- Refactored the section filtering. diff --git a/common.c b/common.c index bd5ff70..41dc4c2 100644 --- a/common.c +++ b/common.c @@ -57,30 +57,6 @@ char *StripTags(char *strP) return NULL; } -int select_single_desc(int descriptorP, const int msP, const bool selectWriteP) -{ - // Wait for data - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = msP * 1000L; - // Use select - fd_set infd; - fd_set outfd; - fd_set errfd; - FD_ZERO(&infd); - FD_ZERO(&outfd); - FD_ZERO(&errfd); - FD_SET(descriptorP, &errfd); - if (selectWriteP) - FD_SET(descriptorP, &outfd); - else - FD_SET(descriptorP, &infd); - int retval = select(descriptorP + 1, &infd, &outfd, &errfd, &tv); - // Check if error - ERROR_IF_RET(retval < 0, "select()", return retval); - return retval; -} - cString ChangeCase(const cString &strP, bool upperP) { cString res(strP); diff --git a/common.h b/common.h index 50cae19..7c51e91 100644 --- a/common.h +++ b/common.h @@ -93,7 +93,6 @@ uint16_t ts_pid(const uint8_t *bufP); uint8_t payload(const uint8_t *bufP); const char *id_pid(const u_short pidP); char *StripTags(char *strP); -int select_single_desc(int descriptorP, const int msP, const bool selectWriteP); cString ChangeCase(const cString &strP, bool upperP); struct section_filter_table_type { diff --git a/po/de_DE.po b/po/de_DE.po index 3f9adff..17ce275 100644 --- a/po/de_DE.po +++ b/po/de_DE.po @@ -5,10 +5,10 @@ # msgid "" msgstr "" -"Project-Id-Version: vdr-satip 0.2.1\n" +"Project-Id-Version: vdr-satip 0.2.2\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2014-04-01 04:01+0200\n" -"PO-Revision-Date: 2014-04-01 04:01+0200\n" +"POT-Creation-Date: 2014-04-05 04:05+0200\n" +"PO-Revision-Date: 2014-04-05 04:05+0200\n" "Last-Translator: Frank Neumann \n" "Language-Team: German \n" "Language: de\n" diff --git a/po/fi_FI.po b/po/fi_FI.po index 4826776..6a0f595 100644 --- a/po/fi_FI.po +++ b/po/fi_FI.po @@ -5,10 +5,10 @@ # msgid "" msgstr "" -"Project-Id-Version: vdr-satip 0.2.1\n" +"Project-Id-Version: vdr-satip 0.2.2\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2014-04-01 04:01+0200\n" -"PO-Revision-Date: 2014-04-01 04:01+0200\n" +"POT-Creation-Date: 2014-04-05 04:05+0200\n" +"PO-Revision-Date: 2014-04-05 04:05+0200\n" "Last-Translator: Rolf Ahrenberg\n" "Language-Team: Finnish \n" "Language: fi\n" diff --git a/satip.c b/satip.c index 1764293..0933c1d 100644 --- a/satip.c +++ b/satip.c @@ -21,7 +21,7 @@ #define GITVERSION "" #endif - const char VERSION[] = "0.2.1" GITVERSION; + const char VERSION[] = "0.2.2" GITVERSION; static const char DESCRIPTION[] = trNOOP("SAT>IP Devices"); class cPluginSatip : public cPlugin { diff --git a/sectionfilter.c b/sectionfilter.c index 5c51631..6b3ae00 100644 --- a/sectionfilter.c +++ b/sectionfilter.c @@ -17,6 +17,7 @@ cSatipSectionFilter::cSatipSectionFilter(int deviceIndexP, uint16_t pidP, uint8_ secLenM(0), tsFeedpM(0), pidM(pidP), + ringBufferM(new cRingBufferFrame(eDmxMaxSectionCount * eDmxMaxSectionSize)), deviceIndexM(deviceIndexP) { //debug("cSatipSectionFilter::%s(%d, %d)", __FUNCTION__, deviceIndexM, pidM); @@ -48,7 +49,7 @@ cSatipSectionFilter::cSatipSectionFilter(int deviceIndexP, uint16_t pidP, uint8_ // Create sockets socketM[0] = socketM[1] = -1; - if (socketpair(AF_UNIX, SOCK_DGRAM, 0, socketM) != 0) { + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, socketM) != 0) { char tmp[64]; error("Opening section filter sockets failed (device=%d pid=%d): %s", deviceIndexM, pidM, strerror_r(errno, tmp, sizeof(tmp))); } @@ -70,6 +71,7 @@ cSatipSectionFilter::~cSatipSectionFilter() if (tmp >= 0) close(tmp); secBufM = NULL; + DELETENULL(ringBufferM); } inline uint16_t cSatipSectionFilter::GetLength(const uint8_t *dataP) @@ -99,21 +101,8 @@ int cSatipSectionFilter::Filter(void) if (doneqM && !neq) return 0; - // There is no data in the read socket, more can be written - if ((secLenM > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) { - for (i = 0; i < eWriteMaxRetries; ++i) { - if (select_single_desc(socketM[0], 10, false)) - continue; - ssize_t len = write(socketM[1], secBufM, secLenM); - ERROR_IF(len < 0, "write()"); - // Update statistics - if (len >= 0) - AddSectionStatistic(len, 1); - break; - } - if (i >= eWriteMaxRetries) - debug("Skipped section write (%d bytes)", secLenM); - } + if (ringBufferM && (secLenM > 0)) + ringBufferM->Put(new cFrame(secBufM, secLenM)); } return 0; } @@ -219,13 +208,31 @@ void cSatipSectionFilter::Process(const uint8_t* dataP) } } +bool cSatipSectionFilter::Send(void) +{ + bool result = false; + cFrame *section = ringBufferM->Get(); + if (section) { + uchar *data = section->Data(); + int count = section->Count(); + if (data && (count > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) { + ssize_t len = send(socketM[1], data, count, MSG_EOR); + ERROR_IF(len < 0 && errno != EAGAIN, "send()"); + if (len > 0) { + ringBufferM->Drop(section); + // Update statistics + AddSectionStatistic(len, 1); + result = true; + } + } + } + return result; +} + cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP) -: -#ifdef USE_THREADED_SECTIONFILTER - cThread("SAT>IP section handler", true), +: cThread("SAT>IP section handler"), ringBufferM(new cRingBufferLinear(bufferLenP, TS_SIZE, false, *cString::sprintf("SAT>IP SECTION HANDLER %d", deviceIndexP))), -#endif mutexM(), deviceIndexM(deviceIndexP) { @@ -234,7 +241,6 @@ cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigne // Initialize filter pointers memset(filtersM, 0, sizeof(filtersM)); -#ifdef USE_THREADED_SECTIONFILTER // Create input buffer if (ringBufferM) { ringBufferM->SetTimeouts(100, 100); @@ -242,19 +248,17 @@ cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigne } else error("Failed to allocate buffer for section filter handler (device=%d): ", deviceIndexM); + Start(); -#endif } cSatipSectionFilterHandler::~cSatipSectionFilterHandler() { debug("cSatipSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); -#ifdef USE_THREADED_SECTIONFILTER // Stop thread if (Running()) Cancel(3); DELETE_POINTER(ringBufferM); -#endif // Destroy all filters cMutexLock MutexLock(&mutexM); @@ -262,13 +266,22 @@ cSatipSectionFilterHandler::~cSatipSectionFilterHandler() Delete(i); } -#ifdef USE_THREADED_SECTIONFILTER void cSatipSectionFilterHandler::Action(void) { debug("cSatipSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM); bool processed = false; // Do the thread loop while (Running()) { + // Send demuxed section packets through all filters + bool retry = false; + mutexM.Lock(); + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (filtersM[i] && filtersM[i]->Send()) + retry = true; + } + mutexM.Unlock(); + if (retry) + continue; // Read one TS packet if (ringBufferM) { int len = 0; @@ -304,7 +317,6 @@ void cSatipSectionFilterHandler::Action(void) } debug("cSatipSectionFilterHandler::%s(%d): exiting", __FUNCTION__, deviceIndexM); } -#endif cString cSatipSectionFilterHandler::GetInformation(void) { @@ -405,35 +417,10 @@ int cSatipSectionFilterHandler::GetPid(int handleP) void cSatipSectionFilterHandler::Write(uchar *bufferP, int lengthP) { //debug("cSatipSectionFilterHandler::%s(%d): length=%d", __FUNCTION__, deviceIndexM, lengthP); -#ifdef USE_THREADED_SECTIONFILTER // Fill up the buffer if (ringBufferM) { int len = ringBufferM->Put(bufferP, lengthP); if (len != lengthP) ringBufferM->ReportOverflow(lengthP - len); } -#else - // Lock - cMutexLock MutexLock(&mutexM); - uchar *p = bufferP; - int len = lengthP; - // Process TS packets through all filters - while (p && (len >= TS_SIZE)) { - if (*p != TS_SYNC_BYTE) { - for (int i = 1; i < len; ++i) { - if (p[i] == TS_SYNC_BYTE) { - p += i; - len -= i; - break; - } - } - } - for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { - if (filtersM[i]) - filtersM[i]->Process(p); - } - p += TS_SIZE; - len -= TS_SIZE; - } -#endif } diff --git a/sectionfilter.h b/sectionfilter.h index 7ebb116..e25c897 100644 --- a/sectionfilter.h +++ b/sectionfilter.h @@ -8,21 +8,16 @@ #ifndef __SATIP_SECTIONFILTER_H #define __SATIP_SECTIONFILTER_H -#ifdef __FreeBSD__ -#include -#endif // __FreeBSD__ #include #include "common.h" #include "statistics.h" -#define USE_THREADED_SECTIONFILTER - class cSatipSectionFilter : public cSatipSectionStatistics { private: enum { - eWriteMaxRetries = 20, eDmxMaxFilterSize = 18, + eDmxMaxSectionCount = 64, eDmxMaxSectionSize = 4096, eDmxMaxSectionFeedSize = (eDmxMaxSectionSize + TS_SIZE) }; @@ -38,6 +33,7 @@ private: uint16_t tsFeedpM; uint16_t pidM; + cRingBufferFrame *ringBufferM; int deviceIndexM; int socketM[2]; @@ -59,23 +55,17 @@ public: cSatipSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP); virtual ~cSatipSectionFilter(); void Process(const uint8_t* dataP); + bool Send(void); int GetFd(void) { return socketM[0]; } uint16_t GetPid(void) const { return pidM; } }; -#ifdef USE_THREADED_SECTIONFILTER class cSatipSectionFilterHandler : public cThread { -protected: - virtual void Action(void); -private: - cRingBufferLinear *ringBufferM; -#else -class cSatipSectionFilterHandler { -#endif private: enum { eMaxSecFilterCount = 32 }; + cRingBufferLinear *ringBufferM; cMutex mutexM; int deviceIndexM; cSatipSectionFilter *filtersM[eMaxSecFilterCount]; @@ -83,6 +73,9 @@ private: bool Delete(unsigned int indexP); bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const; +protected: + virtual void Action(void); + public: cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP); virtual ~cSatipSectionFilterHandler();