Added transfer timeout for sectionfilter data.

This commit is contained in:
Alexander Pipelka 2017-09-24 20:29:51 +02:00
parent 97097f74af
commit 7e6b722747
2 changed files with 71 additions and 52 deletions

View File

@ -212,27 +212,28 @@ void cSatipSectionFilter::Process(const uint8_t* dataP)
} }
} }
bool cSatipSectionFilter::Send(void) void cSatipSectionFilter::Send(void)
{ {
bool result = false;
cFrame *section = ringBufferM->Get(); cFrame *section = ringBufferM->Get();
if (section) { if (section) {
uchar *data = section->Data(); uchar *data = section->Data();
int count = section->Count(); int count = section->Count();
if (data && (count > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) { if (data && (count > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) {
ssize_t len = send(socketM[1], data, count, MSG_EOR); if (send(socketM[1], data, count, MSG_EOR) > 0) {
ERROR_IF(len < 0 && errno != EAGAIN, "send()");
if (len > 0) {
ringBufferM->Drop(section);
result = !!ringBufferM->Available();
// Update statistics // Update statistics
AddSectionStatistic(len, 1); AddSectionStatistic(count, 1);
} }
else
esyslog("failed to send section data (%i bytes) to fd: %i (errno: %i)", count, socketM[1], errno);
} }
ringBufferM->Drop(section);
} }
return result;
} }
int cSatipSectionFilter::Available(void) const
{
return ringBufferM->Available();
}
cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP) cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP)
: cThread(cString::sprintf("SATIP#%d section handler", deviceIndexP)), : cThread(cString::sprintf("SATIP#%d section handler", deviceIndexP)),
@ -270,54 +271,68 @@ cSatipSectionFilterHandler::~cSatipSectionFilterHandler()
Delete(i); Delete(i);
} }
bool cSatipSectionFilterHandler::Send(void)
{
// zero polling structures
memset(pollFdsM, 0, sizeof(pollFdsM));
// assemble all handlers to poll
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (filtersM[i] && filtersM[i]->Available() != 0) {
pollFdsM[i].fd = filtersM[i]->GetFd();
pollFdsM[i].events = POLLOUT;
}
}
// anyone ready for writing
if (poll(pollFdsM, eMaxSecFilterCount, 10) <= 0)
return false;
// send data
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if(pollFdsM[i].revents & POLLOUT)
filtersM[i]->Send();
}
return true;
}
void cSatipSectionFilterHandler::Action(void) void cSatipSectionFilterHandler::Action(void)
{ {
debug1("%s Entering [device %d]", __PRETTY_FUNCTION__, deviceIndexM); debug1("%s Entering [device %d]", __PRETTY_FUNCTION__, deviceIndexM);
bool processed = false;
// Do the thread loop // Do the thread loop
uchar *p = NULL;
while (Running()) { while (Running()) {
// Send demuxed section packets through all filters int len = 0;
bool retry = false; // Process all pending TS packets
mutexM.Lock(); while ((p = ringBufferM->Get(len)) != NULL) {
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { if (p && (len >= TS_SIZE)) {
if (filtersM[i] && filtersM[i]->Send()) if (*p != TS_SYNC_BYTE) {
retry = true; for (int i = 1; i < len; ++i) {
} if (p[i] == TS_SYNC_BYTE) {
mutexM.Unlock(); len = i;
if (retry) break;
continue; }
// Read one TS packet
if (ringBufferM) {
int len = 0;
if (processed) {
ringBufferM->Del(TS_SIZE);
processed = false;
}
uchar *p = ringBufferM->Get(len);
if (p && (len >= TS_SIZE)) {
if (*p != TS_SYNC_BYTE) {
for (int i = 1; i < len; ++i) {
if (p[i] == TS_SYNC_BYTE) {
len = i;
break;
} }
} ringBufferM->Del(len);
ringBufferM->Del(len); debug1("%s Skipped %d bytes to sync on TS packet [device %d]", __PRETTY_FUNCTION__, len, deviceIndexM);
debug1("%s Skipped %d bytes to sync on TS packet [device %d]", __PRETTY_FUNCTION__, len, deviceIndexM); continue;
continue; }
} // Process TS packet through all filters
// Process TS packet through all filters mutexM.Lock();
mutexM.Lock(); for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { if (filtersM[i])
if (filtersM[i]) filtersM[i]->Process(p);
filtersM[i]->Process(p); }
} mutexM.Unlock();
mutexM.Unlock(); ringBufferM->Del(TS_SIZE);
processed = true; }
continue; }
}
} // Send demuxed section packets through all filters
cCondWait::SleepMs(10); // to avoid busy loop and reduce cpu load mutexM.Lock();
while (Send()) ;
mutexM.Unlock();
//cCondWait::SleepMs(10); // to avoid busy loop and reduce cpu load
} }
debug1("%s Exiting [device %d]", __PRETTY_FUNCTION__, deviceIndexM); debug1("%s Exiting [device %d]", __PRETTY_FUNCTION__, deviceIndexM);
} }

View File

@ -8,6 +8,7 @@
#ifndef __SATIP_SECTIONFILTER_H #ifndef __SATIP_SECTIONFILTER_H
#define __SATIP_SECTIONFILTER_H #define __SATIP_SECTIONFILTER_H
#include <sys/poll.h>
#include <vdr/device.h> #include <vdr/device.h>
#include "common.h" #include "common.h"
@ -55,9 +56,10 @@ public:
cSatipSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP); cSatipSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP);
virtual ~cSatipSectionFilter(); virtual ~cSatipSectionFilter();
void Process(const uint8_t* dataP); void Process(const uint8_t* dataP);
bool Send(void); void Send(void);
int GetFd(void) { return socketM[0]; } int GetFd(void) { return socketM[0]; }
uint16_t GetPid(void) const { return pidM; } uint16_t GetPid(void) const { return pidM; }
int Available(void) const;
}; };
class cSatipSectionFilterHandler : public cThread { class cSatipSectionFilterHandler : public cThread {
@ -69,9 +71,11 @@ private:
cMutex mutexM; cMutex mutexM;
int deviceIndexM; int deviceIndexM;
cSatipSectionFilter *filtersM[eMaxSecFilterCount]; cSatipSectionFilter *filtersM[eMaxSecFilterCount];
struct pollfd pollFdsM[eMaxSecFilterCount];
bool Delete(unsigned int indexP); bool Delete(unsigned int indexP);
bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const; bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const;
bool Send(void);
protected: protected:
virtual void Action(void); virtual void Action(void);