diff --git a/sectionfilter.c b/sectionfilter.c index 1d17369..026efbd 100644 --- a/sectionfilter.c +++ b/sectionfilter.c @@ -212,27 +212,28 @@ void cSatipSectionFilter::Process(const uint8_t* dataP) } } -bool cSatipSectionFilter::Send(void) +void cSatipSectionFilter::Send(void) { - bool result = false; cFrame *section = ringBufferM->Get(); if (section) { uchar *data = section->Data(); int count = section->Count(); if (data && (count > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) { - ssize_t len = send(socketM[1], data, count, MSG_EOR); - ERROR_IF(len < 0 && errno != EAGAIN, "send()"); - if (len > 0) { - ringBufferM->Drop(section); - result = !!ringBufferM->Available(); + if (send(socketM[1], data, count, MSG_EOR) > 0) { // Update statistics - AddSectionStatistic(len, 1); + AddSectionStatistic(count, 1); } + else if (errno != EAGAIN) + error("failed to send section data (%i bytes) [device=%d]", count, deviceIndexM); } + ringBufferM->Drop(section); } - return result; } +int cSatipSectionFilter::Available(void) const +{ + return ringBufferM->Available(); +} cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP) : cThread(cString::sprintf("SATIP#%d section handler", deviceIndexP)), @@ -270,54 +271,68 @@ cSatipSectionFilterHandler::~cSatipSectionFilterHandler() Delete(i); } +void cSatipSectionFilterHandler::SendAll(void) +{ + while (true) { + // zero polling structures + memset(pollFdsM, 0, sizeof(pollFdsM)); + + // assemble all handlers to poll + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (filtersM[i] && filtersM[i]->Available() != 0) { + pollFdsM[i].fd = filtersM[i]->GetFd(); + pollFdsM[i].events = POLLOUT; + } + } + + // anyone ready for writing + if (poll(pollFdsM, eMaxSecFilterCount, eSecFilterSendTimeoutMs) <= 0) + return; + + // send data + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (pollFdsM[i].revents & POLLOUT) + filtersM[i]->Send(); + } + } +} + void cSatipSectionFilterHandler::Action(void) { debug1("%s Entering [device %d]", __PRETTY_FUNCTION__, deviceIndexM); - bool processed = false; // Do the thread loop + uchar *p = NULL; while (Running()) { - // Send demuxed section packets through all filters - bool retry = false; - mutexM.Lock(); - for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { - if (filtersM[i] && filtersM[i]->Send()) - retry = true; - } - mutexM.Unlock(); - if (retry) - continue; - // Read one TS packet - if (ringBufferM) { - int len = 0; - if (processed) { - ringBufferM->Del(TS_SIZE); - processed = false; - } - uchar *p = ringBufferM->Get(len); - if (p && (len >= TS_SIZE)) { - if (*p != TS_SYNC_BYTE) { - for (int i = 1; i < len; ++i) { - if (p[i] == TS_SYNC_BYTE) { - len = i; - break; + int len = 0; + // Process all pending TS packets + while ((p = ringBufferM->Get(len)) != NULL) { + if (p && (len >= TS_SIZE)) { + if (*p != TS_SYNC_BYTE) { + for (int i = 1; i < len; ++i) { + if (p[i] == TS_SYNC_BYTE) { + len = i; + break; + } } - } - ringBufferM->Del(len); - debug1("%s Skipped %d bytes to sync on TS packet [device %d]", __PRETTY_FUNCTION__, len, deviceIndexM); - continue; - } - // Process TS packet through all filters - mutexM.Lock(); - for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { - if (filtersM[i]) - filtersM[i]->Process(p); - } - mutexM.Unlock(); - processed = true; - continue; - } - } - cCondWait::SleepMs(10); // to avoid busy loop and reduce cpu load + ringBufferM->Del(len); + debug1("%s Skipped %d bytes to sync on TS packet [device %d]", __PRETTY_FUNCTION__, len, deviceIndexM); + continue; + } + // Process TS packet through all filters + mutexM.Lock(); + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (filtersM[i]) + filtersM[i]->Process(p); + } + mutexM.Unlock(); + ringBufferM->Del(TS_SIZE); + } + } + + // Send demuxed section packets through all filters + mutexM.Lock(); + SendAll(); + mutexM.Unlock(); } debug1("%s Exiting [device %d]", __PRETTY_FUNCTION__, deviceIndexM); } diff --git a/sectionfilter.h b/sectionfilter.h index 833511c..e5573d3 100644 --- a/sectionfilter.h +++ b/sectionfilter.h @@ -8,6 +8,7 @@ #ifndef __SATIP_SECTIONFILTER_H #define __SATIP_SECTIONFILTER_H +#include #include #include "common.h" @@ -55,23 +56,27 @@ public: cSatipSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP); virtual ~cSatipSectionFilter(); void Process(const uint8_t* dataP); - bool Send(void); + void Send(void); int GetFd(void) { return socketM[0]; } uint16_t GetPid(void) const { return pidM; } + int Available(void) const; }; class cSatipSectionFilterHandler : public cThread { private: enum { - eMaxSecFilterCount = 32 + eMaxSecFilterCount = 32, + eSecFilterSendTimeoutMs = 10 }; cRingBufferLinear *ringBufferM; cMutex mutexM; int deviceIndexM; cSatipSectionFilter *filtersM[eMaxSecFilterCount]; + struct pollfd pollFdsM[eMaxSecFilterCount]; bool Delete(unsigned int indexP); bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const; + void SendAll(void); protected: virtual void Action(void);