Refactored the section filtering.

This commit is contained in:
Rolf Ahrenberg 2014-04-05 18:43:23 +03:00
parent 591fb8431b
commit 688c7db1fc
2 changed files with 69 additions and 48 deletions

View File

@ -17,6 +17,7 @@ cIptvSectionFilter::cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t
secLenM(0), secLenM(0),
tsFeedpM(0), tsFeedpM(0),
pidM(pidP), pidM(pidP),
ringBufferM(new cRingBufferFrame(eDmxMaxSectionCount * eDmxMaxSectionSize)),
deviceIndexM(deviceIndexP) deviceIndexM(deviceIndexP)
{ {
//debug("cIptvSectionFilter::%s(%d, %d)", __FUNCTION__, deviceIndexM, pidM); //debug("cIptvSectionFilter::%s(%d, %d)", __FUNCTION__, deviceIndexM, pidM);
@ -33,11 +34,11 @@ cIptvSectionFilter::cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t
filterMaskM[0] = maskP; filterMaskM[0] = maskP;
// Invert the filter // Invert the filter
for (i = 0; i < DMX_MAX_FILTER_SIZE; ++i) for (i = 0; i < eDmxMaxFilterSize; ++i)
filterValueM[i] ^= 0xFF; filterValueM[i] ^= 0xFF;
uint8_t mask, mode, doneq = 0; uint8_t mask, mode, doneq = 0;
for (i = 0; i < DMX_MAX_FILTER_SIZE; ++i) { for (i = 0; i < eDmxMaxFilterSize; ++i) {
mode = filterModeM[i]; mode = filterModeM[i];
mask = filterMaskM[i]; mask = filterMaskM[i];
maskAndModeM[i] = (uint8_t)(mask & mode); maskAndModeM[i] = (uint8_t)(mask & mode);
@ -48,7 +49,7 @@ cIptvSectionFilter::cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t
// Create sockets // Create sockets
socketM[0] = socketM[1] = -1; socketM[0] = socketM[1] = -1;
if (socketpair(AF_UNIX, SOCK_DGRAM, 0, socketM) != 0) { if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, socketM) != 0) {
char tmp[64]; char tmp[64];
error("Opening section filter sockets failed (device=%d pid=%d): %s", deviceIndexM, pidM, strerror_r(errno, tmp, sizeof(tmp))); error("Opening section filter sockets failed (device=%d pid=%d): %s", deviceIndexM, pidM, strerror_r(errno, tmp, sizeof(tmp)));
} }
@ -70,6 +71,7 @@ cIptvSectionFilter::~cIptvSectionFilter()
if (tmp >= 0) if (tmp >= 0)
close(tmp); close(tmp);
secBufM = NULL; secBufM = NULL;
DELETENULL(ringBufferM);
} }
inline uint16_t cIptvSectionFilter::GetLength(const uint8_t *dataP) inline uint16_t cIptvSectionFilter::GetLength(const uint8_t *dataP)
@ -89,7 +91,7 @@ int cIptvSectionFilter::Filter(void)
int i; int i;
uint8_t neq = 0; uint8_t neq = 0;
for (i = 0; i < DMX_MAX_FILTER_SIZE; ++i) { for (i = 0; i < eDmxMaxFilterSize; ++i) {
uint8_t calcxor = (uint8_t)(filterValueM[i] ^ secBufM[i]); uint8_t calcxor = (uint8_t)(filterValueM[i] ^ secBufM[i]);
if (maskAndModeM[i] & calcxor) if (maskAndModeM[i] & calcxor)
return 0; return 0;
@ -99,13 +101,8 @@ int cIptvSectionFilter::Filter(void)
if (doneqM && !neq) if (doneqM && !neq)
return 0; return 0;
// There is no data in the read socket, more can be written if (ringBufferM && (secLenM > 0))
if ((socketM[0] >= 0) && (socketM[1] >= 0) /*&& !select_single_desc(socketM[0], 0, false)*/) { ringBufferM->Put(new cFrame(secBufM, secLenM));
ssize_t len = write(socketM[1], secBufM, secLenM);
ERROR_IF(len < 0, "write()");
// Update statistics
AddSectionStatistic(len, 1);
}
} }
return 0; return 0;
} }
@ -122,11 +119,11 @@ int cIptvSectionFilter::CopyDump(const uint8_t *bufP, uint8_t lenP)
{ {
uint16_t limit, seclen, n; uint16_t limit, seclen, n;
if (tsFeedpM >= DMX_MAX_SECFEED_SIZE) if (tsFeedpM >= eDmxMaxSectionFeedSize)
return 0; return 0;
if (tsFeedpM + lenP > DMX_MAX_SECFEED_SIZE) if (tsFeedpM + lenP > eDmxMaxSectionFeedSize)
lenP = (uint8_t)(DMX_MAX_SECFEED_SIZE - tsFeedpM); lenP = (uint8_t)(eDmxMaxSectionFeedSize - tsFeedpM);
if (lenP <= 0) if (lenP <= 0)
return 0; return 0;
@ -135,7 +132,7 @@ int cIptvSectionFilter::CopyDump(const uint8_t *bufP, uint8_t lenP)
tsFeedpM = uint16_t(tsFeedpM + lenP); tsFeedpM = uint16_t(tsFeedpM + lenP);
limit = tsFeedpM; limit = tsFeedpM;
if (limit > DMX_MAX_SECFEED_SIZE) if (limit > eDmxMaxSectionFeedSize)
return -1; // internal error should never happen return -1; // internal error should never happen
// Always set secbuf // Always set secbuf
@ -143,7 +140,7 @@ int cIptvSectionFilter::CopyDump(const uint8_t *bufP, uint8_t lenP)
for (n = 0; secBufpM + 2 < limit; ++n) { for (n = 0; secBufpM + 2 < limit; ++n) {
seclen = GetLength(secBufM); seclen = GetLength(secBufM);
if ((seclen <= 0) || (seclen > DMX_MAX_SECTION_SIZE) || ((seclen + secBufpM) > limit)) if ((seclen <= 0) || (seclen > eDmxMaxSectionSize) || ((seclen + secBufpM) > limit))
return 0; return 0;
secLenM = seclen; secLenM = seclen;
if (pusiSeenM) if (pusiSeenM)
@ -211,13 +208,33 @@ void cIptvSectionFilter::Process(const uint8_t* dataP)
} }
} }
bool cIptvSectionFilter::Send(void)
{
bool result = false;
cFrame *section = ringBufferM->Get();
if (section) {
uchar *data = section->Data();
int count = section->Count();
if (data && (count > 0) && (socketM[1] >= 0) && (socketM[0] >= 0)) {
ssize_t len = send(socketM[1], data, count, MSG_EOR);
ERROR_IF(len < 0 && errno != EAGAIN, "send()");
if (len > 0) {
ringBufferM->Drop(section);
result = !!ringBufferM->Available();
// Update statistics
AddSectionStatistic(len, 1);
}
}
}
return result;
}
cIptvSectionFilterHandler::cIptvSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP) cIptvSectionFilterHandler::cIptvSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP)
: cThread("IPTV section handler", true), : cThread("IPTV section handler"),
ringBufferM(new cRingBufferLinear(bufferLenP, TS_SIZE, false, *cString::sprintf("IPTV SECTION HANDLER %d", deviceIndexP))),
mutexM(), mutexM(),
deviceIndexM(deviceIndexP), deviceIndexM(deviceIndexP)
processedM(false),
ringBufferM(new cRingBufferLinear(bufferLenP, TS_SIZE, false, *cString::sprintf("IPTV SECTION HANDLER %d", deviceIndexP)))
{ {
debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM);
@ -238,8 +255,9 @@ cIptvSectionFilterHandler::cIptvSectionFilterHandler(int deviceIndexP, unsigned
cIptvSectionFilterHandler::~cIptvSectionFilterHandler() cIptvSectionFilterHandler::~cIptvSectionFilterHandler()
{ {
debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM); debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM);
Stop(); // Stop thread
if (Running())
Cancel(3);
DELETE_POINTER(ringBufferM); DELETE_POINTER(ringBufferM);
// Destroy all filters // Destroy all filters
@ -248,26 +266,28 @@ cIptvSectionFilterHandler::~cIptvSectionFilterHandler()
Delete(i); Delete(i);
} }
bool cIptvSectionFilterHandler::Stop(void)
{
debug("cIptvSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM);
// Stop thread
if (Running())
Cancel(3);
return true;
}
void cIptvSectionFilterHandler::Action(void) void cIptvSectionFilterHandler::Action(void)
{ {
debug("cIptvSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM); debug("cIptvSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM);
bool processed = false;
// Do the thread loop // Do the thread loop
while (Running()) { while (Running()) {
// Send demuxed section packets through all filters
bool retry = false;
mutexM.Lock();
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (filtersM[i] && filtersM[i]->Send())
retry = true;
}
mutexM.Unlock();
if (retry)
continue;
// Read one TS packet // Read one TS packet
if (ringBufferM) { if (ringBufferM) {
int len = 0; int len = 0;
if (processedM) { if (processed) {
ringBufferM->Del(TS_SIZE); ringBufferM->Del(TS_SIZE);
processedM = false; processed = false;
} }
uchar *p = ringBufferM->Get(len); uchar *p = ringBufferM->Get(len);
if (p && (len >= TS_SIZE)) { if (p && (len >= TS_SIZE)) {
@ -289,7 +309,7 @@ void cIptvSectionFilterHandler::Action(void)
filtersM[i]->Process(p); filtersM[i]->Process(p);
} }
mutexM.Unlock(); mutexM.Unlock();
processedM = true; processed = true;
continue; continue;
} }
} }
@ -358,7 +378,7 @@ int cIptvSectionFilterHandler::Open(u_short pidP, u_char tidP, u_char maskP)
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) { for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (!filtersM[i]) { if (!filtersM[i]) {
filtersM[i] = new cIptvSectionFilter(deviceIndexM, pidP, tidP, maskP); filtersM[i] = new cIptvSectionFilter(deviceIndexM, pidP, tidP, maskP);
debug("cIptvSectionFilterHandler::%s(%d): pid=%d tid=%02X mask=%02X handle=%d index=%d", __FUNCTION__, deviceIndexM, pidP, tidP, maskP, filtersM[i]->GetFd(), i); //debug("cIptvSectionFilterHandler::%s(%d): pid=%d tid=%02X mask=%02X handle=%d index=%u", __FUNCTION__, deviceIndexM, pidP, tidP, maskP, filtersM[i]->GetFd(), i);
return filtersM[i]->GetFd(); return filtersM[i]->GetFd();
} }
} }

View File

@ -18,10 +18,11 @@
class cIptvSectionFilter : public cIptvSectionStatistics { class cIptvSectionFilter : public cIptvSectionStatistics {
private: private:
enum dmx_limits { enum {
DMX_MAX_FILTER_SIZE = 18, eDmxMaxFilterSize = 18,
DMX_MAX_SECTION_SIZE = 4096, eDmxMaxSectionCount = 64,
DMX_MAX_SECFEED_SIZE = (DMX_MAX_SECTION_SIZE + TS_SIZE) eDmxMaxSectionSize = 4096,
eDmxMaxSectionFeedSize = (eDmxMaxSectionSize + TS_SIZE)
}; };
int pusiSeenM; int pusiSeenM;
@ -29,21 +30,22 @@ private:
int doneqM; int doneqM;
uint8_t *secBufM; uint8_t *secBufM;
uint8_t secBufBaseM[DMX_MAX_SECFEED_SIZE]; uint8_t secBufBaseM[eDmxMaxSectionFeedSize];
uint16_t secBufpM; uint16_t secBufpM;
uint16_t secLenM; uint16_t secLenM;
uint16_t tsFeedpM; uint16_t tsFeedpM;
uint16_t pidM; uint16_t pidM;
cRingBufferFrame *ringBufferM;
int deviceIndexM; int deviceIndexM;
int socketM[2]; int socketM[2];
uint8_t filterValueM[DMX_MAX_FILTER_SIZE]; uint8_t filterValueM[eDmxMaxFilterSize];
uint8_t filterMaskM[DMX_MAX_FILTER_SIZE]; uint8_t filterMaskM[eDmxMaxFilterSize];
uint8_t filterModeM[DMX_MAX_FILTER_SIZE]; uint8_t filterModeM[eDmxMaxFilterSize];
uint8_t maskAndModeM[DMX_MAX_FILTER_SIZE]; uint8_t maskAndModeM[eDmxMaxFilterSize];
uint8_t maskAndNotModeM[DMX_MAX_FILTER_SIZE]; uint8_t maskAndNotModeM[eDmxMaxFilterSize];
inline uint16_t GetLength(const uint8_t *dataP); inline uint16_t GetLength(const uint8_t *dataP);
void New(void); void New(void);
@ -56,6 +58,7 @@ public:
cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP); cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP);
virtual ~cIptvSectionFilter(); virtual ~cIptvSectionFilter();
void Process(const uint8_t* dataP); void Process(const uint8_t* dataP);
bool Send(void);
int GetFd(void) { return socketM[0]; } int GetFd(void) { return socketM[0]; }
uint16_t GetPid(void) const { return pidM; } uint16_t GetPid(void) const { return pidM; }
}; };
@ -65,10 +68,9 @@ private:
enum { enum {
eMaxSecFilterCount = 32 eMaxSecFilterCount = 32
}; };
cRingBufferLinear *ringBufferM;
cMutex mutexM; cMutex mutexM;
int deviceIndexM; int deviceIndexM;
bool processedM;
cRingBufferLinear *ringBufferM;
cIptvSectionFilter *filtersM[eMaxSecFilterCount]; cIptvSectionFilter *filtersM[eMaxSecFilterCount];
bool Delete(unsigned int indexP); bool Delete(unsigned int indexP);
@ -80,7 +82,6 @@ protected:
public: public:
cIptvSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP); cIptvSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP);
virtual ~cIptvSectionFilterHandler(); virtual ~cIptvSectionFilterHandler();
bool Stop(void);
cString GetInformation(void); cString GetInformation(void);
int Open(u_short pidP, u_char tidP, u_char maskP); int Open(u_short pidP, u_char tidP, u_char maskP);
void Close(int handleP); void Close(int handleP);