Merge pull request #44 from pipelka/sectionfilter-send

Added transfer timeout for sectionfilter data.
This commit is contained in:
Rolf Ahrenberg 2017-10-01 22:32:48 +03:00 committed by GitHub
commit 3cc63dadac
2 changed files with 73 additions and 53 deletions

View File

@ -212,27 +212,28 @@ void cSatipSectionFilter::Process(const uint8_t* dataP)
}
}
bool cSatipSectionFilter::Send(void)
void cSatipSectionFilter::Send(void)
{
bool result = false;
cFrame *section = ringBufferM->Get();
if (section) {
uchar *data = section->Data();
int count = section->Count();
if (data && (count > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) {
ssize_t len = send(socketM[1], data, count, MSG_EOR);
ERROR_IF(len < 0 && errno != EAGAIN, "send()");
if (len > 0) {
ringBufferM->Drop(section);
result = !!ringBufferM->Available();
if (send(socketM[1], data, count, MSG_EOR) > 0) {
// Update statistics
AddSectionStatistic(len, 1);
AddSectionStatistic(count, 1);
}
else if (errno != EAGAIN)
error("failed to send section data (%i bytes) [device=%d]", count, deviceIndexM);
}
ringBufferM->Drop(section);
}
return result;
}
int cSatipSectionFilter::Available(void) const
{
return ringBufferM->Available();
}
cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP)
: cThread(cString::sprintf("SATIP#%d section handler", deviceIndexP)),
@ -270,54 +271,68 @@ cSatipSectionFilterHandler::~cSatipSectionFilterHandler()
Delete(i);
}
void cSatipSectionFilterHandler::SendAll(void)
{
while (true) {
// zero polling structures
memset(pollFdsM, 0, sizeof(pollFdsM));
// assemble all handlers to poll
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (filtersM[i] && filtersM[i]->Available() != 0) {
pollFdsM[i].fd = filtersM[i]->GetFd();
pollFdsM[i].events = POLLOUT;
}
}
// anyone ready for writing
if (poll(pollFdsM, eMaxSecFilterCount, eSecFilterSendTimeoutMs) <= 0)
return;
// send data
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (pollFdsM[i].revents & POLLOUT)
filtersM[i]->Send();
}
}
}
void cSatipSectionFilterHandler::Action(void)
{
debug1("%s Entering [device %d]", __PRETTY_FUNCTION__, deviceIndexM);
bool processed = false;
// Do the thread loop
uchar *p = NULL;
while (Running()) {
// Send demuxed section packets through all filters
bool retry = false;
mutexM.Lock();
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (filtersM[i] && filtersM[i]->Send())
retry = true;
}
mutexM.Unlock();
if (retry)
continue;
// Read one TS packet
if (ringBufferM) {
int len = 0;
if (processed) {
ringBufferM->Del(TS_SIZE);
processed = false;
}
uchar *p = ringBufferM->Get(len);
if (p && (len >= TS_SIZE)) {
if (*p != TS_SYNC_BYTE) {
for (int i = 1; i < len; ++i) {
if (p[i] == TS_SYNC_BYTE) {
len = i;
break;
int len = 0;
// Process all pending TS packets
while ((p = ringBufferM->Get(len)) != NULL) {
if (p && (len >= TS_SIZE)) {
if (*p != TS_SYNC_BYTE) {
for (int i = 1; i < len; ++i) {
if (p[i] == TS_SYNC_BYTE) {
len = i;
break;
}
}
}
ringBufferM->Del(len);
debug1("%s Skipped %d bytes to sync on TS packet [device %d]", __PRETTY_FUNCTION__, len, deviceIndexM);
continue;
}
// Process TS packet through all filters
mutexM.Lock();
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (filtersM[i])
filtersM[i]->Process(p);
}
mutexM.Unlock();
processed = true;
continue;
}
}
cCondWait::SleepMs(10); // to avoid busy loop and reduce cpu load
ringBufferM->Del(len);
debug1("%s Skipped %d bytes to sync on TS packet [device %d]", __PRETTY_FUNCTION__, len, deviceIndexM);
continue;
}
// Process TS packet through all filters
mutexM.Lock();
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (filtersM[i])
filtersM[i]->Process(p);
}
mutexM.Unlock();
ringBufferM->Del(TS_SIZE);
}
}
// Send demuxed section packets through all filters
mutexM.Lock();
SendAll();
mutexM.Unlock();
}
debug1("%s Exiting [device %d]", __PRETTY_FUNCTION__, deviceIndexM);
}

View File

@ -8,6 +8,7 @@
#ifndef __SATIP_SECTIONFILTER_H
#define __SATIP_SECTIONFILTER_H
#include <poll.h>
#include <vdr/device.h>
#include "common.h"
@ -55,23 +56,27 @@ public:
cSatipSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP);
virtual ~cSatipSectionFilter();
void Process(const uint8_t* dataP);
bool Send(void);
void Send(void);
int GetFd(void) { return socketM[0]; }
uint16_t GetPid(void) const { return pidM; }
int Available(void) const;
};
class cSatipSectionFilterHandler : public cThread {
private:
enum {
eMaxSecFilterCount = 32
eMaxSecFilterCount = 32,
eSecFilterSendTimeoutMs = 10
};
cRingBufferLinear *ringBufferM;
cMutex mutexM;
int deviceIndexM;
cSatipSectionFilter *filtersM[eMaxSecFilterCount];
struct pollfd pollFdsM[eMaxSecFilterCount];
bool Delete(unsigned int indexP);
bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const;
void SendAll(void);
protected:
virtual void Action(void);