1
0
mirror of https://github.com/rofafor/vdr-plugin-satip.git synced 2023-10-10 13:37:42 +02:00

Added a check to write new sections only if there is no data in the read socket.

This commit is contained in:
Rolf Ahrenberg 2014-03-30 22:20:56 +03:00
parent f493df0e3d
commit 7b58d9f28f
5 changed files with 98 additions and 33 deletions

View File

@ -30,3 +30,5 @@ VDR Plugin 'satip' Revision History
- Changed implementation to report about RTP packet - Changed implementation to report about RTP packet
errors on 5 minutes interval only. errors on 5 minutes interval only.
- Added a check to write new sections only if there
is no data in the read socket.

View File

@ -57,6 +57,30 @@ char *StripTags(char *strP)
return NULL; return NULL;
} }
int select_single_desc(int descriptorP, const int msP, const bool selectWriteP)
{
// Wait for data
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = msP * 1000L;
// Use select
fd_set infd;
fd_set outfd;
fd_set errfd;
FD_ZERO(&infd);
FD_ZERO(&outfd);
FD_ZERO(&errfd);
FD_SET(descriptorP, &errfd);
if (selectWriteP)
FD_SET(descriptorP, &outfd);
else
FD_SET(descriptorP, &infd);
int retval = select(descriptorP + 1, &infd, &outfd, &errfd, &tv);
// Check if error
ERROR_IF_RET(retval < 0, "select()", return retval);
return retval;
}
cString ChangeCase(const cString &strP, bool upperP) cString ChangeCase(const cString &strP, bool upperP)
{ {
cString res(strP); cString res(strP);

View File

@ -93,6 +93,7 @@ uint16_t ts_pid(const uint8_t *bufP);
uint8_t payload(const uint8_t *bufP); uint8_t payload(const uint8_t *bufP);
const char *id_pid(const u_short pidP); const char *id_pid(const u_short pidP);
char *StripTags(char *strP); char *StripTags(char *strP);
int select_single_desc(int descriptorP, const int msP, const bool selectWriteP);
cString ChangeCase(const cString &strP, bool upperP); cString ChangeCase(const cString &strP, bool upperP);
struct section_filter_table_type { struct section_filter_table_type {

View File

@ -100,11 +100,19 @@ int cSatipSectionFilter::Filter(void)
return 0; return 0;
// There is no data in the read socket, more can be written // There is no data in the read socket, more can be written
if ((socketM[0] >= 0) && (socketM[1] >= 0) /*&& !select_single_desc(socketM[0], 0, false)*/) { if ((secLenM > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) {
for (i = 0; i < eWriteMaxRetries; ++i) {
if (select_single_desc(socketM[0], 10, false))
continue;
ssize_t len = write(socketM[1], secBufM, secLenM); ssize_t len = write(socketM[1], secBufM, secLenM);
ERROR_IF(len < 0, "write()"); ERROR_IF(len < 0, "write()");
// Update statistics // Update statistics
if (len >= 0)
AddSectionStatistic(len, 1); AddSectionStatistic(len, 1);
break;
}
if (i >= eWriteMaxRetries)
debug("Skipped section write (%d bytes)", secLenM);
} }
} }
return 0; return 0;
@ -213,17 +221,20 @@ void cSatipSectionFilter::Process(const uint8_t* dataP)
cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP) cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP)
: cThread("SAT>IP section handler", true), :
#ifdef USE_THREADED_SECTIONFILTER
cThread("SAT>IP section handler", true),
ringBufferM(new cRingBufferLinear(bufferLenP, TS_SIZE, false, *cString::sprintf("SAT>IP SECTION HANDLER %d", deviceIndexP))),
#endif
mutexM(), mutexM(),
deviceIndexM(deviceIndexP), deviceIndexM(deviceIndexP)
processedM(false),
ringBufferM(new cRingBufferLinear(bufferLenP, TS_SIZE, false, *cString::sprintf("SAT>IP SECTION HANDLER %d", deviceIndexP)))
{ {
debug("cSatipSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); debug("cSatipSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM);
// Initialize filter pointers // Initialize filter pointers
memset(filtersM, 0, sizeof(filtersM)); memset(filtersM, 0, sizeof(filtersM));
#ifdef USE_THREADED_SECTIONFILTER
// Create input buffer // Create input buffer
if (ringBufferM) { if (ringBufferM) {
ringBufferM->SetTimeouts(100, 100); ringBufferM->SetTimeouts(100, 100);
@ -231,16 +242,19 @@ cSatipSectionFilterHandler::cSatipSectionFilterHandler(int deviceIndexP, unsigne
} }
else else
error("Failed to allocate buffer for section filter handler (device=%d): ", deviceIndexM); error("Failed to allocate buffer for section filter handler (device=%d): ", deviceIndexM);
Start(); Start();
#endif
} }
cSatipSectionFilterHandler::~cSatipSectionFilterHandler() cSatipSectionFilterHandler::~cSatipSectionFilterHandler()
{ {
debug("cSatipSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); debug("cSatipSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM);
Stop(); #ifdef USE_THREADED_SECTIONFILTER
// Stop thread
if (Running())
Cancel(3);
DELETE_POINTER(ringBufferM); DELETE_POINTER(ringBufferM);
#endif
// Destroy all filters // Destroy all filters
cMutexLock MutexLock(&mutexM); cMutexLock MutexLock(&mutexM);
@ -248,26 +262,19 @@ cSatipSectionFilterHandler::~cSatipSectionFilterHandler()
Delete(i); Delete(i);
} }
bool cSatipSectionFilterHandler::Stop(void) #ifdef USE_THREADED_SECTIONFILTER
{
debug("cSatipSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM);
// Stop thread
if (Running())
Cancel(3);
return true;
}
void cSatipSectionFilterHandler::Action(void) void cSatipSectionFilterHandler::Action(void)
{ {
debug("cSatipSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM); debug("cSatipSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM);
bool processed = false;
// Do the thread loop // Do the thread loop
while (Running()) { while (Running()) {
// Read one TS packet // Read one TS packet
if (ringBufferM) { if (ringBufferM) {
int len = 0; int len = 0;
if (processedM) { if (processed) {
ringBufferM->Del(TS_SIZE); ringBufferM->Del(TS_SIZE);
processedM = false; processed = false;
} }
uchar *p = ringBufferM->Get(len); uchar *p = ringBufferM->Get(len);
if (p && (len >= TS_SIZE)) { if (p && (len >= TS_SIZE)) {
@ -289,7 +296,7 @@ void cSatipSectionFilterHandler::Action(void)
filtersM[i]->Process(p); filtersM[i]->Process(p);
} }
mutexM.Unlock(); mutexM.Unlock();
processedM = true; processed = true;
continue; continue;
} }
} }
@ -297,6 +304,7 @@ void cSatipSectionFilterHandler::Action(void)
} }
debug("cSatipSectionFilterHandler::%s(%d): exiting", __FUNCTION__, deviceIndexM); debug("cSatipSectionFilterHandler::%s(%d): exiting", __FUNCTION__, deviceIndexM);
} }
#endif
cString cSatipSectionFilterHandler::GetInformation(void) cString cSatipSectionFilterHandler::GetInformation(void)
{ {
@ -397,10 +405,35 @@ int cSatipSectionFilterHandler::GetPid(int handleP)
void cSatipSectionFilterHandler::Write(uchar *bufferP, int lengthP) void cSatipSectionFilterHandler::Write(uchar *bufferP, int lengthP)
{ {
//debug("cSatipSectionFilterHandler::%s(%d): length=%d", __FUNCTION__, deviceIndexM, lengthP); //debug("cSatipSectionFilterHandler::%s(%d): length=%d", __FUNCTION__, deviceIndexM, lengthP);
#ifdef USE_THREADED_SECTIONFILTER
// Fill up the buffer // Fill up the buffer
if (ringBufferM) { if (ringBufferM) {
int len = ringBufferM->Put(bufferP, lengthP); int len = ringBufferM->Put(bufferP, lengthP);
if (len != lengthP) if (len != lengthP)
ringBufferM->ReportOverflow(lengthP - len); ringBufferM->ReportOverflow(lengthP - len);
} }
#else
// Lock
cMutexLock MutexLock(&mutexM);
uchar *p = bufferP;
int len = lengthP;
// Process TS packets through all filters
while (p && (len >= TS_SIZE)) {
if (*p != TS_SYNC_BYTE) {
for (int i = 1; i < len; ++i) {
if (p[i] == TS_SYNC_BYTE) {
p += i;
len -= i;
break;
}
}
}
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (filtersM[i])
filtersM[i]->Process(p);
}
p += TS_SIZE;
len -= TS_SIZE;
}
#endif
} }

View File

@ -16,9 +16,12 @@
#include "common.h" #include "common.h"
#include "statistics.h" #include "statistics.h"
#define USE_THREADED_SECTIONFILTER
class cSatipSectionFilter : public cSatipSectionStatistics { class cSatipSectionFilter : public cSatipSectionStatistics {
private: private:
enum dmx_limits { enum {
eWriteMaxRetries = 20,
eDmxMaxFilterSize = 18, eDmxMaxFilterSize = 18,
eDmxMaxSectionSize = 4096, eDmxMaxSectionSize = 4096,
eDmxMaxSectionFeedSize = (eDmxMaxSectionSize + TS_SIZE) eDmxMaxSectionFeedSize = (eDmxMaxSectionSize + TS_SIZE)
@ -60,27 +63,29 @@ public:
uint16_t GetPid(void) const { return pidM; } uint16_t GetPid(void) const { return pidM; }
}; };
#ifdef USE_THREADED_SECTIONFILTER
class cSatipSectionFilterHandler : public cThread { class cSatipSectionFilterHandler : public cThread {
protected:
virtual void Action(void);
private:
cRingBufferLinear *ringBufferM;
#else
class cSatipSectionFilterHandler {
#endif
private: private:
enum { enum {
eMaxSecFilterCount = 32 eMaxSecFilterCount = 32
}; };
cMutex mutexM; cMutex mutexM;
int deviceIndexM; int deviceIndexM;
bool processedM;
cRingBufferLinear *ringBufferM;
cSatipSectionFilter *filtersM[eMaxSecFilterCount]; cSatipSectionFilter *filtersM[eMaxSecFilterCount];
bool Delete(unsigned int indexP); bool Delete(unsigned int indexP);
bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const; bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const;
protected:
virtual void Action(void);
public: public:
cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP); cSatipSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP);
virtual ~cSatipSectionFilterHandler(); virtual ~cSatipSectionFilterHandler();
bool Stop(void);
cString GetInformation(void); cString GetInformation(void);
int Open(u_short pidP, u_char tidP, u_char maskP); int Open(u_short pidP, u_char tidP, u_char maskP);
void Close(int handleP); void Close(int handleP);