mirror of
https://projects.vdr-developer.org/git/vdr-plugin-streamdev.git
synced 2023-10-10 19:16:51 +02:00
client_filter-data-handling.patch by Petri Hintukainen
- regonize PUSI flag in TS packets (bullet-proof section start+end indicator) - Use own TS buffer to read directly from socket, no need for ring buffer anymore - Re-activate all active filters after re-connection to server - Simplify thread start/stop/running detection to current VDR style - Update "filter closed by VDR" detection (datagram sockets return different errno's than pipes) - Deliver data to first matching and active filter (do not drop data if first matching filter has been closed, there is quite likely new filter for it) - Add disconnect detection to avoid 100% CPU usage in cTSBuffer::Action() Modified Files: client/filter.c client/filter.h
This commit is contained in:
parent
52bf110aa9
commit
518886b08b
125
client/filter.c
125
client/filter.c
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* $Id: filter.c,v 1.10 2007/04/23 12:52:28 schmirl Exp $
|
* $Id: filter.c,v 1.11 2007/04/24 11:23:16 schmirl Exp $
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "client/filter.h"
|
#include "client/filter.h"
|
||||||
@ -27,7 +27,7 @@ public:
|
|||||||
virtual ~cStreamdevFilter();
|
virtual ~cStreamdevFilter();
|
||||||
|
|
||||||
bool Matches(u_short Pid, u_char Tid);
|
bool Matches(u_short Pid, u_char Tid);
|
||||||
bool PutSection(const uchar *Data, int Length);
|
bool PutSection(const uchar *Data, int Length, bool Pusi);
|
||||||
int ReadPipe(void) const { return m_Pipe[0]; }
|
int ReadPipe(void) const { return m_Pipe[0]; }
|
||||||
|
|
||||||
bool IsClosed(void);
|
bool IsClosed(void);
|
||||||
@ -75,7 +75,13 @@ cStreamdevFilter::~cStreamdevFilter() {
|
|||||||
close(m_Pipe[1]);
|
close(m_Pipe[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool cStreamdevFilter::PutSection(const uchar *Data, int Length) {
|
bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) {
|
||||||
|
|
||||||
|
if (!m_Used && !Pusi) /* wait for payload unit start indicator */
|
||||||
|
return true;
|
||||||
|
if (m_Used && Pusi) /* reset at payload unit start */
|
||||||
|
Reset();
|
||||||
|
|
||||||
if (m_Used + Length >= (int)sizeof(m_Buffer)) {
|
if (m_Used + Length >= (int)sizeof(m_Buffer)) {
|
||||||
esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)",
|
esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)",
|
||||||
Length);
|
Length);
|
||||||
@ -141,17 +147,11 @@ bool cStreamdevFilter::IsClosed(void) {
|
|||||||
|
|
||||||
cStreamdevFilters::cStreamdevFilters(void):
|
cStreamdevFilters::cStreamdevFilters(void):
|
||||||
cThread("streamdev-client: sections assembler") {
|
cThread("streamdev-client: sections assembler") {
|
||||||
m_Active = false;
|
m_TSBuffer = NULL;
|
||||||
m_RingBuffer = new cRingBufferLinear(MEGABYTE(1), TS_SIZE * 2, true);
|
|
||||||
Start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cStreamdevFilters::~cStreamdevFilters() {
|
cStreamdevFilters::~cStreamdevFilters() {
|
||||||
if (m_Active) {
|
SetConnection(-1);
|
||||||
m_Active = false;
|
|
||||||
Cancel(3);
|
|
||||||
}
|
|
||||||
delete m_RingBuffer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) {
|
int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) {
|
||||||
@ -194,46 +194,101 @@ void cStreamdevFilters::CarbageCollect(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) {
|
bool cStreamdevFilters::ReActivateFilters(void)
|
||||||
for (cStreamdevFilter *f = First(); f; f = Next(f)) {
|
{
|
||||||
if (f->Matches(Pid, Tid))
|
LOCK_THREAD;
|
||||||
return f;
|
|
||||||
|
bool res = true;
|
||||||
|
CarbageCollect();
|
||||||
|
for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) {
|
||||||
|
res = ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), true) && res;
|
||||||
|
Dprintf("ReActivateFilters(%d, %d, %d) -> %s", fi->Pid(), fi->Tid(), fi->Mask(), res ? "Ok" :"FAIL");
|
||||||
}
|
}
|
||||||
return NULL;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
void cStreamdevFilters::Put(const uchar *Data) {
|
void cStreamdevFilters::SetConnection(int Handle) {
|
||||||
int p = m_RingBuffer->Put(Data, TS_SIZE);
|
|
||||||
if (p != TS_SIZE)
|
Cancel(2);
|
||||||
m_RingBuffer->ReportOverflow(TS_SIZE - p);
|
DELETENULL(m_TSBuffer);
|
||||||
|
|
||||||
|
if (Handle >= 0) {
|
||||||
|
m_TSBuffer = new cTSBuffer(Handle, MEGABYTE(1), 1);
|
||||||
|
ReActivateFilters();
|
||||||
|
Start();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void cStreamdevFilters::Action(void) {
|
void cStreamdevFilters::Action(void) {
|
||||||
m_Active = true;
|
int fails = 0;
|
||||||
while (m_Active) {
|
|
||||||
int recvd;
|
|
||||||
const uchar *block = m_RingBuffer->Get(recvd);
|
|
||||||
|
|
||||||
if (block && recvd > 0) {
|
while (Running()) {
|
||||||
cStreamdevFilter *f;
|
const uchar *block = m_TSBuffer->Get();
|
||||||
|
if (block) {
|
||||||
u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2];
|
u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2];
|
||||||
u_char tid = block[3];
|
u_char tid = block[3];
|
||||||
|
bool Pusi = block[1] & 0x40;
|
||||||
if ((f = Matches(pid, tid)) != NULL) {
|
int len = block[4];
|
||||||
int len = block[4];
|
#if 0
|
||||||
if (!f->PutSection(block + 5, len)) {
|
if (block[1] == 0xff &&
|
||||||
if (errno != EPIPE) {
|
block[2] == 0xff &&
|
||||||
esyslog("streamdev-client: couldn't send section packet: %m");
|
block[3] == 0xff &&
|
||||||
|
block[4] == 0x7f)
|
||||||
|
isyslog("*********** TRANSPONDER -> %s **********", block+5);
|
||||||
|
#endif
|
||||||
|
LOCK_THREAD;
|
||||||
|
cStreamdevFilter *f = First();
|
||||||
|
while (f) {
|
||||||
|
cStreamdevFilter *next = Next(f);
|
||||||
|
if (f->Matches(pid, tid)) {
|
||||||
|
|
||||||
|
if (f->PutSection(block + 5, len, Pusi))
|
||||||
|
break;
|
||||||
|
|
||||||
|
if (errno != ECONNREFUSED &&
|
||||||
|
errno != ECONNRESET &&
|
||||||
|
errno != EPIPE) {
|
||||||
Dprintf("FATAL ERROR: %m\n");
|
Dprintf("FATAL ERROR: %m\n");
|
||||||
|
esyslog("streamdev-client: couldn't send section packet: %m");
|
||||||
}
|
}
|
||||||
ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false);
|
ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false);
|
||||||
Del(f);
|
Del(f);
|
||||||
|
// Filter was closed.
|
||||||
|
// - need to check remaining filters for another match
|
||||||
}
|
}
|
||||||
|
f = next;
|
||||||
}
|
}
|
||||||
m_RingBuffer->Del(TS_SIZE);
|
} else {
|
||||||
} else
|
#if 1 // TODO: this should be fixed in vdr cTSBuffer
|
||||||
usleep(1);
|
// Check disconnection
|
||||||
|
int fd = *ClientSocket.DataSocket(siLiveFilter);
|
||||||
|
if(fd < 0)
|
||||||
|
break;
|
||||||
|
cPoller Poller(fd);
|
||||||
|
if (Poller.Poll()) {
|
||||||
|
char tmp[1];
|
||||||
|
errno = 0;
|
||||||
|
Dprintf("cStreamdevFilters::Action(): checking connection");
|
||||||
|
if (recv(fd, tmp, 1, MSG_PEEK) == 0 && errno != EAGAIN) {
|
||||||
|
++fails;
|
||||||
|
if (fails >= 10) {
|
||||||
|
esyslog("cStreamdevFilters::Action(): stream disconnected ?");
|
||||||
|
ClientSocket.CloseDataConnection(siLiveFilter);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fails = 0;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fails = 0;
|
||||||
|
}
|
||||||
|
cCondWait::SleepMs(10);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DELETENULL(m_TSBuffer);
|
||||||
|
dsyslog("StreamdevFilters::Action() ended");
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif // VDRVERSNUM >= 10300
|
#endif // VDRVERSNUM >= 10300
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* $Id: filter.h,v 1.3 2007/04/23 12:52:28 schmirl Exp $
|
* $Id: filter.h,v 1.4 2007/04/24 11:23:16 schmirl Exp $
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef VDR_STREAMDEV_FILTER_H
|
#ifndef VDR_STREAMDEV_FILTER_H
|
||||||
@ -12,26 +12,25 @@
|
|||||||
#include <vdr/tools.h>
|
#include <vdr/tools.h>
|
||||||
#include <vdr/thread.h>
|
#include <vdr/thread.h>
|
||||||
|
|
||||||
class cRingBufferLinear;
|
|
||||||
class cTSBuffer;
|
class cTSBuffer;
|
||||||
class cStreamdevFilter;
|
class cStreamdevFilter;
|
||||||
|
|
||||||
class cStreamdevFilters: public cList<cStreamdevFilter>, public cThread {
|
class cStreamdevFilters: public cList<cStreamdevFilter>, public cThread {
|
||||||
private:
|
private:
|
||||||
bool m_Active;
|
cTSBuffer *m_TSBuffer;
|
||||||
cRingBufferLinear *m_RingBuffer;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual void Action(void);
|
virtual void Action(void);
|
||||||
void CarbageCollect(void);
|
void CarbageCollect(void);
|
||||||
|
|
||||||
|
bool ReActivateFilters(void);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
cStreamdevFilters(void);
|
cStreamdevFilters(void);
|
||||||
virtual ~cStreamdevFilters();
|
virtual ~cStreamdevFilters();
|
||||||
|
|
||||||
|
void SetConnection(int Handle);
|
||||||
int OpenFilter(u_short Pid, u_char Tid, u_char Mask);
|
int OpenFilter(u_short Pid, u_char Tid, u_char Mask);
|
||||||
cStreamdevFilter *Matches(u_short Pid, u_char Tid);
|
|
||||||
void Put(const uchar *Data);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
# endif // VDRVERSNUM >= 10300
|
# endif // VDRVERSNUM >= 10300
|
||||||
|
Loading…
x
Reference in New Issue
Block a user