diff --git a/sectionfilter.c b/sectionfilter.c index ac731ed..98a0fdf 100644 --- a/sectionfilter.c +++ b/sectionfilter.c @@ -17,6 +17,7 @@ cIptvSectionFilter::cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t secLenM(0), tsFeedpM(0), pidM(pidP), + ringBufferM(new cRingBufferFrame(eDmxMaxSectionCount * eDmxMaxSectionSize)), deviceIndexM(deviceIndexP) { //debug("cIptvSectionFilter::%s(%d, %d)", __FUNCTION__, deviceIndexM, pidM); @@ -33,11 +34,11 @@ cIptvSectionFilter::cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t filterMaskM[0] = maskP; // Invert the filter - for (i = 0; i < DMX_MAX_FILTER_SIZE; ++i) + for (i = 0; i < eDmxMaxFilterSize; ++i) filterValueM[i] ^= 0xFF; uint8_t mask, mode, doneq = 0; - for (i = 0; i < DMX_MAX_FILTER_SIZE; ++i) { + for (i = 0; i < eDmxMaxFilterSize; ++i) { mode = filterModeM[i]; mask = filterMaskM[i]; maskAndModeM[i] = (uint8_t)(mask & mode); @@ -48,7 +49,7 @@ cIptvSectionFilter::cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t // Create sockets socketM[0] = socketM[1] = -1; - if (socketpair(AF_UNIX, SOCK_DGRAM, 0, socketM) != 0) { + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, socketM) != 0) { char tmp[64]; error("Opening section filter sockets failed (device=%d pid=%d): %s", deviceIndexM, pidM, strerror_r(errno, tmp, sizeof(tmp))); } @@ -70,6 +71,7 @@ cIptvSectionFilter::~cIptvSectionFilter() if (tmp >= 0) close(tmp); secBufM = NULL; + DELETENULL(ringBufferM); } inline uint16_t cIptvSectionFilter::GetLength(const uint8_t *dataP) @@ -89,7 +91,7 @@ int cIptvSectionFilter::Filter(void) int i; uint8_t neq = 0; - for (i = 0; i < DMX_MAX_FILTER_SIZE; ++i) { + for (i = 0; i < eDmxMaxFilterSize; ++i) { uint8_t calcxor = (uint8_t)(filterValueM[i] ^ secBufM[i]); if (maskAndModeM[i] & calcxor) return 0; @@ -99,13 +101,8 @@ int cIptvSectionFilter::Filter(void) if (doneqM && !neq) return 0; - // There is no data in the read socket, more can be written - if ((socketM[0] >= 0) && (socketM[1] >= 0) /*&& !select_single_desc(socketM[0], 0, false)*/) { - ssize_t len = write(socketM[1], secBufM, secLenM); - ERROR_IF(len < 0, "write()"); - // Update statistics - AddSectionStatistic(len, 1); - } + if (ringBufferM && (secLenM > 0)) + ringBufferM->Put(new cFrame(secBufM, secLenM)); } return 0; } @@ -122,11 +119,11 @@ int cIptvSectionFilter::CopyDump(const uint8_t *bufP, uint8_t lenP) { uint16_t limit, seclen, n; - if (tsFeedpM >= DMX_MAX_SECFEED_SIZE) + if (tsFeedpM >= eDmxMaxSectionFeedSize) return 0; - if (tsFeedpM + lenP > DMX_MAX_SECFEED_SIZE) - lenP = (uint8_t)(DMX_MAX_SECFEED_SIZE - tsFeedpM); + if (tsFeedpM + lenP > eDmxMaxSectionFeedSize) + lenP = (uint8_t)(eDmxMaxSectionFeedSize - tsFeedpM); if (lenP <= 0) return 0; @@ -135,7 +132,7 @@ int cIptvSectionFilter::CopyDump(const uint8_t *bufP, uint8_t lenP) tsFeedpM = uint16_t(tsFeedpM + lenP); limit = tsFeedpM; - if (limit > DMX_MAX_SECFEED_SIZE) + if (limit > eDmxMaxSectionFeedSize) return -1; // internal error should never happen // Always set secbuf @@ -143,7 +140,7 @@ int cIptvSectionFilter::CopyDump(const uint8_t *bufP, uint8_t lenP) for (n = 0; secBufpM + 2 < limit; ++n) { seclen = GetLength(secBufM); - if ((seclen <= 0) || (seclen > DMX_MAX_SECTION_SIZE) || ((seclen + secBufpM) > limit)) + if ((seclen <= 0) || (seclen > eDmxMaxSectionSize) || ((seclen + secBufpM) > limit)) return 0; secLenM = seclen; if (pusiSeenM) @@ -211,13 +208,33 @@ void cIptvSectionFilter::Process(const uint8_t* dataP) } } +bool cIptvSectionFilter::Send(void) +{ + bool result = false; + cFrame *section = ringBufferM->Get(); + if (section) { + uchar *data = section->Data(); + int count = section->Count(); + if (data && (count > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) { + ssize_t len = send(socketM[1], data, count, MSG_EOR); + ERROR_IF(len < 0 && errno != EAGAIN, "send()"); + if (len > 0) { + ringBufferM->Drop(section); + result = !!ringBufferM->Available(); + // Update statistics + AddSectionStatistic(len, 1); + } + } + } + return result; +} + cIptvSectionFilterHandler::cIptvSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP) -: cThread("IPTV section handler", true), +: cThread("IPTV section handler"), + ringBufferM(new cRingBufferLinear(bufferLenP, TS_SIZE, false, *cString::sprintf("IPTV SECTION HANDLER %d", deviceIndexP))), mutexM(), - deviceIndexM(deviceIndexP), - processedM(false), - ringBufferM(new cRingBufferLinear(bufferLenP, TS_SIZE, false, *cString::sprintf("IPTV SECTION HANDLER %d", deviceIndexP))) + deviceIndexM(deviceIndexP) { debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); @@ -238,8 +255,9 @@ cIptvSectionFilterHandler::cIptvSectionFilterHandler(int deviceIndexP, unsigned cIptvSectionFilterHandler::~cIptvSectionFilterHandler() { debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); - Stop(); - + // Stop thread + if (Running()) + Cancel(3); DELETE_POINTER(ringBufferM); // Destroy all filters @@ -248,26 +266,28 @@ cIptvSectionFilterHandler::~cIptvSectionFilterHandler() Delete(i); } -bool cIptvSectionFilterHandler::Stop(void) -{ - debug("cIptvSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM); - // Stop thread - if (Running()) - Cancel(3); - return true; -} - void cIptvSectionFilterHandler::Action(void) { debug("cIptvSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM); + bool processed = false; // Do the thread loop while (Running()) { + // Send demuxed section packets through all filters + bool retry = false; + mutexM.Lock(); + for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { + if (filtersM[i] && filtersM[i]->Send()) + retry = true; + } + mutexM.Unlock(); + if (retry) + continue; // Read one TS packet if (ringBufferM) { int len = 0; - if (processedM) { + if (processed) { ringBufferM->Del(TS_SIZE); - processedM = false; + processed = false; } uchar *p = ringBufferM->Get(len); if (p && (len >= TS_SIZE)) { @@ -289,7 +309,7 @@ void cIptvSectionFilterHandler::Action(void) filtersM[i]->Process(p); } mutexM.Unlock(); - processedM = true; + processed = true; continue; } } @@ -358,7 +378,7 @@ int cIptvSectionFilterHandler::Open(u_short pidP, u_char tidP, u_char maskP) for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { if (!filtersM[i]) { filtersM[i] = new cIptvSectionFilter(deviceIndexM, pidP, tidP, maskP); - debug("cIptvSectionFilterHandler::%s(%d): pid=%d tid=%02X mask=%02X handle=%d index=%d", __FUNCTION__, deviceIndexM, pidP, tidP, maskP, filtersM[i]->GetFd(), i); + //debug("cIptvSectionFilterHandler::%s(%d): pid=%d tid=%02X mask=%02X handle=%d index=%u", __FUNCTION__, deviceIndexM, pidP, tidP, maskP, filtersM[i]->GetFd(), i); return filtersM[i]->GetFd(); } } diff --git a/sectionfilter.h b/sectionfilter.h index 9d022a8..cb24d25 100644 --- a/sectionfilter.h +++ b/sectionfilter.h @@ -18,10 +18,11 @@ class cIptvSectionFilter : public cIptvSectionStatistics { private: - enum dmx_limits { - DMX_MAX_FILTER_SIZE = 18, - DMX_MAX_SECTION_SIZE = 4096, - DMX_MAX_SECFEED_SIZE = (DMX_MAX_SECTION_SIZE + TS_SIZE) + enum { + eDmxMaxFilterSize = 18, + eDmxMaxSectionCount = 64, + eDmxMaxSectionSize = 4096, + eDmxMaxSectionFeedSize = (eDmxMaxSectionSize + TS_SIZE) }; int pusiSeenM; @@ -29,21 +30,22 @@ private: int doneqM; uint8_t *secBufM; - uint8_t secBufBaseM[DMX_MAX_SECFEED_SIZE]; + uint8_t secBufBaseM[eDmxMaxSectionFeedSize]; uint16_t secBufpM; uint16_t secLenM; uint16_t tsFeedpM; uint16_t pidM; + cRingBufferFrame *ringBufferM; int deviceIndexM; int socketM[2]; - uint8_t filterValueM[DMX_MAX_FILTER_SIZE]; - uint8_t filterMaskM[DMX_MAX_FILTER_SIZE]; - uint8_t filterModeM[DMX_MAX_FILTER_SIZE]; + uint8_t filterValueM[eDmxMaxFilterSize]; + uint8_t filterMaskM[eDmxMaxFilterSize]; + uint8_t filterModeM[eDmxMaxFilterSize]; - uint8_t maskAndModeM[DMX_MAX_FILTER_SIZE]; - uint8_t maskAndNotModeM[DMX_MAX_FILTER_SIZE]; + uint8_t maskAndModeM[eDmxMaxFilterSize]; + uint8_t maskAndNotModeM[eDmxMaxFilterSize]; inline uint16_t GetLength(const uint8_t *dataP); void New(void); @@ -56,6 +58,7 @@ public: cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP); virtual ~cIptvSectionFilter(); void Process(const uint8_t* dataP); + bool Send(void); int GetFd(void) { return socketM[0]; } uint16_t GetPid(void) const { return pidM; } }; @@ -65,10 +68,9 @@ private: enum { eMaxSecFilterCount = 32 }; + cRingBufferLinear *ringBufferM; cMutex mutexM; int deviceIndexM; - bool processedM; - cRingBufferLinear *ringBufferM; cIptvSectionFilter *filtersM[eMaxSecFilterCount]; bool Delete(unsigned int indexP); @@ -80,7 +82,6 @@ protected: public: cIptvSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP); virtual ~cIptvSectionFilterHandler(); - bool Stop(void); cString GetInformation(void); int Open(u_short pidP, u_char tidP, u_char maskP); void Close(int handleP);