Fixed and refactored the section filtering code, fixed a possible crash in the file protocol, and updated Makefile's install target.

This commit is contained in:
Rolf Ahrenberg 2013-03-27 22:13:15 +02:00
parent f30817677e
commit d940e616e3
18 changed files with 343 additions and 186 deletions

View File

@ -204,3 +204,5 @@ VDR Plugin 'iptv' Revision History
- Enabled I/O throttling and tweaked buffer timeouts. - Enabled I/O throttling and tweaked buffer timeouts.
- Fixed a nasty network byte order bug. - Fixed a nasty network byte order bug.
- Fixed and refactored the section filtering code.
- Fixed a possible crash in the file protocol.

View File

@ -32,6 +32,7 @@ PKGCFG = $(if $(VDRDIR),$(shell pkg-config --variable=$(1) $(VDRDIR)/vdr.pc),$(s
LIBDIR = $(call PKGCFG,libdir) LIBDIR = $(call PKGCFG,libdir)
LOCDIR = $(call PKGCFG,locdir) LOCDIR = $(call PKGCFG,locdir)
PLGCFG = $(call PKGCFG,plgcfg) PLGCFG = $(call PKGCFG,plgcfg)
CFGDIR = $(call PKGCFG,configdir)
# #
TMPDIR ?= /tmp TMPDIR ?= /tmp
@ -144,7 +145,11 @@ endif
install-lib: $(SOFILE) install-lib: $(SOFILE)
install -D $^ $(DESTDIR)$(LIBDIR)/$^.$(APIVERSION) install -D $^ $(DESTDIR)$(LIBDIR)/$^.$(APIVERSION)
install: install-lib install-i18n install-conf:
@mkdir -p $(DESTDIR)$(CFGDIR)/plugins/$(PLUGIN)
@cp -pn $(PLUGIN)/* $(DESTDIR)$(CFGDIR)/plugins/$(PLUGIN)/
install: install-lib install-i18n install-conf
dist: $(I18Npo) clean dist: $(I18Npo) clean
@-rm -rf $(TMPDIR)/$(ARCHIVE) @-rm -rf $(TMPDIR)/$(ARCHIVE)

8
README
View File

@ -41,14 +41,8 @@ MP3 radio streams, mms video streams and so on.
Installation: Installation:
cd /put/your/path/here/VDR/PLUGINS/src
tar -xzf /put/your/path/here/vdr-iptv-X.Y.Z.tgz tar -xzf /put/your/path/here/vdr-iptv-X.Y.Z.tgz
ln -s iptv-X.Y.Z iptv make -C iptv-X.Y.Z install
cd /put/your/path/here/VDR
cp -R PLUGINS/src/iptv/iptv /path/to/vdrresource/plugins/
make
make plugins
./vdr -P iptv
Setup menu: Setup menu:

149
device.c
View File

@ -26,20 +26,21 @@ cIptvDevice::cIptvDevice(unsigned int indexP)
bufsize -= (bufsize % TS_SIZE); bufsize -= (bufsize % TS_SIZE);
isyslog("creating IPTV device %d (CardIndex=%d)", deviceIndexM, CardIndex()); isyslog("creating IPTV device %d (CardIndex=%d)", deviceIndexM, CardIndex());
tsBufferM = new cRingBufferLinear(bufsize + 1, TS_SIZE, false, tsBufferM = new cRingBufferLinear(bufsize + 1, TS_SIZE, false,
*cString::sprintf("IPTV %d", deviceIndexM)); *cString::sprintf("IPTV TS %d", deviceIndexM));
tsBufferM->SetTimeouts(100, 100); if (tsBufferM) {
tsBufferM->SetIoThrottle(); tsBufferM->SetTimeouts(100, 100);
tsBufferM->SetIoThrottle();
pIptvStreamerM = new cIptvStreamer(*this, tsBufferM->Free());
}
ResetBuffering(); ResetBuffering();
pUdpProtocolM = new cIptvProtocolUdp(); pUdpProtocolM = new cIptvProtocolUdp();
pCurlProtocolM = new cIptvProtocolCurl(); pCurlProtocolM = new cIptvProtocolCurl();
pHttpProtocolM = new cIptvProtocolHttp(); pHttpProtocolM = new cIptvProtocolHttp();
pFileProtocolM = new cIptvProtocolFile(); pFileProtocolM = new cIptvProtocolFile();
pExtProtocolM = new cIptvProtocolExt(); pExtProtocolM = new cIptvProtocolExt();
pIptvStreamerM = new cIptvStreamer(tsBufferM, (100 * TS_SIZE));
pPidScannerM = new cPidScanner(); pPidScannerM = new cPidScanner();
// Initialize filter pointers
memset(secFiltersM, 0, sizeof(secFiltersM));
// Start section handler for iptv device // Start section handler for iptv device
pIptvSectionM = new cIptvSectionFilterHandler(deviceIndexM, bufsize + 1);
StartSectionHandler(); StartSectionHandler();
// Sid scanner must be created after the section handler // Sid scanner must be created after the section handler
AttachFilter(pSidScannerM = new cSidScanner()); AttachFilter(pSidScannerM = new cSidScanner());
@ -68,10 +69,7 @@ cIptvDevice::~cIptvDevice()
DELETE_POINTER(pSidScannerM); DELETE_POINTER(pSidScannerM);
// Stop section handler of iptv device // Stop section handler of iptv device
StopSectionHandler(); StopSectionHandler();
// Destroy all filters DELETE_POINTER(pIptvSectionM);
cMutexLock MutexLock(&mutexM);
for (int i = 0; i < eMaxSecFilterCount; ++i)
DeleteFilter(i);
// Close dvr fifo // Close dvr fifo
if (dvrFdM >= 0) { if (dvrFdM >= 0) {
int fd = dvrFdM; int fd = dvrFdM;
@ -136,20 +134,7 @@ cString cIptvDevice::GetPidsInformation(void)
cString cIptvDevice::GetFiltersInformation(void) cString cIptvDevice::GetFiltersInformation(void)
{ {
//debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM); //debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM);
unsigned int count = 0; return cString::sprintf("Active section filters:\n%s", pIptvSectionM ? *pIptvSectionM->GetInformation() : "");
cString s("Active section filters:\n");
// loop through active section filters
cMutexLock MutexLock(&mutexM);
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (secFiltersM[i]) {
s = cString::sprintf("%sFilter %d: %s Pid=0x%02X (%s)\n", *s, i,
*secFiltersM[i]->GetSectionStatistic(), secFiltersM[i]->GetPid(),
id_pid(secFiltersM[i]->GetPid()));
if (++count > IPTV_STATS_ACTIVE_FILTERS_COUNT)
break;
}
}
return s;
} }
cString cIptvDevice::GetInformation(unsigned int pageP) cString cIptvDevice::GetInformation(unsigned int pageP)
@ -184,7 +169,7 @@ cString cIptvDevice::GetInformation(unsigned int pageP)
cString cIptvDevice::DeviceType(void) const cString cIptvDevice::DeviceType(void) const
{ {
debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM); //debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM);
return "IPTV"; return "IPTV";
} }
@ -300,80 +285,19 @@ bool cIptvDevice::SetPid(cPidHandle *handleP, int typeP, bool onP)
return true; return true;
} }
bool cIptvDevice::DeleteFilter(unsigned int indexP)
{
if ((indexP < eMaxSecFilterCount) && secFiltersM[indexP]) {
//debug("cIptvDevice::%s(%d): index=%d", __FUNCTION__, deviceIndexM, indexP);
cIptvSectionFilter *tmp = secFiltersM[indexP];
secFiltersM[indexP] = NULL;
delete tmp;
return true;
}
return false;
}
bool cIptvDevice::IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const
{
//debug("cIptvDevice::%s(%d): pid=%d tid=%02X mask=%02X", __FUNCTION__, deviceIndexM, pidP, tidP, maskP);
// loop through section filter table
for (int i = 0; i < SECTION_FILTER_TABLE_SIZE; ++i) {
int index = IptvConfig.GetDisabledFilters(i);
// Check if matches
if ((index >= 0) && (index < SECTION_FILTER_TABLE_SIZE) &&
(section_filter_table[index].pid == pidP) && (section_filter_table[index].tid == tidP) &&
(section_filter_table[index].mask == maskP)) {
//debug("cIptvDevice::%s(%d): found %s", __FUNCTION__, deviceIndexM, section_filter_table[index].description);
return true;
}
}
return false;
}
int cIptvDevice::OpenFilter(u_short pidP, u_char tidP, u_char maskP) int cIptvDevice::OpenFilter(u_short pidP, u_char tidP, u_char maskP)
{ {
// Check if disabled by user debug("cIptvDevice::%s(%d): pid=%d tid=%d mask=%d", __FUNCTION__, deviceIndexM, pidP, tidP, maskP);
if (!IptvConfig.GetSectionFiltering()) if (pIptvSectionM && IptvConfig.GetSectionFiltering())
return -1; return pIptvSectionM->Open(pidP, tidP, maskP);
// Lock
cMutexLock MutexLock(&mutexM);
// Blacklist check, refuse certain filters
if (IsBlackListed(pidP, tidP, maskP))
return -1;
// Search the next free filter slot
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (!secFiltersM[i]) {
//debug("cIptvDevice::%s(%d): pid=%d tid=%02X mask=%02X index=%d", __FUNCTION__, deviceIndexM, pidP, tidP, maskP, i);
secFiltersM[i] = new cIptvSectionFilter(deviceIndexM, pidP, tidP, maskP);
if (secFiltersM[i])
return i;
break;
}
}
// No free filter slot found
return -1; return -1;
} }
int cIptvDevice::ReadFilter(int handleP, void *bufferP, size_t lengthP)
{
// Lock
cMutexLock MutexLock(&mutexM);
// ... and load
if (secFiltersM[handleP]) {
return secFiltersM[handleP]->Read(bufferP, lengthP);
//debug("cIptvDevice::%s(%d): handle=%d length=%d", __FUNCTION__, deviceIndexM, handleP, lengthP);
}
return 0;
}
void cIptvDevice::CloseFilter(int handleP) void cIptvDevice::CloseFilter(int handleP)
{ {
// Lock debug("cIptvDevice::%s(%d): handle=%d", __FUNCTION__, deviceIndexM, handleP);
cMutexLock MutexLock(&mutexM); if (pIptvSectionM)
// ... and load pIptvSectionM->Close(handleP);
if (secFiltersM[handleP]) {
//debug("cIptvDevice::%s(%d): handle=%d", __FUNCTION__, deviceIndexM, handleP);
DeleteFilter(handleP);
}
} }
bool cIptvDevice::OpenDvr(void) bool cIptvDevice::OpenDvr(void)
@ -431,6 +355,32 @@ bool cIptvDevice::IsBuffering(void) const
return false; return false;
} }
void cIptvDevice::WriteData(uchar *bufferP, int lengthP)
{
//debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM);
int len;
// Send data to dvr fifo
if (dvrFdM >= 0)
len = write(dvrFdM, bufferP, lengthP);
// Fill up TS buffer
if (tsBufferM) {
len = tsBufferM->Put(bufferP, lengthP);
if (len != lengthP)
tsBufferM->ReportOverflow(lengthP - len);
}
// Filter the sections
if (pIptvSectionM && IptvConfig.GetSectionFiltering())
pIptvSectionM->Write(bufferP, lengthP);
}
unsigned int cIptvDevice::CheckData(void)
{
//debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM);
if (tsBufferM)
return (unsigned int)tsBufferM->Free();
return 0;
}
bool cIptvDevice::GetTSPacket(uchar *&Data) bool cIptvDevice::GetTSPacket(uchar *&Data)
{ {
//debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM); //debug("cIptvDevice::%s(%d)", __FUNCTION__, deviceIndexM);
@ -452,26 +402,13 @@ bool cIptvDevice::GetTSPacket(uchar *&Data)
} }
} }
tsBufferM->Del(Count); tsBufferM->Del(Count);
error("Skipped %d bytes to sync on TS packet\n", Count); error("Skipped %d bytes to sync on TS packet", Count);
return false; return false;
} }
isPacketDeliveredM = true; isPacketDeliveredM = true;
Data = p; Data = p;
// Update pid statistics // Update pid statistics
AddPidStatistic(ts_pid(p), payload(p)); AddPidStatistic(ts_pid(p), payload(p));
// Send data also to dvr fifo
if (dvrFdM >= 0)
Count = (int)write(dvrFdM, p, TS_SIZE);
// Analyze incomplete streams with built-in pid analyzer
if (pidScanEnabledM && pPidScannerM)
pPidScannerM->Process(p);
// Lock
cMutexLock MutexLock(&mutexM);
// Run the data through all filters
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (secFiltersM[i])
secFiltersM[i]->Process(p);
}
return true; return true;
} }
} }

View File

@ -10,6 +10,7 @@
#include <vdr/device.h> #include <vdr/device.h>
#include "common.h" #include "common.h"
#include "deviceif.h"
#include "protocoludp.h" #include "protocoludp.h"
#include "protocolcurl.h" #include "protocolcurl.h"
#include "protocolhttp.h" #include "protocolhttp.h"
@ -21,7 +22,7 @@
#include "sidscanner.h" #include "sidscanner.h"
#include "statistics.h" #include "statistics.h"
class cIptvDevice : public cDevice, public cIptvPidStatistics, public cIptvBufferStatistics { class cIptvDevice : public cDevice, public cIptvPidStatistics, public cIptvBufferStatistics, public cIptvDeviceIf {
// static ones // static ones
public: public:
static unsigned int deviceCount; static unsigned int deviceCount;
@ -31,9 +32,6 @@ public:
// private parts // private parts
private: private:
enum {
eMaxSecFilterCount = 32
};
unsigned int deviceIndexM; unsigned int deviceIndexM;
int dvrFdM; int dvrFdM;
bool isPacketDeliveredM; bool isPacketDeliveredM;
@ -49,10 +47,10 @@ private:
cIptvProtocolFile *pFileProtocolM; cIptvProtocolFile *pFileProtocolM;
cIptvProtocolExt *pExtProtocolM; cIptvProtocolExt *pExtProtocolM;
cIptvStreamer *pIptvStreamerM; cIptvStreamer *pIptvStreamerM;
cIptvSectionFilterHandler *pIptvSectionM;
cPidScanner *pPidScannerM; cPidScanner *pPidScannerM;
cSidScanner *pSidScannerM; cSidScanner *pSidScannerM;
cMutex mutexM; cMutex mutexM;
cIptvSectionFilter *secFiltersM[eMaxSecFilterCount];
// constructor & destructor // constructor & destructor
public: public:
@ -74,8 +72,6 @@ private:
private: private:
void ResetBuffering(void); void ResetBuffering(void);
bool IsBuffering(void) const; bool IsBuffering(void) const;
bool DeleteFilter(unsigned int indexP);
bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const;
// for channel info // for channel info
public: public:
@ -104,7 +100,6 @@ protected:
// for section filtering // for section filtering
public: public:
virtual int OpenFilter(u_short pidP, u_char tidP, u_char maskP); virtual int OpenFilter(u_short pidP, u_char tidP, u_char maskP);
virtual int ReadFilter(int handleP, void *bufferP, size_t lengthP);
virtual void CloseFilter(int handleP); virtual void CloseFilter(int handleP);
// for transponder lock // for transponder lock
@ -114,6 +109,11 @@ public:
// for common interface // for common interface
public: public:
virtual bool HasInternalCam(void); virtual bool HasInternalCam(void);
// for internal device interface
public:
virtual void WriteData(u_char *bufferP, int lengthP);
virtual unsigned int CheckData(void);
}; };
#endif // __IPTV_DEVICE_H #endif // __IPTV_DEVICE_H

23
deviceif.h Normal file
View File

@ -0,0 +1,23 @@
/*
* deviceif.h: IPTV plugin for the Video Disk Recorder
*
* See the README file for copyright information and how to reach the author.
*
*/
#ifndef __IPTV_DEVICEIF_H
#define __IPTV_DEVICEIF_H
class cIptvDeviceIf {
public:
cIptvDeviceIf() {}
virtual ~cIptvDeviceIf() {}
virtual void WriteData(u_char *bufferP, int lengthP) = 0;
virtual unsigned int CheckData(void) = 0;
private:
cIptvDeviceIf(const cIptvDeviceIf&);
cIptvDeviceIf& operator=(const cIptvDeviceIf&);
};
#endif // __IPTV_DEVICEIF_H

View File

@ -56,7 +56,7 @@ void cPidScanner::Process(const uint8_t* bufP)
// Verify TS packet // Verify TS packet
if (bufP[0] != 0x47) { if (bufP[0] != 0x47) {
error("Not TS packet: 0x%X\n", bufP[0]); error("Not TS packet: 0x%02X", bufP[0]);
return; return;
} }

View File

@ -9,14 +9,18 @@
#include "config.h" #include "config.h"
#include "protocolcurl.h" #include "protocolcurl.h"
#ifdef CURLOPT_RTSPHEADER
#define USE_RTSP
#endif
#define iptv_curl_easy_setopt(X, Y, Z) \ #define iptv_curl_easy_setopt(X, Y, Z) \
if ((res = curl_easy_setopt((X), (Y), (Z))) != CURLE_OK) { \ if ((res = curl_easy_setopt((X), (Y), (Z))) != CURLE_OK) { \
error("curl_easy_setopt(%s, %s, %s) failed: %d\n", #X, #Y, #Z, res); \ error("curl_easy_setopt(%s, %s, %s) failed: %d", #X, #Y, #Z, res); \
} }
#define iptv_curl_easy_perform(X) \ #define iptv_curl_easy_perform(X) \
if ((res = curl_easy_perform((X))) != CURLE_OK) { \ if ((res = curl_easy_perform((X))) != CURLE_OK) { \
error("curl_easy_perform(%s) failed: %d\n", #X, res); \ error("curl_easy_perform(%s) failed: %d", #X, res); \
} }
cIptvProtocolCurl::cIptvProtocolCurl() cIptvProtocolCurl::cIptvProtocolCurl()
@ -210,7 +214,7 @@ unsigned char *cIptvProtocolCurl::GetData(int &lenP)
break; break;
} }
} }
error("IPTV skipped %d bytes to sync on TS packet\n", count); error("IPTV skipped %d bytes to sync on TS packet", count);
ringBufferM->Del(count); ringBufferM->Del(count);
lenP = 0; lenP = 0;
return NULL; return NULL;
@ -274,6 +278,7 @@ bool cIptvProtocolCurl::Connect()
// Protocol specific initializations // Protocol specific initializations
switch (modeM) { switch (modeM) {
#ifdef USE_RTSP
case eModeRtsp: case eModeRtsp:
{ {
cString uri, control, transport, range; cString uri, control, transport, range;
@ -317,7 +322,7 @@ bool cIptvProtocolCurl::Connect()
iptv_curl_easy_perform(handleM); iptv_curl_easy_perform(handleM);
} }
break; break;
#endif
case eModeHttp: case eModeHttp:
case eModeHttps: case eModeHttps:
{ {
@ -366,6 +371,7 @@ bool cIptvProtocolCurl::Disconnect()
if (handleM && multiM) { if (handleM && multiM) {
// Mode specific tricks // Mode specific tricks
switch (modeM) { switch (modeM) {
#ifdef USE_RTSP
case eModeRtsp: case eModeRtsp:
{ {
CURLcode res = CURLE_OK; CURLcode res = CURLE_OK;
@ -377,7 +383,7 @@ bool cIptvProtocolCurl::Disconnect()
rtspControlM = ""; rtspControlM = "";
} }
break; break;
#endif
case eModeHttp: case eModeHttp:
case eModeHttps: case eModeHttps:
case eModeFile: case eModeFile:
@ -424,6 +430,7 @@ int cIptvProtocolCurl::Read(unsigned char* bufferAddrP, unsigned int bufferLenP)
// Fill up the buffer // Fill up the buffer
if (handleM && multiM) { if (handleM && multiM) {
switch (modeM) { switch (modeM) {
#ifdef USE_RTSP
case eModeRtsp: case eModeRtsp:
{ {
cMutexLock MutexLock(&mutexM); cMutexLock MutexLock(&mutexM);
@ -433,7 +440,7 @@ int cIptvProtocolCurl::Read(unsigned char* bufferAddrP, unsigned int bufferLenP)
// @todo - How to detect eof? // @todo - How to detect eof?
} }
break; break;
#endif
case eModeFile: case eModeFile:
case eModeHttp: case eModeHttp:
case eModeHttps: case eModeHttps:

View File

@ -60,7 +60,7 @@ int cIptvProtocolFile::Read(unsigned char* bufferAddrP, unsigned int bufferLenP)
{ {
//debug("cIptvProtocolFile::%s()", __FUNCTION__); //debug("cIptvProtocolFile::%s()", __FUNCTION__);
// Check errors // Check errors
if (ferror(fileStreamM)) { if (!fileStreamM || ferror(fileStreamM)) {
debug("cIptvProtocolFile::%s(): stream error", __FUNCTION__); debug("cIptvProtocolFile::%s(): stream error", __FUNCTION__);
return -1; return -1;
} }

View File

@ -113,13 +113,13 @@ bool cIptvProtocolHttp::GetHeaderLine(char* destP, unsigned int destLenP,
++bufptr; ++bufptr;
// Check that buffer won't be exceeded // Check that buffer won't be exceeded
if (recvLenP >= destLenP) { if (recvLenP >= destLenP) {
error("Header wouldn't fit into buffer\n"); error("Header wouldn't fit into buffer");
recvLenP = 0; recvLenP = 0;
return false; return false;
} }
} }
else { else {
error("No HTTP response received in 500ms\n"); error("No HTTP response received in 500ms");
return false; return false;
} }
} }
@ -143,14 +143,14 @@ bool cIptvProtocolHttp::ProcessHeaders(void)
if (!GetHeaderLine(buf, sizeof(buf), lineLength)) if (!GetHeaderLine(buf, sizeof(buf), lineLength))
return false; return false;
if (!responseFound && sscanf(buf, fmt, &version, &response) != 2) { if (!responseFound && sscanf(buf, fmt, &version, &response) != 2) {
error("Expected HTTP header not found\n"); error("Expected HTTP header not found");
continue; continue;
} }
else else
responseFound = true; responseFound = true;
// Allow only 'OK' and 'Partial Content' // Allow only 'OK' and 'Partial Content'
if ((response != 200) && (response != 206)) { if ((response != 200) && (response != 206)) {
error("Invalid HTTP response (%d): %s\n", response, buf); error("Invalid HTTP response (%d): %s", response, buf);
return false; return false;
} }
} }

View File

@ -17,9 +17,9 @@ cIptvSectionFilter::cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t
secLenM(0), secLenM(0),
tsFeedpM(0), tsFeedpM(0),
pidM(pidP), pidM(pidP),
devIdM(deviceIndexP) deviceIndexM(deviceIndexP)
{ {
//debug("cIptvSectionFilter::%s(%d, %d)", __FUNCTION__, devIdM, pidM); //debug("cIptvSectionFilter::%s(%d, %d)", __FUNCTION__, deviceIndexM, pidM);
int i; int i;
memset(secBufBaseM, 0, sizeof(secBufBaseM)); memset(secBufBaseM, 0, sizeof(secBufBaseM));
@ -46,32 +46,32 @@ cIptvSectionFilter::cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t
} }
doneqM = doneq ? 1 : 0; doneqM = doneq ? 1 : 0;
// Create filtering buffer // Create sockets
ringbufferM = new cRingBufferLinear(KILOBYTE(128), 0, false, *cString::sprintf("IPTV SECTION %d/%d", devIdM, pidM)); socketM[0] = socketM[1] = -1;
if (ringbufferM) if (socketpair(AF_UNIX, SOCK_DGRAM, 0, socketM) != 0) {
ringbufferM->SetTimeouts(10, 10); char tmp[64];
else error("Opening section filter sockets failed (device=%d pid=%d): %s", deviceIndexM, pidM, strerror_r(errno, tmp, sizeof(tmp)));
error("Failed to allocate buffer for section filter (device=%d pid=%d): ", devIdM, pidM); }
else if ((fcntl(socketM[0], F_SETFL, O_NONBLOCK) != 0) || (fcntl(socketM[1], F_SETFL, O_NONBLOCK) != 0)) {
char tmp[64];
error("Setting section filter socket to non-blocking mode failed (device=%d pid=%d): %s", deviceIndexM, pidM, strerror_r(errno, tmp, sizeof(tmp)));
}
} }
cIptvSectionFilter::~cIptvSectionFilter() cIptvSectionFilter::~cIptvSectionFilter()
{ {
//debug("cIptvSectionFilter::%s(%d, %d)", __FUNCTION__, devIdM, pidM); //debug("cIptvSectionFilter::%s(%d, %d)", __FUNCTION__, deviceIndexM, pidM);
DELETE_POINTER(ringbufferM); int tmp = socketM[1];
socketM[1] = -1;
if (tmp >= 0)
close(tmp);
tmp = socketM[0];
socketM[0] = -1;
if (tmp >= 0)
close(tmp);
secBufM = NULL; secBufM = NULL;
} }
int cIptvSectionFilter::Read(void *Data, size_t Length)
{
int count = 0;
uchar *p = ringbufferM->Get(count);
if (p && count > 0) {
memcpy(Data, p, count);
ringbufferM->Del(count);
}
return count;
}
inline uint16_t cIptvSectionFilter::GetLength(const uint8_t *dataP) inline uint16_t cIptvSectionFilter::GetLength(const uint8_t *dataP)
{ {
return (uint16_t)(3 + ((dataP[1] & 0x0f) << 8) + dataP[2]); return (uint16_t)(3 + ((dataP[1] & 0x0f) << 8) + dataP[2]);
@ -99,10 +99,10 @@ int cIptvSectionFilter::Filter(void)
if (doneqM && !neq) if (doneqM && !neq)
return 0; return 0;
if (ringbufferM) { // There is no data in the read socket, more can be written
int len = ringbufferM->Put(secBufM, secLenM); if ((socketM[0] >= 0) && (socketM[1] >= 0) /*&& !select_single_desc(socketM[0], 0, false)*/) {
if (len != secLenM) ssize_t len = write(socketM[1], secBufM, secLenM);
ringbufferM->ReportOverflow(secLenM - len); ERROR_IF(len < 0, "write()");
// Update statistics // Update statistics
AddSectionStatistic(len, 1); AddSectionStatistic(len, 1);
} }
@ -210,3 +210,175 @@ void cIptvSectionFilter::Process(const uint8_t* dataP)
CopyDump(&dataP[p], count); CopyDump(&dataP[p], count);
} }
} }
cIptvSectionFilterHandler::cIptvSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP)
: cThread("IPTV section handler", true),
mutexM(),
deviceIndexM(deviceIndexP),
processedM(false),
ringBufferM(new cRingBufferLinear(bufferLenP, TS_SIZE, false, *cString::sprintf("IPTV SECTION HANDLER %d", deviceIndexP)))
{
debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM);
// Initialize filter pointers
memset(filtersM, 0, sizeof(filtersM));
// Create input buffer
if (ringBufferM) {
ringBufferM->SetTimeouts(100, 100);
ringBufferM->SetIoThrottle();
}
else
error("Failed to allocate buffer for section filter handler (device=%d): ", deviceIndexM);
Start();
}
cIptvSectionFilterHandler::~cIptvSectionFilterHandler()
{
debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM);
// Stop thread
if (Running())
Cancel(3);
// Destroy all filters
cMutexLock MutexLock(&mutexM);
for (int i = 0; i < eMaxSecFilterCount; ++i)
Delete(i);
}
void cIptvSectionFilterHandler::Action(void)
{
debug("cIptvSectionFilterHandler::%s(%d): entering", __FUNCTION__, deviceIndexM);
// Do the thread loop
while (Running()) {
// Read one TS packet
if (ringBufferM) {
int len = 0;
if (processedM) {
ringBufferM->Del(TS_SIZE);
processedM = false;
}
uchar *p = ringBufferM->Get(len);
if (p && (len >= TS_SIZE)) {
if (*p != TS_SYNC_BYTE) {
for (int i = 1; i < len; ++i) {
if (p[i] == TS_SYNC_BYTE) {
len = i;
break;
}
}
ringBufferM->Del(len);
debug("cIptvSectionFilterHandler::%s(%d): Skipped %d bytes to sync on TS packet", __FUNCTION__, deviceIndexM, len);
continue;
}
// Process TS packet through all filters
mutexM.Lock();
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (filtersM[i])
filtersM[i]->Process(p);
}
mutexM.Unlock();
processedM = true;
continue;
}
}
cCondWait::SleepMs(10); // to avoid busy loop and reduce cpu load
}
debug("cIptvSectionFilterHandler::%s(%d): exiting", __FUNCTION__, deviceIndexM);
}
cString cIptvSectionFilterHandler::GetInformation(void)
{
//debug("cIptvSectionFilterHandler::%s(%d)", __FUNCTION__, deviceIndexM);
// loop through active section filters
cMutexLock MutexLock(&mutexM);
cString s = "";
unsigned int count = 0;
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (filtersM[i]) {
s = cString::sprintf("%sFilter %d: %s Pid=0x%02X (%s)\n", *s, i,
*filtersM[i]->GetSectionStatistic(), filtersM[i]->GetPid(),
id_pid(filtersM[i]->GetPid()));
if (++count > IPTV_STATS_ACTIVE_FILTERS_COUNT)
break;
}
}
return s;
}
bool cIptvSectionFilterHandler::Delete(unsigned int indexP)
{
//debug("cIptvSectionFilterHandler::%s(%d): index=%d", __FUNCTION__, deviceIndexM, indexP);
if ((indexP < eMaxSecFilterCount) && filtersM[indexP]) {
//debug("cIptvSectionFilterHandler::%s(%d): found %d", __FUNCTION__, deviceIndexM, indexP);
cIptvSectionFilter *tmp = filtersM[indexP];
filtersM[indexP] = NULL;
delete tmp;
return true;
}
return false;
}
bool cIptvSectionFilterHandler::IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const
{
//debug("cIptvSectionFilterHandler::%s(%d): pid=%d tid=%02X mask=%02X", __FUNCTION__, deviceIndexM, pidP, tidP, maskP);
// loop through section filter table
for (int i = 0; i < SECTION_FILTER_TABLE_SIZE; ++i) {
int index = IptvConfig.GetDisabledFilters(i);
// Check if matches
if ((index >= 0) && (index < SECTION_FILTER_TABLE_SIZE) &&
(section_filter_table[index].pid == pidP) && (section_filter_table[index].tid == tidP) &&
(section_filter_table[index].mask == maskP)) {
//debug("cIptvSectionFilterHandler::%s(%d): found %s", __FUNCTION__, deviceIndexM, section_filter_table[index].description);
return true;
}
}
return false;
}
int cIptvSectionFilterHandler::Open(u_short pidP, u_char tidP, u_char maskP)
{
// Lock
cMutexLock MutexLock(&mutexM);
// Blacklist check, refuse certain filters
if (IsBlackListed(pidP, tidP, maskP))
return -1;
// Search the next free filter slot
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (!filtersM[i]) {
//debug("cIptvSectionFilterHandler::%s(%d): pid=%d tid=%02X mask=%02X index=%d", __FUNCTION__, deviceIndexM, pidP, tidP, maskP, i);
filtersM[i] = new cIptvSectionFilter(deviceIndexM, pidP, tidP, maskP);
return filtersM[i]->GetFd();
}
}
// No free filter slot found
return -1;
}
void cIptvSectionFilterHandler::Close(int handleP)
{
// Lock
cMutexLock MutexLock(&mutexM);
// Search the filter for deletion
for (unsigned int i = 0; i < eMaxSecFilterCount; ++i) {
if (filtersM[i] && (handleP == filtersM[i]->GetFd())) {
//debug(""cIptvSectionFilterHandler::%s(%d): handle=%d", __FUNCTION__, deviceIndex, handleP);
Delete(i);
break;
}
}
}
void cIptvSectionFilterHandler::Write(uchar *bufferP, int lengthP)
{
//debug("cIptvSectionFilterHandler::%s(%d): length=%d", __FUNCTION__, deviceIndexM, lengthP);
// Fill up the buffer
if (ringBufferM) {
int len = ringBufferM->Put(bufferP, lengthP);
if (len != lengthP)
ringBufferM->ReportOverflow(lengthP - len);
}
}

View File

@ -35,7 +35,8 @@ private:
uint16_t tsFeedpM; uint16_t tsFeedpM;
uint16_t pidM; uint16_t pidM;
int devIdM; int deviceIndexM;
int socketM[2];
uint8_t filterValueM[DMX_MAX_FILTER_SIZE]; uint8_t filterValueM[DMX_MAX_FILTER_SIZE];
uint8_t filterMaskM[DMX_MAX_FILTER_SIZE]; uint8_t filterMaskM[DMX_MAX_FILTER_SIZE];
@ -44,8 +45,6 @@ private:
uint8_t maskAndModeM[DMX_MAX_FILTER_SIZE]; uint8_t maskAndModeM[DMX_MAX_FILTER_SIZE];
uint8_t maskAndNotModeM[DMX_MAX_FILTER_SIZE]; uint8_t maskAndNotModeM[DMX_MAX_FILTER_SIZE];
cRingBufferLinear *ringbufferM;
inline uint16_t GetLength(const uint8_t *dataP); inline uint16_t GetLength(const uint8_t *dataP);
void New(void); void New(void);
int Filter(void); int Filter(void);
@ -57,8 +56,34 @@ public:
cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP); cIptvSectionFilter(int deviceIndexP, uint16_t pidP, uint8_t tidP, uint8_t maskP);
virtual ~cIptvSectionFilter(); virtual ~cIptvSectionFilter();
void Process(const uint8_t* dataP); void Process(const uint8_t* dataP);
int Read(void *bufferP, size_t lengthP); int GetFd(void) { return socketM[0]; }
uint16_t GetPid(void) const { return pidM; } uint16_t GetPid(void) const { return pidM; }
}; };
class cIptvSectionFilterHandler : public cThread {
private:
enum {
eMaxSecFilterCount = 32
};
cMutex mutexM;
int deviceIndexM;
bool processedM;
cRingBufferLinear *ringBufferM;
cIptvSectionFilter *filtersM[eMaxSecFilterCount];
bool Delete(unsigned int indexP);
bool IsBlackListed(u_short pidP, u_char tidP, u_char maskP) const;
protected:
virtual void Action(void);
public:
cIptvSectionFilterHandler(int deviceIndexP, unsigned int bufferLenP);
virtual ~cIptvSectionFilterHandler();
cString GetInformation(void);
int Open(u_short pidP, u_char tidP, u_char maskP);
void Close(int handleP);
void Write(u_char *bufferP, int lengthP);
};
#endif // __IPTV_SECTIONFILTER_H #endif // __IPTV_SECTIONFILTER_H

View File

@ -76,10 +76,10 @@ void cSidScanner::Process(u_short pidP, u_char tidP, const u_char *dataP, int le
break; // default to the first one break; // default to the first one
} }
if (nit.getNetworkId() != channelIdM.Nid()) { if (nit.getNetworkId() != channelIdM.Nid()) {
debug("cSidScanner::%s(): nid=%d\n", __FUNCTION__, ts.getTransportStreamId()); debug("cSidScanner::%s(): nid=%d", __FUNCTION__, ts.getTransportStreamId());
newNid = nit.getNetworkId(); newNid = nit.getNetworkId();
} }
nidFoundM = true; nidFoundM = true;
} }
} }
if ((newSid >= 0) || (newNid >= 0) || (newTid >= 0)) { if ((newSid >= 0) || (newNid >= 0) || (newTid >= 0)) {

View File

@ -226,7 +226,7 @@ int cIptvUdpSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP)
//debug("cIptvUdpSocket::%s()", __FUNCTION__); //debug("cIptvUdpSocket::%s()", __FUNCTION__);
// Error out if socket not initialized // Error out if socket not initialized
if (socketDescM <= 0) { if (socketDescM <= 0) {
error("Invalid socket in cIptvUdpSocket::%s()\n", __FUNCTION__); error("Invalid socket in cIptvUdpSocket::%s()", __FUNCTION__);
return -1; return -1;
} }
int len = 0; int len = 0;
@ -359,7 +359,7 @@ int cIptvTcpSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP)
//debug("cIptvTcpSocket::%s()", __FUNCTION__); //debug("cIptvTcpSocket::%s()", __FUNCTION__);
// Error out if socket not initialized // Error out if socket not initialized
if (socketDescM <= 0) { if (socketDescM <= 0) {
error("Invalid socket in cIptvTcpSocket::%s()\n", __FUNCTION__); error("Invalid socket in cIptvTcpSocket::%s()", __FUNCTION__);
return -1; return -1;
} }
int len = 0; int len = 0;
@ -376,7 +376,7 @@ bool cIptvTcpSocket::ReadChar(char *bufferAddrP, unsigned int timeoutMsP)
//debug("cIptvTcpSocket::%s()", __FUNCTION__); //debug("cIptvTcpSocket::%s()", __FUNCTION__);
// Error out if socket not initialized // Error out if socket not initialized
if (socketDescM <= 0) { if (socketDescM <= 0) {
error("Invalid socket in cIptvTcpSocket::%s()\n", __FUNCTION__); error("Invalid socket in cIptvTcpSocket::%s()", __FUNCTION__);
return false; return false;
} }
socklen_t addrlen = sizeof(sockAddrM); socklen_t addrlen = sizeof(sockAddrM);
@ -401,7 +401,7 @@ bool cIptvTcpSocket::Write(const char *bufferAddrP, unsigned int bufferLenP)
//debug("cIptvTcpSocket::%s()", __FUNCTION__); //debug("cIptvTcpSocket::%s()", __FUNCTION__);
// Error out if socket not initialized // Error out if socket not initialized
if (socketDescM <= 0) { if (socketDescM <= 0) {
error("Invalid socket in cIptvTcpSocket::%s()\n", __FUNCTION__); error("Invalid socket in cIptvTcpSocket::%s()", __FUNCTION__);
return false; return false;
} }
ERROR_IF_RET(send(socketDescM, bufferAddrP, bufferLenP, 0) < 0, "send()", return false); ERROR_IF_RET(send(socketDescM, bufferAddrP, bufferLenP, 0) < 0, "send()", return false);

View File

@ -120,7 +120,7 @@ bool cIptvTransponderParameters::Parse(const char *strP)
if (found_s && found_p && found_f && found_u && found_a) if (found_s && found_p && found_f && found_u && found_a)
result = true; result = true;
else else
error("Invalid channel parameters: %s\n", str); error("Invalid channel parameters: %s", str);
free(str); free(str);
} }

View File

@ -44,7 +44,7 @@ cString cIptvSectionStatistics::GetSectionStatistic()
void cIptvSectionStatistics::AddSectionStatistic(long bytesP, long callsP) void cIptvSectionStatistics::AddSectionStatistic(long bytesP, long callsP)
{ {
//debug("cIptvSectionStatistics::%s(%ld, %ld)", __FUNCTION__, bytesP, callsP); //debug("cIptvSectionStatistics::%s(%ld, %ld)", __FUNCTION__, bytesP, callsP);
cMutexLock MutexLock(&mutexM); cMutexLock MutexLock(&mutexM);
filteredDataM += bytesP; filteredDataM += bytesP;
numberOfCallsM += callsP; numberOfCallsM += callsP;

View File

@ -5,16 +5,13 @@
* *
*/ */
#include <vdr/thread.h>
#include <vdr/ringbuffer.h>
#include "common.h" #include "common.h"
#include "streamer.h" #include "streamer.h"
cIptvStreamer::cIptvStreamer(cRingBufferLinear* ringBufferP, unsigned int packetLenP) cIptvStreamer::cIptvStreamer(cIptvDeviceIf &deviceP, unsigned int packetLenP)
: cThread("IPTV streamer"), : cThread("IPTV streamer"),
ringBufferM(ringBufferP),
sleepM(), sleepM(),
deviceM(&deviceP),
packetBufferLenM(packetLenP), packetBufferLenM(packetLenP),
protocolM(NULL) protocolM(NULL)
{ {
@ -33,7 +30,6 @@ cIptvStreamer::~cIptvStreamer()
// Close the protocol // Close the protocol
Close(); Close();
protocolM = NULL; protocolM = NULL;
ringBufferM = NULL;
// Free allocated memory // Free allocated memory
free(packetBufferM); free(packetBufferM);
} }
@ -46,16 +42,12 @@ void cIptvStreamer::Action(void)
// Do the thread loop // Do the thread loop
while (packetBufferM && Running()) { while (packetBufferM && Running()) {
int length = -1; int length = -1;
unsigned int size = min((unsigned int)ringBufferM->Free(), packetBufferLenM); unsigned int size = min(deviceM->CheckData(), packetBufferLenM);
if (protocolM && (size > 0)) if (protocolM && (size > 0))
length = protocolM->Read(packetBufferM, size); length = protocolM->Read(packetBufferM, size);
if (length > 0) { if (length > 0) {
AddStreamerStatistic(length); AddStreamerStatistic(length);
if (ringBufferM) { deviceM->WriteData(packetBufferM, length);
int p = ringBufferM->Put(packetBufferM, length);
if (p != length)
ringBufferM->ReportOverflow(length - p);
}
} }
else else
sleepM.Wait(10); // to avoid busy loop and reduce cpu load sleepM.Wait(10); // to avoid busy loop and reduce cpu load

View File

@ -11,15 +11,15 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <vdr/thread.h> #include <vdr/thread.h>
#include <vdr/ringbuffer.h>
#include "deviceif.h"
#include "protocolif.h" #include "protocolif.h"
#include "statistics.h" #include "statistics.h"
class cIptvStreamer : public cThread, public cIptvStreamerStatistics { class cIptvStreamer : public cThread, public cIptvStreamerStatistics {
private: private:
cRingBufferLinear* ringBufferM;
cCondWait sleepM; cCondWait sleepM;
cIptvDeviceIf* deviceM;
unsigned char* packetBufferM; unsigned char* packetBufferM;
unsigned int packetBufferLenM; unsigned int packetBufferLenM;
cIptvProtocolIf* protocolM; cIptvProtocolIf* protocolM;
@ -28,7 +28,7 @@ protected:
virtual void Action(void); virtual void Action(void);
public: public:
cIptvStreamer(cRingBufferLinear* ringBufferP, unsigned int packetLenP); cIptvStreamer(cIptvDeviceIf &deviceP, unsigned int packetLenP);
virtual ~cIptvStreamer(); virtual ~cIptvStreamer();
bool Set(const char* locationP, const int parameterP, const int indexP, cIptvProtocolIf* protocolP); bool Set(const char* locationP, const int parameterP, const int indexP, cIptvProtocolIf* protocolP);
bool Open(void); bool Open(void);