vdr-plugin-streamdev/client/filter.c

240 lines
6.0 KiB
C

/*
* $Id: filter.c,v 1.10 2007/04/23 12:52:28 schmirl Exp $
*/
#include "client/filter.h"
#include "client/socket.h"
#include "tools/select.h"
#include "common.h"
#include <vdr/device.h>
#if VDRVERSNUM >= 10300
// --- cStreamdevFilter ------------------------------------------------------
class cStreamdevFilter: public cListObject {
private:
uchar m_Buffer[4096];
int m_Used;
int m_Pipe[2];
u_short m_Pid;
u_char m_Tid;
u_char m_Mask;
public:
cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask);
virtual ~cStreamdevFilter();
bool Matches(u_short Pid, u_char Tid);
bool PutSection(const uchar *Data, int Length);
int ReadPipe(void) const { return m_Pipe[0]; }
bool IsClosed(void);
void Reset(void);
u_short Pid(void) const { return m_Pid; }
u_char Tid(void) const { return m_Tid; }
u_char Mask(void) const { return m_Mask; }
};
inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) {
return m_Pid == Pid && m_Tid == (Tid & m_Mask);
}
cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) {
m_Used = 0;
m_Pid = Pid;
m_Tid = Tid;
m_Mask = Mask;
m_Pipe[0] = m_Pipe[1] = -1;
#ifdef SOCK_SEQPACKET
// SOCK_SEQPACKET (since kernel 2.6.4)
if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, m_Pipe) != 0) {
esyslog("streamdev-client: socketpair(SOCK_SEQPACKET) failed: %m, trying SOCK_DGRAM");
}
#endif
if (m_Pipe[0] < 0 && socketpair(AF_UNIX, SOCK_DGRAM, 0, m_Pipe) != 0) {
esyslog("streamdev-client: couldn't open section filter socket: %m");
}
else if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 ||
fcntl(m_Pipe[1], F_SETFL, O_NONBLOCK) != 0) {
esyslog("streamdev-client: couldn't set section filter socket to non-blocking mode: %m");
}
}
cStreamdevFilter::~cStreamdevFilter() {
Dprintf("~cStreamdevFilter %p\n", this);
// ownership of handle m_Pipe[0] has been transferred to VDR section handler
//if (m_Pipe[0] >= 0)
// close(m_Pipe[0]);
if (m_Pipe[1] >= 0)
close(m_Pipe[1]);
}
bool cStreamdevFilter::PutSection(const uchar *Data, int Length) {
if (m_Used + Length >= (int)sizeof(m_Buffer)) {
esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)",
Length);
Reset();
return true;
}
memcpy(m_Buffer + m_Used, Data, Length);
m_Used += Length;
if (m_Used > 3) {
int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3;
if (m_Used == length) {
m_Used = 0;
if (write(m_Pipe[1], m_Buffer, length) < 0) {
if(errno == EAGAIN || errno == EWOULDBLOCK)
dsyslog("cStreamdevFilter::PutSection socket overflow, "
"Pid %4d Tid %3d", m_Pid, m_Tid);
else
return false;
}
}
if (m_Used > length) {
dsyslog("cStreamdevFilter::PutSection: m_Used > length ! Pid %2d, Tid%2d "
"(len %3d, got %d/%d)", m_Pid, m_Tid, Length, m_Used, length);
if(Length < TS_SIZE-5) {
// TS packet not full -> this must be last TS packet of section data -> safe to reset now
Reset();
}
}
}
return true;
}
void cStreamdevFilter::Reset(void) {
if(m_Used)
dsyslog("cStreamdevFilter::Reset skipping %d bytes", m_Used);
m_Used = 0;
}
bool cStreamdevFilter::IsClosed(void) {
char m_Buffer[3] = {0,0,0}; /* tid 0, 0 bytes */
// Test if pipe/socket has been closed by writing empty section
if (write(m_Pipe[1], m_Buffer, 3) < 0 &&
errno != EAGAIN &&
errno != EWOULDBLOCK) {
if (errno != ECONNREFUSED &&
errno != ECONNRESET &&
errno != EPIPE)
esyslog("cStreamdevFilter::IsClosed failed: %m");
return true;
}
return false;
}
// --- cStreamdevFilters -----------------------------------------------------
cStreamdevFilters::cStreamdevFilters(void):
cThread("streamdev-client: sections assembler") {
m_Active = false;
m_RingBuffer = new cRingBufferLinear(MEGABYTE(1), TS_SIZE * 2, true);
Start();
}
cStreamdevFilters::~cStreamdevFilters() {
if (m_Active) {
m_Active = false;
Cancel(3);
}
delete m_RingBuffer;
}
int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) {
CarbageCollect();
cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask);
int fh = f->ReadPipe();
Lock();
Add(f);
Unlock();
return fh;
}
void cStreamdevFilters::CarbageCollect(void) {
LOCK_THREAD;
for (cStreamdevFilter *fi = First(); fi;) {
if (fi->IsClosed()) {
if (errno == ECONNREFUSED ||
errno == ECONNRESET ||
errno == EPIPE) {
ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), false);
Dprintf("cStreamdevFilters::CarbageCollector: filter closed: Pid %4d, Tid %3d, Mask %2x (%d filters left)",
(int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1);
cStreamdevFilter *next = Prev(fi);
Del(fi);
fi = next ? Next(next) : First();
} else {
esyslog("cStreamdevFilters::CarbageCollector() error: "
"Pid %4d, Tid %3d, Mask %2x (%d filters left) failed",
(int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1);
LOG_ERROR;
fi = Next(fi);
}
} else {
fi = Next(fi);
}
}
}
cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) {
for (cStreamdevFilter *f = First(); f; f = Next(f)) {
if (f->Matches(Pid, Tid))
return f;
}
return NULL;
}
void cStreamdevFilters::Put(const uchar *Data) {
int p = m_RingBuffer->Put(Data, TS_SIZE);
if (p != TS_SIZE)
m_RingBuffer->ReportOverflow(TS_SIZE - p);
}
void cStreamdevFilters::Action(void) {
m_Active = true;
while (m_Active) {
int recvd;
const uchar *block = m_RingBuffer->Get(recvd);
if (block && recvd > 0) {
cStreamdevFilter *f;
u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2];
u_char tid = block[3];
if ((f = Matches(pid, tid)) != NULL) {
int len = block[4];
if (!f->PutSection(block + 5, len)) {
if (errno != EPIPE) {
esyslog("streamdev-client: couldn't send section packet: %m");
Dprintf("FATAL ERROR: %m\n");
}
ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false);
Del(f);
}
}
m_RingBuffer->Del(TS_SIZE);
} else
usleep(1);
}
}
#endif // VDRVERSNUM >= 10300