mirror of
https://github.com/rofafor/vdr-plugin-satip.git
synced 2023-10-10 13:37:42 +02:00
Added transfer timeout for sectionfilter data.
This commit is contained in:
parent
dfb050c297
commit
e11396e84d
117
sectionfilter.c
117
sectionfilter.c
@ -212,27 +212,28 @@ void cSatipSectionFilter::Process(const uint8_t* dataP)
|
||||
}
|
||||
}
|
||||
|
||||
bool cSatipSectionFilter::Send(void)
|
||||
void cSatipSectionFilter::Send(void)
|
||||
{
|
||||
bool result = false;
|
||||
cFrame *section = ringBufferM->Get();
|
||||
if (section) {
|
||||
uchar *data = section->Data();
|
||||
int count = section->Count();
|
||||
if (data && (count > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) {
|
||||
ssize_t len = send(socketM[1], data, count, MSG_EOR);
|
||||
ERROR_IF(len < 0 && errno != EAGAIN, "send()");
|
||||
if (len > 0) {
|
||||
ringBufferM->Drop(section);
|
||||
result = !!ringBufferM->Available();
|
||||
if (send(socketM[1], data, count, MSG_EOR) > 0) {
|
||||
// Update statistics
|
||||
AddSectionStatistic(len, 1);
|
||||
AddSectionStatistic(count, 1);
|
||||
}
|
||||
else
|
||||
esyslog("failed to send section data (%i bytes) to fd: %i (errno: %i)", count, socketM[1], errno);
|
||||
}
|
||||
ringBufferM->Drop(section);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
int cSatipSectionFilter::Available(void) const
|
||||
{
|
||||
return ringBufferM->Available();
|
||||
}
|
||||
|
||||
cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP)
|
||||
: cThread(cString::sprintf("SATIP#%d section handler", deviceIndexP)),
|
||||
@ -270,54 +271,68 @@ cSatipSectionFilterHandler::~cSatipSectionFilterHandler()
|
||||
Delete(i);
|
||||
}
|
||||
|
||||
bool cSatipSectionFilterHandler::Send(void)
|
||||
{
|
||||
// zero polling structures
|
||||
memset(pollFdsM, 0, sizeof(pollFdsM));
|
||||
|
||||
// assemble all handlers to poll
|
||||
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
|
||||
if (filtersM[i] && filtersM[i]->Available() != 0) {
|
||||
pollFdsM[i].fd = filtersM[i]->GetFd();
|
||||
pollFdsM[i].events = POLLOUT;
|
||||
}
|
||||
}
|
||||
|
||||
// anyone ready for writing
|
||||
if (poll(pollFdsM, eMaxSecFilterCount, 10) <= 0)
|
||||
return false;
|
||||
|
||||
// send data
|
||||
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
|
||||
if(pollFdsM[i].revents & POLLOUT)
|
||||
filtersM[i]->Send();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void cSatipSectionFilterHandler::Action(void)
|
||||
{
|
||||
debug1("%s Entering [device %d]", __PRETTY_FUNCTION__, deviceIndexM);
|
||||
bool processed = false;
|
||||
// Do the thread loop
|
||||
uchar *p = NULL;
|
||||
while (Running()) {
|
||||
// Send demuxed section packets through all filters
|
||||
bool retry = false;
|
||||
mutexM.Lock();
|
||||
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
|
||||
if (filtersM[i] && filtersM[i]->Send())
|
||||
retry = true;
|
||||
}
|
||||
mutexM.Unlock();
|
||||
if (retry)
|
||||
continue;
|
||||
// Read one TS packet
|
||||
if (ringBufferM) {
|
||||
int len = 0;
|
||||
if (processed) {
|
||||
ringBufferM->Del(TS_SIZE);
|
||||
processed = false;
|
||||
}
|
||||
uchar *p = ringBufferM->Get(len);
|
||||
if (p && (len >= TS_SIZE)) {
|
||||
if (*p != TS_SYNC_BYTE) {
|
||||
for (int i = 1; i < len; ++i) {
|
||||
if (p[i] == TS_SYNC_BYTE) {
|
||||
len = i;
|
||||
break;
|
||||
int len = 0;
|
||||
// Process all pending TS packets
|
||||
while ((p = ringBufferM->Get(len)) != NULL) {
|
||||
if (p && (len >= TS_SIZE)) {
|
||||
if (*p != TS_SYNC_BYTE) {
|
||||
for (int i = 1; i < len; ++i) {
|
||||
if (p[i] == TS_SYNC_BYTE) {
|
||||
len = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
ringBufferM->Del(len);
|
||||
debug1("%s Skipped %d bytes to sync on TS packet [device %d]", __PRETTY_FUNCTION__, len, deviceIndexM);
|
||||
continue;
|
||||
}
|
||||
// Process TS packet through all filters
|
||||
mutexM.Lock();
|
||||
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
|
||||
if (filtersM[i])
|
||||
filtersM[i]->Process(p);
|
||||
}
|
||||
mutexM.Unlock();
|
||||
processed = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
cCondWait::SleepMs(10); // to avoid busy loop and reduce cpu load
|
||||
ringBufferM->Del(len);
|
||||
debug1("%s Skipped %d bytes to sync on TS packet [device %d]", __PRETTY_FUNCTION__, len, deviceIndexM);
|
||||
continue;
|
||||
}
|
||||
// Process TS packet through all filters
|
||||
mutexM.Lock();
|
||||
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
|
||||
if (filtersM[i])
|
||||
filtersM[i]->Process(p);
|
||||
}
|
||||
mutexM.Unlock();
|
||||
ringBufferM->Del(TS_SIZE);
|
||||
}
|
||||
}
|
||||
|
||||
// Send demuxed section packets through all filters
|
||||
mutexM.Lock();
|
||||
while (Send()) ;
|
||||
mutexM.Unlock();
|
||||
//cCondWait::SleepMs(10); // to avoid busy loop and reduce cpu load
|
||||
}
|
||||
debug1("%s Exiting [device %d]", __PRETTY_FUNCTION__, deviceIndexM);
|
||||
}
|
||||
|
@ -8,6 +8,7 @@
|
||||
#ifndef __SATIP_SECTIONFILTER_H
|
||||
#define __SATIP_SECTIONFILTER_H
|
||||
|
||||
#include <sys/poll.h>
|
||||
#include <vdr/device.h>
|
||||
|
||||
#include "common.h"
|
||||
@ -55,9 +56,10 @@ public:
|
||||
cSatipSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP);
|
||||
virtual ~cSatipSectionFilter();
|
||||
void Process(const uint8_t* dataP);
|
||||
bool Send(void);
|
||||
void Send(void);
|
||||
int GetFd(void) { return socketM[0]; }
|
||||
uint16_t GetPid(void) const { return pidM; }
|
||||
int Available(void) const;
|
||||
};
|
||||
|
||||
class cSatipSectionFilterHandler : public cThread {
|
||||
@ -69,9 +71,11 @@ private:
|
||||
cMutex mutexM;
|
||||
int deviceIndexM;
|
||||
cSatipSectionFilter *filtersM[eMaxSecFilterCount];
|
||||
struct pollfd pollFdsM[eMaxSecFilterCount];
|
||||
|
||||
bool Delete(unsigned int indexP);
|
||||
bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const;
|
||||
bool Send(void);
|
||||
|
||||
protected:
|
||||
virtual void Action(void);
|
||||
|
Loading…
Reference in New Issue
Block a user