Modified sectionfilters to use socket pair instead of filesystem fifos.

This commit is contained in:
Rolf Ahrenberg 2009-10-01 18:48:28 +03:00
parent dbeb014a85
commit 49fcbc8921
10 changed files with 52 additions and 56 deletions

View File

@ -114,3 +114,5 @@ VDR Plugin 'iptv' Revision History
- Updated patches. - Updated patches.
- Added optional patches to disable EIT scanning. - Added optional patches to disable EIT scanning.
- Fixed handling of HTTP protocol headers. - Fixed handling of HTTP protocol headers.
- Modified sectionfilters to use socket pair instead of
filesystem fifos.

View File

@ -14,10 +14,10 @@
#ifdef DEBUG #ifdef DEBUG
#define debug(x...) dsyslog("IPTV: " x); #define debug(x...) dsyslog("IPTV: " x);
#define error(x...) esyslog("IPTV: " x); #define error(x...) esyslog("ERROR: " x);
#else #else
#define debug(x...) ; #define debug(x...) ;
#define error(x...) esyslog("IPTV: " x); #define error(x...) esyslog("ERROR: " x);
#endif #endif
#ifndef trNOOP #ifndef trNOOP
@ -28,7 +28,6 @@
#define trVDR(s) tr(s) #define trVDR(s) tr(s)
#endif #endif
#define IPTV_FILTER_FILENAME "/tmp/vdr-iptv%d.filter%d"
#define IPTV_DVR_FILENAME "/tmp/vdr-iptv%d.dvr" #define IPTV_DVR_FILENAME "/tmp/vdr-iptv%d.dvr"
#define IPTV_DEVICE_INFO_ALL 0 #define IPTV_DEVICE_INFO_ALL 0
@ -42,13 +41,13 @@
#define SECTION_FILTER_TABLE_SIZE 7 #define SECTION_FILTER_TABLE_SIZE 7
#define ERROR_IF_FUNC(exp, errstr, func, ret) \ #define ERROR_IF_FUNC(exp, errstr, func, ret) \
do { \ do { \
if (exp) { \ if (exp) { \
char tmp[64]; \ char tmp[64]; \
error("ERROR: "errstr": %s", strerror_r(errno, tmp, sizeof(tmp))); \ error(errstr": %s", strerror_r(errno, tmp, sizeof(tmp))); \
func; \ func; \
ret; \ ret; \
} \ } \
} while (0) } while (0)

View File

@ -272,7 +272,7 @@ bool cIptvDevice::SetChannelDevice(const cChannel *Channel, bool LiveView)
debug("cIptvDevice::SetChannelDevice(%d)\n", deviceIndex); debug("cIptvDevice::SetChannelDevice(%d)\n", deviceIndex);
location = GetChannelSettings(Channel->PluginParam(), &parameter, &sidscan, &pidscan, &protocol); location = GetChannelSettings(Channel->PluginParam(), &parameter, &sidscan, &pidscan, &protocol);
if (isempty(location)) { if (isempty(location)) {
error("ERROR: Unrecognized IPTV channel settings: %s", Channel->PluginParam()); error("Unrecognized IPTV channel settings: %s", Channel->PluginParam());
return false; return false;
} }
sidScanEnabled = sidscan ? true : false; sidScanEnabled = sidscan ? true : false;
@ -427,7 +427,7 @@ bool cIptvDevice::GetTSPacket(uchar *&Data)
} }
} }
tsBuffer->Del(Count); tsBuffer->Del(Count);
error("ERROR: skipped %d bytes to sync on TS packet\n", Count); error("Skipped %d bytes to sync on TS packet\n", Count);
return false; return false;
} }
isPacketDelivered = true; isPacketDelivered = true;

View File

@ -39,7 +39,7 @@ void cIptvProtocolExt::ExecuteScript(void)
debug("cIptvProtocolExt::ExecuteScript()\n"); debug("cIptvProtocolExt::ExecuteScript()\n");
// Check if already executing // Check if already executing
if (pid > 0) { if (pid > 0) {
error("ERROR: Cannot execute script!"); error("Cannot execute script!");
return; return;
} }
// Let's fork // Let's fork
@ -54,7 +54,7 @@ void cIptvProtocolExt::ExecuteScript(void)
cString cmd = cString::sprintf("%s %d %d", *scriptFile, scriptParameter, socketPort); cString cmd = cString::sprintf("%s %d %d", *scriptFile, scriptParameter, socketPort);
debug("cIptvProtocolExt::ExecuteScript(child): %s\n", *cmd); debug("cIptvProtocolExt::ExecuteScript(child): %s\n", *cmd);
if (execl("/bin/sh", "sh", "-c", *cmd, NULL) == -1) { if (execl("/bin/sh", "sh", "-c", *cmd, NULL) == -1) {
error("ERROR: Script execution failed: %s", *cmd); error("Script execution failed: %s", *cmd);
_exit(-1); _exit(-1);
} }
_exit(0); _exit(0);
@ -79,7 +79,7 @@ void cIptvProtocolExt::TerminateScript(void)
retval = 0; retval = 0;
waitms += timeoutms; waitms += timeoutms;
if ((waitms % 2000) == 0) { if ((waitms % 2000) == 0) {
error("ERROR: Script '%s' won't terminate - killing it!", *scriptFile); error("Script '%s' won't terminate - killing it!", *scriptFile);
kill(pid, SIGKILL); kill(pid, SIGKILL);
} }
// Clear wait status to make sure child exit status is accessible // Clear wait status to make sure child exit status is accessible
@ -140,7 +140,7 @@ bool cIptvProtocolExt::Set(const char* Location, const int Parameter, const int
// Update script file and parameter // Update script file and parameter
scriptFile = cString::sprintf("%s/%s", IptvConfig.GetConfigDirectory(), Location); scriptFile = cString::sprintf("%s/%s", IptvConfig.GetConfigDirectory(), Location);
if ((stat(*scriptFile, &stbuf) != 0) || (strstr(*scriptFile, "..") != 0)) { if ((stat(*scriptFile, &stbuf) != 0) || (strstr(*scriptFile, "..") != 0)) {
error("ERROR: Non-existent or relative path script '%s'", *scriptFile); error("Non-existent or relative path script '%s'", *scriptFile);
return false; return false;
} }
scriptParameter = Parameter; scriptParameter = Parameter;

View File

@ -53,9 +53,8 @@ bool cIptvProtocolHttp::Connect(void)
struct hostent *host; struct hostent *host;
host = gethostbyname(streamAddr); host = gethostbyname(streamAddr);
if (!host) { if (!host) {
error("%s is not valid address\n", streamAddr);
char tmp[64]; char tmp[64];
error("ERROR: %s", strerror_r(h_errno, tmp, sizeof(tmp))); error("%s is not valid address: %s", streamAddr, strerror_r(h_errno, tmp, sizeof(tmp)));
return false; return false;
} }
@ -77,9 +76,8 @@ bool cIptvProtocolHttp::Connect(void)
// If not any errors, then socket must be ready and connected // If not any errors, then socket must be ready and connected
if (socketStatus != 0) { if (socketStatus != 0) {
error("Cannot connect to %s\n", streamAddr);
char tmp[64]; char tmp[64];
error("ERROR: %s", strerror_r(socketStatus, tmp, sizeof(tmp))); error("Cannot connect to %s: %s", streamAddr, strerror_r(socketStatus, tmp, sizeof(tmp)));
CloseSocket(); CloseSocket();
return false; return false;
} }
@ -192,7 +190,7 @@ bool cIptvProtocolHttp::ProcessHeaders(void)
else else
responseFound = true; responseFound = true;
if (response != 200) { if (response != 200) {
error("ERROR: %s\n", buf); error("%s\n", buf);
return false; return false;
} }
} }

View File

@ -18,8 +18,7 @@ cIptvSectionFilter::cIptvSectionFilter(int DeviceIndex, int Index,
tsfeedp(0), tsfeedp(0),
pid(Pid), pid(Pid),
devid(DeviceIndex), devid(DeviceIndex),
id(Index), id(Index)
pipeName("")
{ {
//debug("cIptvSectionFilter::cIptvSectionFilter(%d, %d)\n", devid, id); //debug("cIptvSectionFilter::cIptvSectionFilter(%d, %d)\n", devid, id);
int i; int i;
@ -48,33 +47,35 @@ cIptvSectionFilter::cIptvSectionFilter(int DeviceIndex, int Index,
} }
doneq = local_doneq ? 1 : 0; doneq = local_doneq ? 1 : 0;
struct stat sb; // Create sockets
pipeName = cString::sprintf(IPTV_FILTER_FILENAME, devid, id); socket[0] = socket[1] = -1;
stat(pipeName, &sb); if (socketpair(AF_UNIX, SOCK_DGRAM, 0, socket) != 0) {
if (S_ISFIFO(sb.st_mode)) char tmp[64];
unlink(pipeName); error("Opening section filter sockets failed (device=%d id=%d): %s\n", devid, id, strerror_r(errno, tmp, sizeof(tmp)));
i = mknod(pipeName, 0644 | S_IFIFO, 0); }
ERROR_IF_RET(i < 0, "mknod()", return); else if ((fcntl(socket[0], F_SETFL, O_NONBLOCK) != 0) || (fcntl(socket[1], F_SETFL, O_NONBLOCK) != 0)) {
char tmp[64];
// Create descriptors error("Setting section filter socket to non-blocking mode failed (device=%d id=%d): %s", devid, id, strerror_r(errno, tmp, sizeof(tmp)));
fifoDescriptor = open(pipeName, O_RDWR | O_NONBLOCK); }
readDescriptor = open(pipeName, O_RDONLY | O_NONBLOCK);
} }
cIptvSectionFilter::~cIptvSectionFilter() cIptvSectionFilter::~cIptvSectionFilter()
{ {
//debug("cIptvSectionFilter::~cIptvSectionfilter(%d, %d)\n", devid, id); //debug("cIptvSectionFilter::~cIptvSectionfilter(%d, %d)\n", devid, id);
close(fifoDescriptor); int tmp = socket[1];
close(readDescriptor); socket[1] = -1;
unlink(pipeName); if (tmp >= 0)
fifoDescriptor = -1; close(tmp);
readDescriptor = -1; tmp = socket[0];
socket[0] = -1;
if (tmp >= 0)
close(tmp);
secbuf = NULL; secbuf = NULL;
} }
int cIptvSectionFilter::GetReadDesc(void) int cIptvSectionFilter::GetReadDesc(void)
{ {
return readDescriptor; return socket[0];
} }
inline uint16_t cIptvSectionFilter::GetLength(const uint8_t *Data) inline uint16_t cIptvSectionFilter::GetLength(const uint8_t *Data)
@ -104,9 +105,9 @@ int cIptvSectionFilter::Filter(void)
if (doneq && !neq) if (doneq && !neq)
return 0; return 0;
// There is no data in the fifo, more can be written // There is no data in the read socket, more can be written
if (!select_single_desc(fifoDescriptor, 0, false)) { if ((socket[0] >= 0) && (socket[1] >= 0) /*&& !select_single_desc(socket[0], 0, false)*/) {
ssize_t len = write(fifoDescriptor, secbuf, seclen); ssize_t len = write(socket[1], secbuf, seclen);
ERROR_IF(len < 0, "write()"); ERROR_IF(len < 0, "write()");
// Update statistics // Update statistics
AddSectionStatistic(len, 1); AddSectionStatistic(len, 1);
@ -148,8 +149,7 @@ int cIptvSectionFilter::CopyDump(const uint8_t *buf, uint8_t len)
for (n = 0; secbufp + 2 < limit; ++n) { for (n = 0; secbufp + 2 < limit; ++n) {
seclen_local = GetLength(secbuf); seclen_local = GetLength(secbuf);
if (seclen_local <= 0 || seclen_local > DMX_MAX_SECTION_SIZE || if ((seclen_local <= 0) || (seclen_local > DMX_MAX_SECTION_SIZE) || ((seclen_local + secbufp) > limit))
seclen_local + secbufp > limit)
return 0; return 0;
seclen = seclen_local; seclen = seclen_local;
if (pusi_seen) if (pusi_seen)

View File

@ -21,9 +21,6 @@ private:
DMX_MAX_SECFEED_SIZE = (DMX_MAX_SECTION_SIZE + TS_SIZE) DMX_MAX_SECFEED_SIZE = (DMX_MAX_SECTION_SIZE + TS_SIZE)
}; };
int fifoDescriptor;
int readDescriptor;
int pusi_seen; int pusi_seen;
int feedcc; int feedcc;
int doneq; int doneq;
@ -37,7 +34,7 @@ private:
int devid; int devid;
int id; int id;
cString pipeName; int socket[2];
uint8_t filter_value[DMX_MAX_FILTER_SIZE]; uint8_t filter_value[DMX_MAX_FILTER_SIZE];
uint8_t filter_mask[DMX_MAX_FILTER_SIZE]; uint8_t filter_mask[DMX_MAX_FILTER_SIZE];

View File

@ -103,7 +103,7 @@ int cIptvUdpSocket::Read(unsigned char* BufferAddr, unsigned int BufferLen)
//debug("cIptvUdpSocket::Read()\n"); //debug("cIptvUdpSocket::Read()\n");
// Error out if socket not initialized // Error out if socket not initialized
if (socketDesc <= 0) { if (socketDesc <= 0) {
error("ERROR: Invalid socket in %s\n", __FUNCTION__); error("Invalid socket in %s\n", __FUNCTION__);
return -1; return -1;
} }
socklen_t addrlen = sizeof(sockAddr); socklen_t addrlen = sizeof(sockAddr);
@ -168,7 +168,7 @@ int cIptvTcpSocket::Read(unsigned char* BufferAddr, unsigned int BufferLen)
//debug("cIptvTcpSocket::Read()\n"); //debug("cIptvTcpSocket::Read()\n");
// Error out if socket not initialized // Error out if socket not initialized
if (socketDesc <= 0) { if (socketDesc <= 0) {
error("ERROR: Invalid socket in %s\n", __FUNCTION__); error("Invalid socket in %s\n", __FUNCTION__);
return -1; return -1;
} }
socklen_t addrlen = sizeof(sockAddr); socklen_t addrlen = sizeof(sockAddr);

View File

@ -32,7 +32,7 @@ cString cIptvSectionStatistics::GetSectionStatistic()
cMutexLock MutexLock(&mutex); cMutexLock MutexLock(&mutex);
uint64_t elapsed = timer.Elapsed(); /* in milliseconds */ uint64_t elapsed = timer.Elapsed(); /* in milliseconds */
timer.Set(); timer.Set();
long bitrate = elapsed ? (1000L * filteredData / KILOBYTE(1) / elapsed) : 0L; long bitrate = elapsed ? (long)(1000.0L * filteredData / KILOBYTE(1) / elapsed) : 0L;
if (!IptvConfig.GetUseBytes()) if (!IptvConfig.GetUseBytes())
bitrate *= 8; bitrate *= 8;
// no trailing linefeed here! // no trailing linefeed here!
@ -75,7 +75,7 @@ cString cIptvPidStatistics::GetPidStatistic()
cString info("Active pids:\n"); cString info("Active pids:\n");
for (unsigned int i = 0; i < IPTV_STATS_ACTIVE_PIDS_COUNT; ++i) { for (unsigned int i = 0; i < IPTV_STATS_ACTIVE_PIDS_COUNT; ++i) {
if (mostActivePids[i].pid) { if (mostActivePids[i].pid) {
long bitrate = elapsed ? (1000L * mostActivePids[i].DataAmount / KILOBYTE(1) / elapsed) : 0L; long bitrate = elapsed ? (long)(1000.0L * mostActivePids[i].DataAmount / KILOBYTE(1) / elapsed) : 0L;
if (!IptvConfig.GetUseBytes()) if (!IptvConfig.GetUseBytes())
bitrate *= 8; bitrate *= 8;
info = cString::sprintf("%sPid %d: %4d (%4ld k%s/s)\n", *info, i, info = cString::sprintf("%sPid %d: %4d (%4ld k%s/s)\n", *info, i,
@ -145,7 +145,7 @@ cString cIptvStreamerStatistics::GetStreamerStatistic()
cMutexLock MutexLock(&mutex); cMutexLock MutexLock(&mutex);
uint64_t elapsed = timer.Elapsed(); /* in milliseconds */ uint64_t elapsed = timer.Elapsed(); /* in milliseconds */
timer.Set(); timer.Set();
long bitrate = elapsed ? (1000L * dataBytes / KILOBYTE(1) / elapsed) : 0L; long bitrate = elapsed ? (long)(1000.0L * dataBytes / KILOBYTE(1) / elapsed) : 0L;
if (!IptvConfig.GetUseBytes()) if (!IptvConfig.GetUseBytes())
bitrate *= 8; bitrate *= 8;
cString info = cString::sprintf("Stream bitrate: %ld k%s/s\n", bitrate, IptvConfig.GetUseBytes() ? "B" : "bit"); cString info = cString::sprintf("Stream bitrate: %ld k%s/s\n", bitrate, IptvConfig.GetUseBytes() ? "B" : "bit");
@ -182,7 +182,7 @@ cString cIptvBufferStatistics::GetBufferStatistic()
cMutexLock MutexLock(&mutex); cMutexLock MutexLock(&mutex);
uint64_t elapsed = timer.Elapsed(); /* in milliseconds */ uint64_t elapsed = timer.Elapsed(); /* in milliseconds */
timer.Set(); timer.Set();
long bitrate = elapsed ? (1000L * dataBytes / KILOBYTE(1) / elapsed) : 0L; long bitrate = elapsed ? (long)(1000.0L * dataBytes / KILOBYTE(1) / elapsed) : 0L;
long totalSpace = MEGABYTE(IptvConfig.GetTsBufferSize()); long totalSpace = MEGABYTE(IptvConfig.GetTsBufferSize());
float percentage = (float)((float)usedSpace / (float)totalSpace * 100.0); float percentage = (float)((float)usedSpace / (float)totalSpace * 100.0);
long totalKilos = totalSpace / KILOBYTE(1); long totalKilos = totalSpace / KILOBYTE(1);

View File

@ -23,7 +23,7 @@ cIptvStreamer::cIptvStreamer(cRingBufferLinear* RingBuffer, unsigned int PacketL
if (packetBuffer) if (packetBuffer)
memset(packetBuffer, 0, packetBufferLen); memset(packetBuffer, 0, packetBufferLen);
else else
error("ERROR: MALLOC() failed for packet buffer"); error("MALLOC() failed for packet buffer");
} }
cIptvStreamer::~cIptvStreamer() cIptvStreamer::~cIptvStreamer()