From 7e6b722747bc969d6e11a45985a728aa993d3fae Mon Sep 17 00:00:00 2001 From: Alexander Pipelka Date: Sun, 24 Sep 2017 20:29:51 +0200 Subject: [PATCH 1/7] Added transfer timeout for sectionfilter data. --- sectionfilter.c | 117 +++++++++++++++++++++++++++--------------------- sectionfilter.h | 6 ++- 2 files changed, 71 insertions(+), 52 deletions(-) diff --git a/sectionfilter.c b/sectionfilter.c index 1d17369..a77643d 100644 --- a/sectionfilter.c +++ b/sectionfilter.c @@ -212,27 +212,28 @@ void cSatipSectionFilter::Process(const uint8_t* dataP) } } -bool cSatipSectionFilter::Send(void) +void cSatipSectionFilter::Send(void) { - bool result = false; cFrame *section = ringBufferM->Get(); if (section) { uchar *data = section->Data(); int count = section->Count(); if (data && (count > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) { - ssize_t len = send(socketM[1], data, count, MSG_EOR); - ERROR_IF(len < 0 && errno != EAGAIN, "send()"); - if (len > 0) { - ringBufferM->Drop(section); - result = !!ringBufferM->Available(); + if (send(socketM[1], data, count, MSG_EOR) > 0) { // Update statistics - AddSectionStatistic(len, 1); + AddSectionStatistic(count, 1); } + else + esyslog("failed to send section data (%i bytes) to fd: %i (errno: %i)", count, socketM[1], errno); } + ringBufferM->Drop(section); } - return result; } +int cSatipSectionFilter::Available(void) const +{ + return ringBufferM->Available(); +} cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP) : cThread(cString::sprintf("SATIP#%d section handler", deviceIndexP)), @@ -270,54 +271,68 @@ cSatipSectionFilterHandler::~cSatipSectionFilterHandler() Delete(i); } +bool cSatipSectionFilterHandler::Send(void) +{ + // zero polling structures + memset(pollFdsM, 0, sizeof(pollFdsM)); + + // assemble all handlers to poll + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (filtersM[i] && filtersM[i]->Available() != 0) { + pollFdsM[i].fd = filtersM[i]->GetFd(); + pollFdsM[i].events = POLLOUT; + } + } + + // anyone ready for writing + if (poll(pollFdsM, eMaxSecFilterCount, 10) <= 0) + return false; + + // send data + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if(pollFdsM[i].revents & POLLOUT) + filtersM[i]->Send(); + } + return true; +} + void cSatipSectionFilterHandler::Action(void) { debug1("%s Entering [device %d]", __PRETTY_FUNCTION__, deviceIndexM); - bool processed = false; // Do the thread loop + uchar *p = NULL; while (Running()) { - // Send demuxed section packets through all filters - bool retry = false; - mutexM.Lock(); - for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { - if (filtersM[i] && filtersM[i]->Send()) - retry = true; - } - mutexM.Unlock(); - if (retry) - continue; - // Read one TS packet - if (ringBufferM) { - int len = 0; - if (processed) { - ringBufferM->Del(TS_SIZE); - processed = false; - } - uchar *p = ringBufferM->Get(len); - if (p && (len >= TS_SIZE)) { - if (*p != TS_SYNC_BYTE) { - for (int i = 1; i < len; ++i) { - if (p[i] == TS_SYNC_BYTE) { - len = i; - break; + int len = 0; + // Process all pending TS packets + while ((p = ringBufferM->Get(len)) != NULL) { + if (p && (len >= TS_SIZE)) { + if (*p != TS_SYNC_BYTE) { + for (int i = 1; i < len; ++i) { + if (p[i] == TS_SYNC_BYTE) { + len = i; + break; + } } - } - ringBufferM->Del(len); - debug1("%s Skipped %d bytes to sync on TS packet [device %d]", __PRETTY_FUNCTION__, len, deviceIndexM); - continue; - } - // Process TS packet through all filters - mutexM.Lock(); - for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { - if (filtersM[i]) - filtersM[i]->Process(p); - } - mutexM.Unlock(); - processed = true; - continue; - } - } - cCondWait::SleepMs(10); // to avoid busy loop and reduce cpu load + ringBufferM->Del(len); + debug1("%s Skipped %d bytes to sync on TS packet [device %d]", __PRETTY_FUNCTION__, len, deviceIndexM); + continue; + } + // Process TS packet through all filters + mutexM.Lock(); + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (filtersM[i]) + filtersM[i]->Process(p); + } + mutexM.Unlock(); + ringBufferM->Del(TS_SIZE); + } + } + + // Send demuxed section packets through all filters + mutexM.Lock(); + while (Send()) ; + mutexM.Unlock(); + //cCondWait::SleepMs(10); // to avoid busy loop and reduce cpu load } debug1("%s Exiting [device %d]", __PRETTY_FUNCTION__, deviceIndexM); } diff --git a/sectionfilter.h b/sectionfilter.h index 833511c..4d3195d 100644 --- a/sectionfilter.h +++ b/sectionfilter.h @@ -8,6 +8,7 @@ #ifndef __SATIP_SECTIONFILTER_H #define __SATIP_SECTIONFILTER_H +#include #include #include "common.h" @@ -55,9 +56,10 @@ public: cSatipSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP); virtual ~cSatipSectionFilter(); void Process(const uint8_t* dataP); - bool Send(void); + void Send(void); int GetFd(void) { return socketM[0]; } uint16_t GetPid(void) const { return pidM; } + int Available(void) const; }; class cSatipSectionFilterHandler : public cThread { @@ -69,9 +71,11 @@ private: cMutex mutexM; int deviceIndexM; cSatipSectionFilter *filtersM[eMaxSecFilterCount]; + struct pollfd pollFdsM[eMaxSecFilterCount]; bool Delete(unsigned int indexP); bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const; + bool Send(void); protected: virtual void Action(void); From 27e86dd3ea2f709ed2c2ef84e84199e680e94c73 Mon Sep 17 00:00:00 2001 From: Alexander Pipelka Date: Mon, 25 Sep 2017 20:32:51 +0200 Subject: [PATCH 2/7] Removed dead sleep code (comment) in cSatipSectionFilterHandler::Action(). --- sectionfilter.c | 1 - 1 file changed, 1 deletion(-) diff --git a/sectionfilter.c b/sectionfilter.c index a77643d..89e6a9b 100644 --- a/sectionfilter.c +++ b/sectionfilter.c @@ -332,7 +332,6 @@ void cSatipSectionFilterHandler::Action(void) mutexM.Lock(); while (Send()) ; mutexM.Unlock(); - //cCondWait::SleepMs(10); // to avoid busy loop and reduce cpu load } debug1("%s Exiting [device %d]", __PRETTY_FUNCTION__, deviceIndexM); } From e90926d5f61fee4fa3d1e6c2613ee2266795b531 Mon Sep 17 00:00:00 2001 From: Alexander Pipelka Date: Mon, 25 Sep 2017 20:34:38 +0200 Subject: [PATCH 3/7] Minor coding-style fix. --- sectionfilter.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sectionfilter.c b/sectionfilter.c index 89e6a9b..7b94a52 100644 --- a/sectionfilter.c +++ b/sectionfilter.c @@ -290,7 +290,7 @@ bool cSatipSectionFilterHandler::Send(void) // send data for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { - if(pollFdsM[i].revents & POLLOUT) + if (pollFdsM[i].revents & POLLOUT) filtersM[i]->Send(); } return true; From c4c2ba8d14ab7d6df91f494f5d592711f789d87e Mon Sep 17 00:00:00 2001 From: Alexander Pipelka Date: Mon, 25 Sep 2017 20:37:57 +0200 Subject: [PATCH 4/7] Defined section filter send timeout. --- sectionfilter.c | 2 +- sectionfilter.h | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sectionfilter.c b/sectionfilter.c index 7b94a52..8aa1e39 100644 --- a/sectionfilter.c +++ b/sectionfilter.c @@ -285,7 +285,7 @@ bool cSatipSectionFilterHandler::Send(void) } // anyone ready for writing - if (poll(pollFdsM, eMaxSecFilterCount, 10) <= 0) + if (poll(pollFdsM, eMaxSecFilterCount, eSecFilterSendTimeoutMs) <= 0) return false; // send data diff --git a/sectionfilter.h b/sectionfilter.h index 4d3195d..750538a 100644 --- a/sectionfilter.h +++ b/sectionfilter.h @@ -65,7 +65,8 @@ public: class cSatipSectionFilterHandler : public cThread { private: enum { - eMaxSecFilterCount = 32 + eMaxSecFilterCount = 32, + eSecFilterSendTimeoutMs = 10 }; cRingBufferLinear *ringBufferM; cMutex mutexM; From 5159508f2d78ef22fceddd45282138ffaf607a5e Mon Sep 17 00:00:00 2001 From: Alexander Pipelka Date: Mon, 25 Sep 2017 20:48:20 +0200 Subject: [PATCH 5/7] Use error() to display error messages. --- sectionfilter.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sectionfilter.c b/sectionfilter.c index 8aa1e39..3936372 100644 --- a/sectionfilter.c +++ b/sectionfilter.c @@ -223,8 +223,8 @@ void cSatipSectionFilter::Send(void) // Update statistics AddSectionStatistic(count, 1); } - else - esyslog("failed to send section data (%i bytes) to fd: %i (errno: %i)", count, socketM[1], errno); + else if (errno != EAGAIN) + error("failed to send section data (%i bytes) [device=%d]", count, deviceIndexM); } ringBufferM->Drop(section); } From ece52576dd3b437f177f7b0478df8c26e5e9f680 Mon Sep 17 00:00:00 2001 From: Alexander Pipelka Date: Tue, 26 Sep 2017 18:23:34 +0200 Subject: [PATCH 6/7] Changed include header. --- sectionfilter.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sectionfilter.h b/sectionfilter.h index 750538a..c0ea1d6 100644 --- a/sectionfilter.h +++ b/sectionfilter.h @@ -8,7 +8,7 @@ #ifndef __SATIP_SECTIONFILTER_H #define __SATIP_SECTIONFILTER_H -#include +#include #include #include "common.h" From 46a197d8f85b99cebc0145925cd760fc2fbd4692 Mon Sep 17 00:00:00 2001 From: Alexander Pipelka Date: Thu, 28 Sep 2017 10:25:16 +0200 Subject: [PATCH 7/7] Moved send loop one level down. --- sectionfilter.c | 41 +++++++++++++++++++++-------------------- sectionfilter.h | 2 +- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/sectionfilter.c b/sectionfilter.c index 3936372..026efbd 100644 --- a/sectionfilter.c +++ b/sectionfilter.c @@ -271,29 +271,30 @@ cSatipSectionFilterHandler::~cSatipSectionFilterHandler() Delete(i); } -bool cSatipSectionFilterHandler::Send(void) +void cSatipSectionFilterHandler::SendAll(void) { - // zero polling structures - memset(pollFdsM, 0, sizeof(pollFdsM)); + while (true) { + // zero polling structures + memset(pollFdsM, 0, sizeof(pollFdsM)); - // assemble all handlers to poll - for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { - if (filtersM[i] && filtersM[i]->Available() != 0) { - pollFdsM[i].fd = filtersM[i]->GetFd(); - pollFdsM[i].events = POLLOUT; - } - } + // assemble all handlers to poll + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (filtersM[i] && filtersM[i]->Available() != 0) { + pollFdsM[i].fd = filtersM[i]->GetFd(); + pollFdsM[i].events = POLLOUT; + } + } - // anyone ready for writing - if (poll(pollFdsM, eMaxSecFilterCount, eSecFilterSendTimeoutMs) <= 0) - return false; + // anyone ready for writing + if (poll(pollFdsM, eMaxSecFilterCount, eSecFilterSendTimeoutMs) <= 0) + return; - // send data - for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { - if (pollFdsM[i].revents & POLLOUT) - filtersM[i]->Send(); - } - return true; + // send data + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (pollFdsM[i].revents & POLLOUT) + filtersM[i]->Send(); + } + } } void cSatipSectionFilterHandler::Action(void) @@ -330,7 +331,7 @@ void cSatipSectionFilterHandler::Action(void) // Send demuxed section packets through all filters mutexM.Lock(); - while (Send()) ; + SendAll(); mutexM.Unlock(); } debug1("%s Exiting [device %d]", __PRETTY_FUNCTION__, deviceIndexM); diff --git a/sectionfilter.h b/sectionfilter.h index c0ea1d6..e5573d3 100644 --- a/sectionfilter.h +++ b/sectionfilter.h @@ -76,7 +76,7 @@ private: bool Delete(unsigned int indexP); bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const; - bool Send(void); + void SendAll(void); protected: virtual void Action(void);