diff --git a/Makefile b/Makefile index 7be2627..8529750 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ # # Makefile for a Video Disk Recorder plugin # -# $Id: Makefile,v 1.7 2006/09/14 10:30:16 schmirl Exp $ +# $Id: Makefile,v 1.8 2007/04/16 11:01:02 schmirl Exp $ # The official name of this plugin. # This name will be used in the '-P...' option of VDR to load the plugin. @@ -16,7 +16,7 @@ VERSION = $(shell grep 'const char \*VERSION *=' common.c | awk '{ print $$5 }' ### The C++ compiler and options: CXX ?= g++ -CXXFLAGS ?= -fPIC -W -Woverloaded-virtual +CXXFLAGS ?= -fPIC -Wall -Woverloaded-virtual ### The directory environment: diff --git a/client/device.c b/client/device.c index a2e4580..be79bb9 100644 --- a/client/device.c +++ b/client/device.c @@ -1,5 +1,5 @@ /* - * $Id: device.c,v 1.8 2007/01/15 12:15:12 schmirl Exp $ + * $Id: device.c,v 1.13 2007/05/07 12:18:18 schmirl Exp $ */ #include "client/device.h" @@ -42,6 +42,8 @@ cStreamdevDevice::cStreamdevDevice(void) { #endif m_Device = this; + m_Pids = 0; + m_DvrClosed = true; if (StreamdevClientSetup.SyncEPG) ClientSocket.SynchronizeEPG(); @@ -59,13 +61,22 @@ cStreamdevDevice::~cStreamdevDevice() { bool cStreamdevDevice::ProvidesSource(int Source) const { Dprintf("ProvidesSource, Source=%d\n", Source); - return false; + return true; } bool cStreamdevDevice::ProvidesTransponder(const cChannel *Channel) const { Dprintf("ProvidesTransponder\n"); - return false; + return true; +} + +bool cStreamdevDevice::IsTunedToTransponder(const cChannel *Channel) +{ + bool res = false; + if (ClientSocket.DataSocket(siLive) != NULL + && TRANSPONDER(Channel, m_Channel)) + res = true; + return res; } bool cStreamdevDevice::ProvidesChannel(const cChannel *Channel, int Priority, @@ -123,37 +134,118 @@ bool cStreamdevDevice::SetChannelDevice(const cChannel *Channel, bool cStreamdevDevice::SetPid(cPidHandle *Handle, int Type, bool On) { Dprintf("SetPid, Pid=%d, Type=%d, On=%d, used=%d\n", Handle->pid, Type, On, Handle->used); - if (Handle->pid && (On || !Handle->used)) - return ClientSocket.SetPid(Handle->pid, On); - return true; + LOCK_THREAD; + + if (On && !m_TSBuffer) { + Dprintf("SetPid: no data connection -> OpenDvr()"); + OpenDvrInt(); + } + + bool res = true; + if (Handle->pid && (On || !Handle->used)) { + res = ClientSocket.SetPid(Handle->pid, On); + + m_Pids += (!res) ? 0 : On ? 1 : -1; + if (m_Pids < 0) + m_Pids = 0; + + if(m_Pids < 1 && m_DvrClosed) { + Dprintf("SetPid: 0 pids left -> CloseDvr()"); + CloseDvrInt(); + } + } + + return res; +} + +bool cStreamdevDevice::OpenDvrInt(void) { + Dprintf("OpenDvrInt\n"); + LOCK_THREAD; + + CloseDvrInt(); + if (m_TSBuffer) { + Dprintf("cStreamdevDevice::OpenDvrInt(): DVR connection already open\n"); + return true; + } + + Dprintf("cStreamdevDevice::OpenDvrInt(): Connecting ...\n"); + if (ClientSocket.CreateDataConnection(siLive)) { + m_TSBuffer = new cTSBuffer(*ClientSocket.DataSocket(siLive), MEGABYTE(2), CardIndex() + 1); + return true; + } + esyslog("cStreamdevDevice::OpenDvrInt(): DVR connection FAILED"); + return false; } bool cStreamdevDevice::OpenDvr(void) { Dprintf("OpenDvr\n"); - CloseDvr(); - if (ClientSocket.CreateDataConnection(siLive)) { - //m_Assembler = new cStreamdevAssembler(ClientSocket.DataSocket(siLive)); - //m_TSBuffer = new cTSBuffer(m_Assembler->ReadPipe(), MEGABYTE(2), CardIndex() + 1); - m_TSBuffer = new cTSBuffer(*ClientSocket.DataSocket(siLive), MEGABYTE(2), CardIndex() + 1); - Dprintf("waiting\n"); - //m_Assembler->WaitForFill(); - Dprintf("resuming\n"); - return true; + LOCK_THREAD; + + m_DvrClosed = false; + return OpenDvrInt(); +} + +void cStreamdevDevice::CloseDvrInt(void) { + Dprintf("CloseDvrInt\n"); + LOCK_THREAD; + + if (ClientSocket.CheckConnection()) { + if (!m_DvrClosed) { + Dprintf("cStreamdevDevice::CloseDvrInt(): m_DvrClosed=false -> not closing yet\n"); + return; + } + if (m_Pids > 0) { + Dprintf("cStreamdevDevice::CloseDvrInt(): %d active pids -> not closing yet\n", m_Pids); + return; + } + } else { + Dprintf("cStreamdevDevice::CloseDvrInt(): Control connection gone !\n"); } - return false; + + Dprintf("cStreamdevDevice::CloseDvrInt(): Closing DVR connection\n"); + DELETENULL(m_TSBuffer); + ClientSocket.CloseDvr(); } void cStreamdevDevice::CloseDvr(void) { Dprintf("CloseDvr\n"); + LOCK_THREAD; - //DELETENULL(m_Assembler); - DELETENULL(m_TSBuffer); - ClientSocket.CloseDvr(); + m_DvrClosed = true; + CloseDvrInt(); } bool cStreamdevDevice::GetTSPacket(uchar *&Data) { if (m_TSBuffer) { Data = m_TSBuffer->Get(); +#if 1 // TODO: this should be fixed in vdr cTSBuffer + // simple disconnect detection + static int m_TSFails = 0; + if (!Data) { + LOCK_THREAD; + if(!ClientSocket.DataSocket(siLive)) { + return false; // triggers CloseDvr() + OpenDvr() in cDevice + } + cPoller Poller(*ClientSocket.DataSocket(siLive)); + errno = 0; + if (Poller.Poll() && !errno) { + char tmp[1]; + if (recv(*ClientSocket.DataSocket(siLive), tmp, 1, MSG_PEEK) == 0 && !errno) { +esyslog("cStreamDevice::GetTSPacket: GetChecked: NOTHING (%d)", m_TSFails); + m_TSFails++; + if (m_TSFails > 10) { + isyslog("cStreamdevDevice::GetTSPacket(): disconnected"); + m_Pids = 0; + CloseDvrInt(); + m_TSFails = 0; + return false; + } + return true; + } + } + m_TSFails = 0; + } +#endif return true; } return false; @@ -162,11 +254,24 @@ bool cStreamdevDevice::GetTSPacket(uchar *&Data) { #if VDRVERSNUM >= 10300 int cStreamdevDevice::OpenFilter(u_short Pid, u_char Tid, u_char Mask) { Dprintf("OpenFilter\n"); - if (StreamdevClientSetup.StreamFilters - && ClientSocket.SetFilter(Pid, Tid, Mask, true)) { - return m_Filters->OpenFilter(Pid, Tid, Mask); - } else + + if (!StreamdevClientSetup.StreamFilters) return -1; + + + if (!ClientSocket.DataSocket(siLiveFilter)) { + if (ClientSocket.CreateDataConnection(siLiveFilter)) { + m_Filters->SetConnection(*ClientSocket.DataSocket(siLiveFilter)); + } else { + isyslog("cStreamdevDevice::OpenFilter: connect failed: %m"); + return -1; + } + } + + if (ClientSocket.SetFilter(Pid, Tid, Mask, true)) + return m_Filters->OpenFilter(Pid, Tid, Mask); + + return -1; } #endif @@ -177,11 +282,17 @@ bool cStreamdevDevice::Init(void) { } bool cStreamdevDevice::ReInit(void) { + if(m_Device) { + m_Device->Lock(); + m_Device->m_Filters->SetConnection(-1); + m_Device->m_Pids = 0; + } ClientSocket.Quit(); ClientSocket.Reset(); if (m_Device != NULL) { - DELETENULL(m_Device->m_TSBuffer); + //DELETENULL(m_Device->m_TSBuffer); DELETENULL(m_Device->m_Assembler); + m_Device->Unlock(); } return StreamdevClientSetup.StartClient ? Init() : true; } diff --git a/client/device.h b/client/device.h index bdfabb6..525c1d4 100644 --- a/client/device.h +++ b/client/device.h @@ -1,5 +1,5 @@ /* - * $Id: device.h,v 1.3 2005/02/08 15:21:19 lordjaxom Exp $ + * $Id: device.h,v 1.5 2007/04/24 10:43:40 schmirl Exp $ */ #ifndef VDR_STREAMDEV_DEVICE_H @@ -25,9 +25,14 @@ private: #if VDRVERSNUM >= 10307 cStreamdevFilters *m_Filters; #endif + int m_Pids; + bool m_DvrClosed; static cStreamdevDevice *m_Device; + bool OpenDvrInt(void); + void CloseDvrInt(void); + protected: virtual bool SetChannelDevice(const cChannel *Channel, bool LiveView); virtual bool HasLock(int TimeoutMs) @@ -51,9 +56,10 @@ public: virtual ~cStreamdevDevice(); virtual bool ProvidesSource(int Source) const; - virtual bool ProvidesTransponder(const cChannel *Channel) const; + virtual bool ProvidesTransponder(const cChannel *Channel) const; virtual bool ProvidesChannel(const cChannel *Channel, int Priority = -1, bool *NeedsDetachReceivers = NULL) const; + virtual bool IsTunedToTransponder(const cChannel *Channel); static bool Init(void); static bool ReInit(void); diff --git a/client/filter.c b/client/filter.c index daf534a..91af8b7 100644 --- a/client/filter.c +++ b/client/filter.c @@ -1,5 +1,5 @@ /* - * $Id: filter.c,v 1.3 2005/11/06 16:43:58 lordjaxom Exp $ + * $Id: filter.c,v 1.11 2007/04/24 11:23:16 schmirl Exp $ */ #include "client/filter.h" @@ -7,113 +7,288 @@ #include "tools/select.h" #include "common.h" -#include #include #if VDRVERSNUM >= 10300 +// --- cStreamdevFilter ------------------------------------------------------ + +class cStreamdevFilter: public cListObject { +private: + uchar m_Buffer[4096]; + int m_Used; + int m_Pipe[2]; + u_short m_Pid; + u_char m_Tid; + u_char m_Mask; + +public: + cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask); + virtual ~cStreamdevFilter(); + + bool Matches(u_short Pid, u_char Tid); + bool PutSection(const uchar *Data, int Length, bool Pusi); + int ReadPipe(void) const { return m_Pipe[0]; } + + bool IsClosed(void); + void Reset(void); + + u_short Pid(void) const { return m_Pid; } + u_char Tid(void) const { return m_Tid; } + u_char Mask(void) const { return m_Mask; } +}; + +inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) { + return m_Pid == Pid && m_Tid == (Tid & m_Mask); +} + cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) { m_Used = 0; m_Pid = Pid; m_Tid = Tid; m_Mask = Mask; + m_Pipe[0] = m_Pipe[1] = -1; - if (pipe(m_Pipe) != 0 || fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0) { - esyslog("streamev-client: coudln't open section filter pipe: %m"); - m_Pipe[0] = m_Pipe[1] = -1; +#ifdef SOCK_SEQPACKET + // SOCK_SEQPACKET (since kernel 2.6.4) + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, m_Pipe) != 0) { + esyslog("streamdev-client: socketpair(SOCK_SEQPACKET) failed: %m, trying SOCK_DGRAM"); + } +#endif + if (m_Pipe[0] < 0 && socketpair(AF_UNIX, SOCK_DGRAM, 0, m_Pipe) != 0) { + esyslog("streamdev-client: couldn't open section filter socket: %m"); + } + + else if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 || + fcntl(m_Pipe[1], F_SETFL, O_NONBLOCK) != 0) { + esyslog("streamdev-client: couldn't set section filter socket to non-blocking mode: %m"); } } cStreamdevFilter::~cStreamdevFilter() { Dprintf("~cStreamdevFilter %p\n", this); - if (m_Pipe[0] >= 0) - close(m_Pipe[0]); + + // ownership of handle m_Pipe[0] has been transferred to VDR section handler + //if (m_Pipe[0] >= 0) + // close(m_Pipe[0]); if (m_Pipe[1] >= 0) close(m_Pipe[1]); } -bool cStreamdevFilter::PutSection(const uchar *Data, int Length) { +bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) { + + if (!m_Used && !Pusi) /* wait for payload unit start indicator */ + return true; + if (m_Used && Pusi) /* reset at payload unit start */ + Reset(); + if (m_Used + Length >= (int)sizeof(m_Buffer)) { esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)", Length); - m_Used = 0; + Reset(); return true; } + memcpy(m_Buffer + m_Used, Data, Length); m_Used += Length; - if (m_Used > 3) { int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3; if (m_Used == length) { - if (write(m_Pipe[1], m_Buffer, length) < 0) - return false; m_Used = 0; + if (write(m_Pipe[1], m_Buffer, length) < 0) { + if(errno == EAGAIN || errno == EWOULDBLOCK) + dsyslog("cStreamdevFilter::PutSection socket overflow, " + "Pid %4d Tid %3d", m_Pid, m_Tid); + + else + return false; + } } + + if (m_Used > length) { + dsyslog("cStreamdevFilter::PutSection: m_Used > length ! Pid %2d, Tid%2d " + "(len %3d, got %d/%d)", m_Pid, m_Tid, Length, m_Used, length); + if(Length < TS_SIZE-5) { + // TS packet not full -> this must be last TS packet of section data -> safe to reset now + Reset(); + } + } + } return true; } +void cStreamdevFilter::Reset(void) { + if(m_Used) + dsyslog("cStreamdevFilter::Reset skipping %d bytes", m_Used); + m_Used = 0; +} + +bool cStreamdevFilter::IsClosed(void) { + char m_Buffer[3] = {0,0,0}; /* tid 0, 0 bytes */ + + // Test if pipe/socket has been closed by writing empty section + if (write(m_Pipe[1], m_Buffer, 3) < 0 && + errno != EAGAIN && + errno != EWOULDBLOCK) { + + if (errno != ECONNREFUSED && + errno != ECONNRESET && + errno != EPIPE) + esyslog("cStreamdevFilter::IsClosed failed: %m"); + + return true; + } + + return false; +} + +// --- cStreamdevFilters ----------------------------------------------------- + cStreamdevFilters::cStreamdevFilters(void): cThread("streamdev-client: sections assembler") { - m_Active = false; - m_RingBuffer = new cRingBufferLinear(MEGABYTE(1), TS_SIZE * 2, true); - Start(); + m_TSBuffer = NULL; } cStreamdevFilters::~cStreamdevFilters() { - if (m_Active) { - m_Active = false; - Cancel(3); - } - delete m_RingBuffer; + SetConnection(-1); } int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) { + CarbageCollect(); + cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask); + int fh = f->ReadPipe(); + + Lock(); Add(f); - return f->ReadPipe(); + Unlock(); + + return fh; } -cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) { - for (cStreamdevFilter *f = First(); f; f = Next(f)) { - if (f->Matches(Pid, Tid)) - return f; +void cStreamdevFilters::CarbageCollect(void) { + LOCK_THREAD; + for (cStreamdevFilter *fi = First(); fi;) { + if (fi->IsClosed()) { + if (errno == ECONNREFUSED || + errno == ECONNRESET || + errno == EPIPE) { + ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), false); + Dprintf("cStreamdevFilters::CarbageCollector: filter closed: Pid %4d, Tid %3d, Mask %2x (%d filters left)", + (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1); + + cStreamdevFilter *next = Prev(fi); + Del(fi); + fi = next ? Next(next) : First(); + } else { + esyslog("cStreamdevFilters::CarbageCollector() error: " + "Pid %4d, Tid %3d, Mask %2x (%d filters left) failed", + (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1); + LOG_ERROR; + fi = Next(fi); + } + } else { + fi = Next(fi); + } } - return NULL; } -void cStreamdevFilters::Put(const uchar *Data) { - int p = m_RingBuffer->Put(Data, TS_SIZE); - if (p != TS_SIZE) - m_RingBuffer->ReportOverflow(TS_SIZE - p); +bool cStreamdevFilters::ReActivateFilters(void) +{ + LOCK_THREAD; + + bool res = true; + CarbageCollect(); + for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) { + res = ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), true) && res; + Dprintf("ReActivateFilters(%d, %d, %d) -> %s", fi->Pid(), fi->Tid(), fi->Mask(), res ? "Ok" :"FAIL"); + } + return res; +} + +void cStreamdevFilters::SetConnection(int Handle) { + + Cancel(2); + DELETENULL(m_TSBuffer); + + if (Handle >= 0) { + m_TSBuffer = new cTSBuffer(Handle, MEGABYTE(1), 1); + ReActivateFilters(); + Start(); + } } void cStreamdevFilters::Action(void) { - m_Active = true; - while (m_Active) { - int recvd; - const uchar *block = m_RingBuffer->Get(recvd); + int fails = 0; - if (block && recvd > 0) { - cStreamdevFilter *f; + while (Running()) { + const uchar *block = m_TSBuffer->Get(); + if (block) { u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2]; u_char tid = block[3]; - - if ((f = Matches(pid, tid)) != NULL) { - int len = block[4]; - if (!f->PutSection(block + 5, len)) { - if (errno != EPIPE) { - esyslog("streamdev-client: couldn't send section packet: %m"); + bool Pusi = block[1] & 0x40; + int len = block[4]; +#if 0 + if (block[1] == 0xff && + block[2] == 0xff && + block[3] == 0xff && + block[4] == 0x7f) + isyslog("*********** TRANSPONDER -> %s **********", block+5); +#endif + LOCK_THREAD; + cStreamdevFilter *f = First(); + while (f) { + cStreamdevFilter *next = Next(f); + if (f->Matches(pid, tid)) { + + if (f->PutSection(block + 5, len, Pusi)) + break; + + if (errno != ECONNREFUSED && + errno != ECONNRESET && + errno != EPIPE) { Dprintf("FATAL ERROR: %m\n"); + esyslog("streamdev-client: couldn't send section packet: %m"); } ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false); Del(f); + // Filter was closed. + // - need to check remaining filters for another match } + f = next; } - m_RingBuffer->Del(TS_SIZE); - } else - usleep(1); + } else { +#if 1 // TODO: this should be fixed in vdr cTSBuffer + // Check disconnection + int fd = *ClientSocket.DataSocket(siLiveFilter); + if(fd < 0) + break; + cPoller Poller(fd); + if (Poller.Poll()) { + char tmp[1]; + errno = 0; + Dprintf("cStreamdevFilters::Action(): checking connection"); + if (recv(fd, tmp, 1, MSG_PEEK) == 0 && errno != EAGAIN) { + ++fails; + if (fails >= 10) { + esyslog("cStreamdevFilters::Action(): stream disconnected ?"); + ClientSocket.CloseDataConnection(siLiveFilter); + break; + } + } else { + fails = 0; + } + } else { + fails = 0; + } + cCondWait::SleepMs(10); +#endif + } } + + DELETENULL(m_TSBuffer); + dsyslog("StreamdevFilters::Action() ended"); } #endif // VDRVERSNUM >= 10300 diff --git a/client/filter.h b/client/filter.h index cb46bf0..e0a1575 100644 --- a/client/filter.h +++ b/client/filter.h @@ -1,5 +1,5 @@ /* - * $Id: filter.h,v 1.1.1.1 2004/12/30 22:44:04 lordjaxom Exp $ + * $Id: filter.h,v 1.4 2007/04/24 11:23:16 schmirl Exp $ */ #ifndef VDR_STREAMDEV_FILTER_H @@ -12,52 +12,25 @@ #include #include -class cRingBufferFrame; -class cRingBufferLinear; - -class cStreamdevFilter: public cListObject { -private: - uchar m_Buffer[4096]; - int m_Used; - int m_Pipe[2]; - u_short m_Pid; - u_char m_Tid; - u_char m_Mask; - cRingBufferFrame *m_RingBuffer; - -public: - cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask); - virtual ~cStreamdevFilter(); - - bool Matches(u_short Pid, u_char Tid); - bool PutSection(const uchar *Data, int Length); - int ReadPipe(void) const { return m_Pipe[0]; } - - u_short Pid(void) const { return m_Pid; } - u_char Tid(void) const { return m_Tid; } - u_char Mask(void) const { return m_Mask; } - -}; - -inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) { - return m_Pid == Pid && m_Tid == (Tid & m_Mask); -} +class cTSBuffer; +class cStreamdevFilter; class cStreamdevFilters: public cList, public cThread { private: - bool m_Active; - cRingBufferLinear *m_RingBuffer; - + cTSBuffer *m_TSBuffer; + protected: virtual void Action(void); + void CarbageCollect(void); + + bool ReActivateFilters(void); public: cStreamdevFilters(void); virtual ~cStreamdevFilters(); + void SetConnection(int Handle); int OpenFilter(u_short Pid, u_char Tid, u_char Mask); - cStreamdevFilter *Matches(u_short Pid, u_char Tid); - void Put(const uchar *Data); }; # endif // VDRVERSNUM >= 10300 diff --git a/client/socket.c b/client/socket.c index e59c705..5db6efe 100644 --- a/client/socket.c +++ b/client/socket.c @@ -1,5 +1,5 @@ /* - * $Id: socket.c,v 1.7 2007/01/15 11:45:48 schmirl Exp $ + * $Id: socket.c,v 1.8 2007/04/24 10:57:34 schmirl Exp $ */ #include @@ -215,6 +215,24 @@ bool cClientSocket::CreateDataConnection(eSocketId Id) { return true; } +bool cClientSocket::CloseDataConnection(eSocketId Id) { + //if (!CheckConnection()) return false; + + CMD_LOCK; + + if(Id == siLive || Id == siLiveFilter) + if (m_DataSockets[Id] != NULL) { + std::string command = (std::string)"ABRT " + (const char*)itoa(Id); + if (!Command(command, 220)) { + if (errno == 0) + esyslog("ERROR: Streamdev: Couldn't cleanly close data connection"); + //return false; + } + DELETENULL(m_DataSockets[Id]); + } + return true; +} + bool cClientSocket::SetChannelDevice(const cChannel *Channel) { if (!CheckConnection()) return false; diff --git a/client/socket.h b/client/socket.h index 17478ce..e839223 100644 --- a/client/socket.h +++ b/client/socket.h @@ -1,5 +1,5 @@ /* - * $Id: socket.h,v 1.3 2005/02/08 17:22:35 lordjaxom Exp $ + * $Id: socket.h,v 1.4 2007/04/24 10:57:34 schmirl Exp $ */ #ifndef VDR_STREAMDEV_CLIENT_CONNECTION_H @@ -47,6 +47,7 @@ public: bool CheckConnection(void); bool ProvidesChannel(const cChannel *Channel, int Priority); bool CreateDataConnection(eSocketId Id); + bool CloseDataConnection(eSocketId Id); bool SetChannelDevice(const cChannel *Channel); bool SetPid(int Pid, bool On); #if VDRVERSNUM >= 10300 diff --git a/common.c b/common.c index d2a5fed..34495c1 100644 --- a/common.c +++ b/common.c @@ -11,7 +11,7 @@ using namespace std; -const char *VERSION = "0.3.3-20070403"; +const char *VERSION = "0.3.3-20070509"; const char *StreamTypes[st_Count] = { "TS", diff --git a/common.h b/common.h index e5f143d..1d14883 100644 --- a/common.h +++ b/common.h @@ -1,5 +1,5 @@ /* - * $Id: common.h,v 1.7 2005/11/06 16:43:58 lordjaxom Exp $ + * $Id: common.h,v 1.8 2007/04/24 10:50:13 schmirl Exp $ */ #ifndef VDR_STREAMDEV_COMMON_H @@ -83,6 +83,7 @@ enum eSuspendMode { enum eSocketId { siLive, siReplay, + siLiveFilter, si_Count }; diff --git a/server/connection.c b/server/connection.c index dff1945..629ed1d 100644 --- a/server/connection.c +++ b/server/connection.c @@ -1,5 +1,5 @@ /* - * $Id: connection.c,v 1.8 2007/01/15 12:00:19 schmirl Exp $ + * $Id: connection.c,v 1.10 2007/05/07 12:25:11 schmirl Exp $ */ #include "server/connection.h" @@ -101,9 +101,16 @@ bool cServerConnection::Respond(const char *Message, bool Last, ...) length = vasprintf(&buffer, Message, ap); va_end(ap); + if (length < 0) { + esyslog("ERROR: streamdev: buffer allocation failed (%s) for %s:%d", + m_Protocol, RemoteIp().c_str(), RemotePort()); + return false; + } + if (m_WriteBytes + length + 2 > sizeof(m_WriteBuffer)) { esyslog("ERROR: streamdev: output buffer overflow (%s) for %s:%d", m_Protocol, RemoteIp().c_str(), RemotePort()); + free(buffer); return false; } Dprintf("OUT: |%s|\n", buffer); diff --git a/server/connection.h b/server/connection.h index 2df850e..fe828d9 100644 --- a/server/connection.h +++ b/server/connection.h @@ -1,5 +1,5 @@ /* - * $Id: connection.h,v 1.4 2007/04/02 10:32:34 schmirl Exp $ + * $Id: connection.h,v 1.5 2007/04/16 11:01:02 schmirl Exp $ */ #ifndef VDR_STREAMDEV_SERVER_CONNECTION_H @@ -38,8 +38,8 @@ protected: Only one line at a time may be sent. If there are lines to follow, set Last to false. Command(NULL) will be called in the next cycle, so you can post the next line. */ - virtual bool Respond(const char *Message, bool Last = true, ...) - __attribute__ ((format (printf, 2, 4))); + virtual bool Respond(const char *Message, bool Last = true, ...); + //__attribute__ ((format (printf, 2, 4))); public: /* If you derive, specify a short string such as HTTP for Protocol, which diff --git a/server/connectionHTTP.c b/server/connectionHTTP.c index a526da9..8bde3aa 100644 --- a/server/connectionHTTP.c +++ b/server/connectionHTTP.c @@ -1,5 +1,5 @@ /* - * $Id: connectionHTTP.c,v 1.10 2006/01/26 19:40:18 lordjaxom Exp $ + * $Id: connectionHTTP.c,v 1.12 2007/05/09 09:12:42 schmirl Exp $ */ #include @@ -41,6 +41,8 @@ bool cConnectionHTTP::Command(char *Cmd) } Dprintf("header\n"); return true; + default: + break; } return false; // ??? shouldn't happen } @@ -69,6 +71,8 @@ bool cConnectionHTTP::ProcessRequest(void) device->SwitchChannel(m_Channel, false); if (m_LiveStreamer->SetChannel(m_Channel, m_StreamType, m_Apid)) { m_LiveStreamer->SetDevice(device); + if (!SetDSCP()) + LOG_ERROR_STR("unable to set DSCP sockopt"); if (m_StreamType == stES && (m_Apid != 0 || ISRADIO(m_Channel))) { return Respond("HTTP/1.0 200 OK") && Respond("Content-Type: audio/mpeg") @@ -153,7 +157,7 @@ bool cConnectionHTTP::CmdGET(const std::string &Opts) { const char *sp = Opts.c_str(), *ptr = sp, *ep; const cChannel *chan; - int apid = 0, pos; + int apid = 0; ptr = skipspace(ptr); while (*ptr == '/') diff --git a/server/connectionVTP.c b/server/connectionVTP.c index 18ea353..c847bf3 100644 --- a/server/connectionVTP.c +++ b/server/connectionVTP.c @@ -1,5 +1,5 @@ /* - * $Id: connectionVTP.c,v 1.8 2007/03/02 15:27:07 schmirl Exp $ + * $Id: connectionVTP.c,v 1.14 2007/05/09 09:12:42 schmirl Exp $ */ #include "server/connectionVTP.h" @@ -153,8 +153,6 @@ cLSTEHandler::~cLSTEHandler() bool cLSTEHandler::Next(bool &Last) { - char *buffer; - if (m_Error != NULL) { Last = true; cString str(m_Error, true); @@ -468,6 +466,8 @@ cConnectionVTP::cConnectionVTP(void): cServerConnection("VTP"), m_LiveSocket(NULL), m_LiveStreamer(NULL), + m_FilterSocket(NULL), + m_FilterStreamer(NULL), m_LastCommand(NULL), m_NoTSPIDS(false), m_LSTEHandler(NULL), @@ -482,11 +482,18 @@ cConnectionVTP::~cConnectionVTP() free(m_LastCommand); delete m_LiveStreamer; delete m_LiveSocket; + delete m_FilterStreamer; + delete m_FilterSocket; delete m_LSTTHandler; delete m_LSTCHandler; delete m_LSTEHandler; } +inline bool cConnectionVTP::Abort(void) const +{ + return m_LiveStreamer && m_LiveStreamer->Abort(); +} + void cConnectionVTP::Welcome(void) { Respond(220, "Welcome to Video Disk Recorder (VTP)"); @@ -500,12 +507,14 @@ void cConnectionVTP::Reject(void) void cConnectionVTP::Detach(void) { - if (m_LiveStreamer != NULL) m_LiveStreamer->Detach(); + if (m_LiveStreamer) m_LiveStreamer->Detach(); + if (m_FilterStreamer) m_FilterStreamer->Detach(); } void cConnectionVTP::Attach(void) { - if (m_LiveStreamer != NULL) m_LiveStreamer->Attach(); + if (m_LiveStreamer) m_LiveStreamer->Attach(); + if (m_FilterStreamer) m_FilterStreamer->Attach(); } bool cConnectionVTP::Command(char *Cmd) @@ -549,8 +558,8 @@ bool cConnectionVTP::Command(char *Cmd) else if (strcasecmp(Cmd, "ADDF") == 0) return CmdADDF(param); else if (strcasecmp(Cmd, "DELF") == 0) return CmdDELF(param); else if (strcasecmp(Cmd, "ABRT") == 0) return CmdABRT(param); - else if (strcasecmp(Cmd, "QUIT") == 0) return CmdQUIT(param); - else if (strcasecmp(Cmd, "SUSP") == 0) return CmdSUSP(param); + else if (strcasecmp(Cmd, "QUIT") == 0) return CmdQUIT(); + else if (strcasecmp(Cmd, "SUSP") == 0) return CmdSUSP(); // Commands adopted from SVDRP //else if (strcasecmp(Cmd, "DELR") == 0) return CmdDELR(param); else if (strcasecmp(Cmd, "MODT") == 0) return CmdMODT(param); @@ -562,8 +571,6 @@ bool cConnectionVTP::Command(char *Cmd) bool cConnectionVTP::CmdCAPS(char *Opts) { - char *buffer; - if (strcasecmp(Opts, "TS") == 0) { m_NoTSPIDS = true; return Respond(220, "Ignored, capability \"%s\" accepted for " @@ -575,6 +582,14 @@ bool cConnectionVTP::CmdCAPS(char *Opts) return Respond(220, "Capability \"%s\" accepted", Opts); } +#if VDRVERSNUM >= 10300 + // + // Deliver section filters data in separate, channel-independent data stream + // + if (strcasecmp(Opts, "FILTERS") == 0) + return Respond(220, "Capability \"%s\" accepted", Opts); +#endif + return Respond(561, "Capability \"%s\" not known", Opts); } @@ -607,9 +622,14 @@ bool cConnectionVTP::CmdPORT(char *Opts) id = strtoul(Opts, &ep, 10); if (ep == Opts || !isspace(*ep)) return Respond(500, "Use: PORT Id Destination"); - - if (id != 0) + +#if VDRVERSNUM >= 10300 + if (id != siLive && id != siLiveFilter) return Respond(501, "Wrong connection id %d", id); +#else + if (id != siLive) + return Respond(501, "Wrong connection id %d", id); +#endif Opts = skipspace(ep); n = 0; @@ -636,6 +656,33 @@ bool cConnectionVTP::CmdPORT(char *Opts) isyslog("Streamdev: Setting data connection to %s:%d", dataip, dataport); +#if VDRVERSNUM >= 10300 + if (id == siLiveFilter) { + if(m_FilterStreamer) + m_FilterStreamer->Stop(); + delete m_FilterSocket; + + m_FilterSocket = new cTBSocket(SOCK_STREAM); + if (!m_FilterSocket->Connect(dataip, dataport)) { + esyslog("ERROR: Streamdev: Couldn't open data connection to %s:%d: %s", + dataip, dataport, strerror(errno)); + DELETENULL(m_FilterSocket); + return Respond(551, "Couldn't open data connection"); + } + + if(!m_FilterStreamer) + m_FilterStreamer = new cStreamdevFilterStreamer; + m_FilterStreamer->Start(m_FilterSocket); + m_FilterStreamer->Activate(true); + + return Respond(220, "Port command ok, data connection opened"); + } +#endif + + if(m_LiveSocket && m_LiveStreamer) + m_LiveStreamer->Stop(); + delete m_LiveSocket; + m_LiveSocket = new cTBSocket(SOCK_STREAM); if (!m_LiveSocket->Connect(dataip, dataport)) { esyslog("ERROR: Streamdev: Couldn't open data connection to %s:%d: %s", @@ -644,10 +691,12 @@ bool cConnectionVTP::CmdPORT(char *Opts) return Respond(551, "Couldn't open data connection"); } - if (id == siLive) + if (!m_LiveSocket->SetDSCP()) + LOG_ERROR_STR("unable to set DSCP sockopt"); + if (m_LiveStreamer) m_LiveStreamer->Start(m_LiveSocket); - return Respond(220, "Port command ok, data connection opened"); + return Respond(220, "Port command ok, data connection opened"); } bool cConnectionVTP::CmdTUNE(char *Opts) @@ -668,7 +717,16 @@ bool cConnectionVTP::CmdTUNE(char *Opts) m_LiveStreamer = new cStreamdevLiveStreamer(1); m_LiveStreamer->SetChannel(chan, m_NoTSPIDS ? stTS : stTSPIDS); m_LiveStreamer->SetDevice(dev); + if(m_LiveSocket) + m_LiveStreamer->Start(m_LiveSocket); +#if VDRVERSNUM >= 10300 + if(!m_FilterStreamer) + m_FilterStreamer = new cStreamdevFilterStreamer; + m_FilterStreamer->SetDevice(dev); + //m_FilterStreamer->SetChannel(chan); +#endif + return Respond(220, "Channel tuned"); } @@ -706,8 +764,8 @@ bool cConnectionVTP::CmdADDF(char *Opts) int pid, tid, mask; char *ep; - if (m_LiveStreamer == NULL) - return Respond(560, "Can't set filters without a stream"); + if (m_FilterStreamer == NULL) + return Respond(560, "Can't set filters without a filter stream"); pid = strtol(Opts, &ep, 10); if (ep == Opts || (*ep != ' ')) @@ -721,7 +779,7 @@ bool cConnectionVTP::CmdADDF(char *Opts) if (ep == Opts || (*ep != '\0' && *ep != ' ')) return Respond(500, "Use: ADDF Pid Tid Mask"); - return m_LiveStreamer->SetFilter(pid, tid, mask, true) + return m_FilterStreamer->SetFilter(pid, tid, mask, true) ? Respond(220, "Filter %d transferring", pid) : Respond(560, "Filter %d not available", pid); #else @@ -735,7 +793,7 @@ bool cConnectionVTP::CmdDELF(char *Opts) int pid, tid, mask; char *ep; - if (m_LiveStreamer == NULL) + if (m_FilterStreamer == NULL) return Respond(560, "Can't delete filters without a stream"); pid = strtol(Opts, &ep, 10); @@ -750,9 +808,8 @@ bool cConnectionVTP::CmdDELF(char *Opts) if (ep == Opts || (*ep != '\0' && *ep != ' ')) return Respond(500, "Use: DELF Pid Tid Mask"); - return m_LiveStreamer->SetFilter(pid, tid, mask, false) - ? Respond(220, "Filter %d stopped", pid) - : Respond(560, "Filter %d not transferring", pid); + m_FilterStreamer->SetFilter(pid, tid, mask, false); + return Respond(220, "Filter %d stopped", pid); #else return Respond(500, "DELF known but unimplemented with VDR < 1.3.0"); #endif @@ -768,20 +825,32 @@ bool cConnectionVTP::CmdABRT(char *Opts) return Respond(500, "Use: ABRT Id"); switch (id) { - case 0: DELETENULL(m_LiveStreamer); break; + case siLive: + DELETENULL(m_LiveStreamer); + DELETENULL(m_LiveSocket); + break; +#if VDRVERSNUM >= 10300 + case siLiveFilter: + DELETENULL(m_FilterStreamer); + DELETENULL(m_FilterSocket); + break; +#endif + default: + return Respond(501, "Wrong connection id %d", id); + break; + } - DELETENULL(m_LiveSocket); return Respond(220, "Data connection closed"); } -bool cConnectionVTP::CmdQUIT(char *Opts) +bool cConnectionVTP::CmdQUIT(void) { DeferClose(); return Respond(221, "Video Disk Recorder closing connection"); } -bool cConnectionVTP::CmdSUSP(char *Opts) +bool cConnectionVTP::CmdSUSP(void) { if (StreamdevServerSetup.SuspendMode == smAlways || cSuspendCtl::IsActive()) return Respond(220, "Server is suspended"); diff --git a/server/connectionVTP.h b/server/connectionVTP.h index c6ab223..fffff4f 100644 --- a/server/connectionVTP.h +++ b/server/connectionVTP.h @@ -2,9 +2,10 @@ #define VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H #include "server/connection.h" -#include "server/livestreamer.h" class cTBSocket; +class cStreamdevLiveStreamer; +class cStreamdevFilterStreamer; class cLSTEHandler; class cLSTCHandler; class cLSTTHandler; @@ -16,8 +17,10 @@ class cConnectionVTP: public cServerConnection { using cServerConnection::Respond; private: - cTBSocket *m_LiveSocket; - cStreamdevLiveStreamer *m_LiveStreamer; + cTBSocket *m_LiveSocket; + cStreamdevLiveStreamer *m_LiveStreamer; + cTBSocket *m_FilterSocket; + cStreamdevFilterStreamer *m_FilterStreamer; char *m_LastCommand; bool m_NoTSPIDS; @@ -53,8 +56,8 @@ public: bool CmdADDF(char *Opts); bool CmdDELF(char *Opts); bool CmdABRT(char *Opts); - bool CmdQUIT(char *Opts); - bool CmdSUSP(char *Opts); + bool CmdQUIT(void); + bool CmdSUSP(void); // Thread-safe implementations of SVDRP commands bool CmdLSTE(char *Opts); @@ -73,9 +76,4 @@ public: __attribute__ ((format (printf, 3, 4))); }; -inline bool cConnectionVTP::Abort(void) const -{ - return m_LiveStreamer && m_LiveStreamer->Abort(); -} - #endif // VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H diff --git a/server/livefilter.c b/server/livefilter.c index 4524a88..e7d896c 100644 --- a/server/livefilter.c +++ b/server/livefilter.c @@ -1,20 +1,24 @@ /* - * $Id: livefilter.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ + * $Id: livefilter.c,v 1.4 2007/04/24 11:06:12 schmirl Exp $ */ #include "server/livefilter.h" -#include "server/livestreamer.h" +#include "server/streamer.h" #include "common.h" +#ifndef TS_SIZE +# define TS_SIZE 188 +#endif +#ifndef TS_SYNC_BYTE +# define TS_SYNC_BYTE 0x47 +#endif + #if VDRVERSNUM >= 10300 -cStreamdevLiveFilter::cStreamdevLiveFilter(cStreamdevLiveStreamer *Streamer) { +cStreamdevLiveFilter::cStreamdevLiveFilter(cStreamdevStreamer *Streamer) { m_Streamer = Streamer; } -cStreamdevLiveFilter::~cStreamdevLiveFilter() { -} - void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data, int Length) { uchar buffer[TS_SIZE]; @@ -24,7 +28,7 @@ void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data, while (length > 0) { int chunk = min(length, TS_SIZE - 5); buffer[0] = TS_SYNC_BYTE; - buffer[1] = (Pid >> 8) & 0xff; + buffer[1] = ((Pid >> 8) & 0x3f) | (pos==0 ? 0x40 : 0); /* bit 6: payload unit start indicator (PUSI) */ buffer[2] = Pid & 0xff; buffer[3] = Tid; buffer[4] = (uchar)chunk; diff --git a/server/livefilter.h b/server/livefilter.h index a30cba0..99c69d4 100644 --- a/server/livefilter.h +++ b/server/livefilter.h @@ -1,5 +1,5 @@ /* - * $Id: livefilter.h,v 1.2 2005/11/07 19:28:41 lordjaxom Exp $ + * $Id: livefilter.h,v 1.4 2007/04/24 11:29:29 schmirl Exp $ */ #ifndef VDR_STREAMEV_LIVEFILTER_H @@ -11,20 +11,24 @@ #include -class cStreamdevLiveStreamer; +class cStreamdevStreamer; class cStreamdevLiveFilter: public cFilter { - friend class cStreamdevLiveStreamer; - private: - cStreamdevLiveStreamer *m_Streamer; + cStreamdevStreamer *m_Streamer; protected: virtual void Process(u_short Pid, u_char Tid, const u_char *Data, int Length); public: - cStreamdevLiveFilter(cStreamdevLiveStreamer *Streamer); - virtual ~cStreamdevLiveFilter(); + cStreamdevLiveFilter(cStreamdevStreamer *Streamer); + + void Set(u_short Pid, u_char Tid, u_char Mask) { + cFilter::Set(Pid, Tid, Mask); + } + void Del(u_short Pid, u_char Tid, u_char Mask) { + cFilter::Del(Pid, Tid, Mask); + } }; # endif // VDRVERSNUM >= 10300 diff --git a/server/livestreamer.c b/server/livestreamer.c index 6148720..1bbeddb 100644 --- a/server/livestreamer.c +++ b/server/livestreamer.c @@ -1,6 +1,12 @@ +#include + +#include +#include + #include #include "server/livestreamer.h" +#include "server/livefilter.h" #include "remux/ts2ps.h" #include "remux/ts2es.h" #include "remux/extern.h" @@ -8,12 +14,31 @@ // --- cStreamdevLiveReceiver ------------------------------------------------- +class cStreamdevLiveReceiver: public cReceiver { + friend class cStreamdevStreamer; + +private: + cStreamdevStreamer *m_Streamer; + +protected: + virtual void Activate(bool On); + virtual void Receive(uchar *Data, int Length); + +public: #if VDRVERSNUM < 10500 -cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Ca, + cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, int Ca, int Priority, const int *Pids); +#else + cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, tChannelID ChannelID, int Priority, const int *Pids); +#endif + virtual ~cStreamdevLiveReceiver(); +}; + +#if VDRVERSNUM < 10500 +cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, int Ca, int Priority, const int *Pids): cReceiver(Ca, Priority, 0, Pids), #else -cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, tChannelID ChannelID, +cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, tChannelID ChannelID, int Priority, const int *Pids): cReceiver(ChannelID, Priority, 0, Pids), #endif @@ -33,6 +58,228 @@ void cStreamdevLiveReceiver::Receive(uchar *Data, int Length) { m_Streamer->ReportOverflow(Length - p); } +inline void cStreamdevLiveReceiver::Activate(bool On) +{ + Dprintf("LiveReceiver->Activate(%d)\n", On); + m_Streamer->Activate(On); +} + +// --- cStreamdevPatFilter ---------------------------------------------------- + +class cStreamdevPatFilter : public cFilter { +private: + int pmtPid; + int pmtSid; + int pmtVersion; + + const cChannel *m_Channel; + cStreamdevLiveStreamer *m_Streamer; + + virtual void Process(u_short Pid, u_char Tid, const u_char *Data, int Length); + + int GetPid(SI::PMT::Stream& stream); +public: + cStreamdevPatFilter(cStreamdevLiveStreamer *Streamer, const cChannel *Channel); +}; + +cStreamdevPatFilter::cStreamdevPatFilter(cStreamdevLiveStreamer *Streamer, const cChannel *Channel) +{ + Dprintf("cStreamdevPatFilter(\"%s\")", Channel->Name()); + assert(Streamer); + m_Channel = Channel; + m_Streamer = Streamer; + pmtPid = 0; + pmtSid = 0; + pmtVersion = -1; + Set(0x00, 0x00); // PAT +} + +static const char * const psStreamTypes[] = { + "UNKNOWN", + "ISO/IEC 11172 Video", + "ISO/IEC 13818-2 Video", + "ISO/IEC 11172 Audio", + "ISO/IEC 13818-3 Audio", + "ISO/IEC 13818-1 Privete sections", + "ISO/IEC 13818-1 Private PES data", + "ISO/IEC 13512 MHEG", + "ISO/IEC 13818-1 Annex A DSM CC", + "0x09", + "ISO/IEC 13818-6 Multiprotocol encapsulation", + "ISO/IEC 13818-6 DSM-CC U-N Messages", + "ISO/IEC 13818-6 Stream Descriptors", + "ISO/IEC 13818-6 Sections (any type, including private data)", + "ISO/IEC 13818-1 auxiliary", + "ISO/IEC 13818-7 Audio with ADTS transport sytax", + "ISO/IEC 14496-2 Visual (MPEG-4)", + "ISO/IEC 14496-3 Audio with LATM transport syntax", + "0x12", "0x13", "0x14", "0x15", "0x16", "0x17", "0x18", "0x19", "0x1a", + "ISO/IEC 14496-10 Video (MPEG-4 part 10/AVC, aka H.264)", + "", +}; + +int cStreamdevPatFilter::GetPid(SI::PMT::Stream& stream) +{ + SI::Descriptor *d; + + if (!stream.getPid()) + return 0; + + switch (stream.getStreamType()) { + case 0x01: // ISO/IEC 11172 Video + case 0x02: // ISO/IEC 13818-2 Video + case 0x03: // ISO/IEC 11172 Audio + case 0x04: // ISO/IEC 13818-3 Audio +#if 0 + case 0x07: // ISO/IEC 13512 MHEG + case 0x08: // ISO/IEC 13818-1 Annex A DSM CC + case 0x0a: // ISO/IEC 13818-6 Multiprotocol encapsulation + case 0x0b: // ISO/IEC 13818-6 DSM-CC U-N Messages + case 0x0c: // ISO/IEC 13818-6 Stream Descriptors + case 0x0d: // ISO/IEC 13818-6 Sections (any type, including private data) + case 0x0e: // ISO/IEC 13818-1 auxiliary +#endif + case 0x0f: // ISO/IEC 13818-7 Audio with ADTS transport syntax + case 0x10: // ISO/IEC 14496-2 Visual (MPEG-4) + case 0x11: // ISO/IEC 14496-3 Audio with LATM transport syntax + case 0x1b: // ISO/IEC 14496-10 Video (MPEG-4 part 10/AVC, aka H.264) + Dprintf("cStreamdevPatFilter PMT scanner adding PID %d (%s)", + stream.getPid(), psStreamTypes[stream.getStreamType()]); + return stream.getPid(); + case 0x05: // ISO/IEC 13818-1 private sections + case 0x06: // ISO/IEC 13818-1 PES packets containing private data + for (SI::Loop::Iterator it; (d = stream.streamDescriptors.getNext(it)); ) { + switch (d->getDescriptorTag()) { + case SI::AC3DescriptorTag: + Dprintf("cStreamdevPatFilter PMT scanner: adding PID %d (%s) %s", + stream.getPid(), psStreamTypes[stream.getStreamType()], "AC3"); + return stream.getPid(); + case SI::TeletextDescriptorTag: + Dprintf("cStreamdevPatFilter PMT scanner: adding PID %d (%s) %s", + stream.getPid(), psStreamTypes[stream.getStreamType()], "Teletext"); + return stream.getPid(); + case SI::SubtitlingDescriptorTag: + Dprintf("cStreamdevPatFilter PMT scanner: adding PID %d (%s) %s", + stream.getPid(), psStreamTypes[stream.getStreamType()], "DVBSUB"); + return stream.getPid(); + default: + Dprintf("cStreamdevPatFilter PMT scanner: NOT adding PID %d (%s) %s", + stream.getPid(), psStreamTypes[stream.getStreamType()], "UNKNOWN"); + break; + } + delete d; + } + break; + default: + /* This following section handles all the cases where the audio track + * info is stored in PMT user info with stream id >= 0x80 + * we check the registration format identifier to see if it + * holds "AC-3" + */ + if (stream.getStreamType() >= 0x80) { + bool found = false; + for (SI::Loop::Iterator it; (d = stream.streamDescriptors.getNext(it)); ) { + switch (d->getDescriptorTag()) { + case SI::RegistrationDescriptorTag: + /* unfortunately libsi does not implement RegistrationDescriptor */ + if (d->getLength() >= 4) { + found = true; + SI::CharArray rawdata = d->getData(); + if (/*rawdata[0] == 5 && rawdata[1] >= 4 && */ + rawdata[2] == 'A' && rawdata[3] == 'C' && + rawdata[4] == '-' && rawdata[5] == '3') { + isyslog("cStreamdevPatFilter PMT scanner:" + "Adding pid %d (type 0x%x) RegDesc len %d (%c%c%c%c)", + stream.getPid(), stream.getStreamType(), + d->getLength(), rawdata[2], rawdata[3], + rawdata[4], rawdata[5]); + return stream.getPid(); + } + } + break; + default: + break; + } + delete d; + } + if(!found) { + isyslog("Adding pid %d (type 0x%x) RegDesc not found -> assume AC-3", + stream.getPid(), stream.getStreamType()); + return stream.getPid(); + } + } + Dprintf("cStreamdevPatFilter PMT scanner: NOT adding PID %d (%s) %s", + stream.getPid(), psStreamTypes[stream.getStreamType()<0x1c?stream.getStreamType():0], "UNKNOWN"); + break; + } + return 0; +} + +void cStreamdevPatFilter::Process(u_short Pid, u_char Tid, const u_char *Data, int Length) +{ + if (Pid == 0x00) { + if (Tid == 0x00 && !pmtPid) { + SI::PAT pat(Data, false); + if (!pat.CheckCRCAndParse()) + return; + SI::PAT::Association assoc; + for (SI::Loop::Iterator it; pat.associationLoop.getNext(assoc, it); ) { + if (!assoc.isNITPid()) { + const cChannel *Channel = Channels.GetByServiceID(Source(), Transponder(), assoc.getServiceId()); + if (Channel && (Channel == m_Channel)) { + if (0 != (pmtPid = assoc.getPid())) { + Dprintf("cStreamdevPatFilter: PMT pid for channel %s: %d", Channel->Name(), pmtPid); + pmtSid = assoc.getServiceId(); + if (Length < TS_SIZE-5) { + // repack PAT to TS frame and send to client + uint8_t pat_ts[TS_SIZE] = {TS_SYNC_BYTE, 0x40 /* pusi=1 */, 0 /* pid=0 */, 0x10 /* adaption=1 */, 0 /* pointer */}; + memcpy(pat_ts + 5, Data, Length); + m_Streamer->Put(pat_ts, TS_SIZE); + } else + isyslog("cStreamdevPatFilter: PAT size %d too large to fit in one TS", Length); + m_Streamer->SetPids(pmtPid); + Add(pmtPid, 0x02); + pmtVersion = -1; + return; + } + } + } + } + } + } else if (Pid == pmtPid && Tid == SI::TableIdPMT && Source() && Transponder()) { + SI::PMT pmt(Data, false); + if (!pmt.CheckCRCAndParse()) + return; + if (pmt.getServiceId() != pmtSid) + return; // skip broken PMT records + if (pmtVersion != -1) { + if (pmtVersion != pmt.getVersionNumber()) { + Dprintf("cStreamdevPatFilter: PMT version changed, detaching all pids"); + Del(pmtPid, 0x02); + pmtPid = 0; // this triggers PAT scan + } + return; + } + pmtVersion = pmt.getVersionNumber(); + + SI::PMT::Stream stream; + int pids[MAXRECEIVEPIDS + 1], npids = 0; + pids[npids++] = pmtPid; +#if 0 + pids[npids++] = 0x10; // pid 0x10, tid 0x40: NIT + pids[npids++] = 0x11; // pid 0x11, tid 0x42: SDT + pids[npids++] = 0x12; // pid 0x12, tid 0x4E...0x6F: EIT + pids[npids++] = 0x14; // pid 0x14, tid 0x70: TDT +#endif + for (SI::Loop::Iterator it; pmt.streamLoop.getNext(stream, it); ) + if (0 != (pids[npids] = GetPid(stream)) && npids < MAXRECEIVEPIDS) + npids++; + + pids[npids] = 0; + m_Streamer->SetPids(pmt.getPCRPid(), pids); + } +} + // --- cStreamdevLiveStreamer ------------------------------------------------- cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority): @@ -43,6 +290,7 @@ cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority): m_Channel(NULL), m_Device(NULL), m_Receiver(NULL), + m_PatFilter(NULL), m_PESRemux(NULL), m_ESRemux(NULL), m_PSRemux(NULL), @@ -54,14 +302,24 @@ cStreamdevLiveStreamer::~cStreamdevLiveStreamer() { Dprintf("Desctructing Live streamer\n"); Stop(); - delete m_Receiver; + if(m_PatFilter) { + Detach(); + DELETENULL(m_PatFilter); + } + DELETENULL(m_Receiver); delete m_PESRemux; delete m_ESRemux; delete m_PSRemux; delete m_ExtRemux; -#if VDRVERSNUM >= 10300 - //delete m_Filter; TODO -#endif +} + +bool cStreamdevLiveStreamer::HasPid(int Pid) +{ + int idx; + for (idx = 0; idx < m_NumPids; ++idx) + if (m_Pids[idx] == Pid) + return true; + return false; } bool cStreamdevLiveStreamer::SetPid(int Pid, bool On) @@ -93,6 +351,44 @@ bool cStreamdevLiveStreamer::SetPid(int Pid, bool On) } } + StartReceiver(); + return true; +} + +bool cStreamdevLiveStreamer::SetPids(int Pid, const int *Pids1, const int *Pids2, const int *Pids3) +{ + m_NumPids = 0; + + if (Pid) + m_Pids[m_NumPids++] = Pid; + + if (Pids1) + for ( ; *Pids1 && m_NumPids < MAXRECEIVEPIDS; Pids1++) + if (!HasPid(*Pids1)) + m_Pids[m_NumPids++] = *Pids1; + + if (Pids2) + for ( ; *Pids2 && m_NumPids < MAXRECEIVEPIDS; Pids2++) + if (!HasPid(*Pids2)) + m_Pids[m_NumPids++] = *Pids2; + + if (Pids3) + for ( ; *Pids3 && m_NumPids < MAXRECEIVEPIDS; Pids3++) + if (!HasPid(*Pids3)) + m_Pids[m_NumPids++] = *Pids3; + + if (m_NumPids >= MAXRECEIVEPIDS) { + esyslog("ERROR: Streamdev: No free slot to receive pid %d\n", Pid); + return false; + } + + m_Pids[m_NumPids] = 0; + StartReceiver(); + return true; +} + +void cStreamdevLiveStreamer::StartReceiver(void) +{ DELETENULL(m_Receiver); if (m_NumPids > 0) { Dprintf("Creating Receiver to respect changed pids\n"); @@ -106,15 +402,19 @@ bool cStreamdevLiveStreamer::SetPid(int Pid, bool On) Attach(); } } - return true; } bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, eStreamType StreamType, int Apid) { Dprintf("Initializing Remuxer for full channel transfer\n"); - printf("ca pid: %d\n", Channel->Ca()); + //printf("ca pid: %d\n", Channel->Ca()); m_Channel = Channel; m_StreamType = StreamType; + + int apid[2] = { Apid, 0 }; + const int *Apids = Apid ? apid : m_Channel->Apids(); + const int *Dpids = Apid ? NULL : m_Channel->Dpids(); + switch (m_StreamType) { case stES: { @@ -122,51 +422,33 @@ bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, eStreamType Str if (Apid != 0) pid = Apid; m_ESRemux = new cTS2ESRemux(pid); - return SetPid(pid, true); + return SetPids(pid); } case stPES: - Dprintf("PES\n"); m_PESRemux = new cRemux(m_Channel->Vpid(), m_Channel->Apids(), m_Channel->Dpids(), m_Channel->Spids(), false); - if (Apid != 0) - return SetPid(m_Channel->Vpid(), true) - && SetPid(Apid, true); - else - return SetPid(m_Channel->Vpid(), true) - && SetPid(m_Channel->Apid(0), true) - && SetPid(m_Channel->Dpid(0), true); + return SetPids(m_Channel->Vpid(), Apids, Dpids, m_Channel->Spids()); case stPS: m_PSRemux = new cTS2PSRemux(m_Channel->Vpid(), m_Channel->Apids(), m_Channel->Dpids(), m_Channel->Spids()); - if (Apid != 0) - return SetPid(m_Channel->Vpid(), true) - && SetPid(Apid, true); - else - return SetPid(m_Channel->Vpid(), true) - && SetPid(m_Channel->Apid(0), true) - && SetPid(m_Channel->Dpid(0), true); + return SetPids(m_Channel->Vpid(), Apids, Dpids, m_Channel->Spids()); case stTS: - if (Apid != 0) - return SetPid(m_Channel->Vpid(), true) - && SetPid(Apid, true); - else - return SetPid(m_Channel->Vpid(), true) - && SetPid(m_Channel->Apid(0), true) - && SetPid(m_Channel->Dpid(0), true); + // This should never happen, but ... + if (m_PatFilter) { + Detach(); + DELETENULL(m_PatFilter); + } + // Set pids from PMT + m_PatFilter = new cStreamdevPatFilter(this, m_Channel); + return true; case stExtern: m_ExtRemux = new cExternRemux(m_Channel->Vpid(), m_Channel->Apids(), m_Channel->Dpids(), m_Channel->Spids()); - if (Apid != 0) - return SetPid(m_Channel->Vpid(), true) - && SetPid(Apid, true); - else - return SetPid(m_Channel->Vpid(), true) - && SetPid(m_Channel->Apid(0), true) - && SetPid(m_Channel->Dpid(0), true); + return SetPids(m_Channel->Vpid(), Apids, Dpids, m_Channel->Spids()); case stTSPIDS: Dprintf("pid streaming mode\n"); @@ -175,25 +457,6 @@ bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, eStreamType Str return false; } -bool cStreamdevLiveStreamer::SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On) -{ -#if 0 - Dprintf("setting filter\n"); - if (On) { - if (m_Filter == NULL) { - m_Filter = new cStreamdevLiveFilter(this); - Dprintf("attaching filter to device\n"); - m_Device->AttachFilter(m_Filter); - } - m_Filter->Set(Pid, Tid, Mask); - } else if (m_Filter != NULL) - m_Filter->Del(Pid, Tid, Mask); - return true; -#else - return false; -#endif -} - int cStreamdevLiveStreamer::Put(const uchar *Data, int Count) { switch (m_StreamType) { @@ -270,14 +533,28 @@ void cStreamdevLiveStreamer::Del(int Count) void cStreamdevLiveStreamer::Attach(void) { - printf("RIGHT ATTACH\n"); - m_Device->AttachReceiver(m_Receiver); + Dprintf("cStreamdevLiveStreamer::Attach()\n"); + if (m_Device) { + if (m_Receiver) { + m_Device->Detach(m_Receiver); + m_Device->AttachReceiver(m_Receiver); + } + if (m_PatFilter) { + m_Device->Detach(m_PatFilter); + m_Device->AttachFilter(m_PatFilter); + } + } } void cStreamdevLiveStreamer::Detach(void) { - printf("RIGHT DETACH\n"); - m_Device->Detach(m_Receiver); + Dprintf("cStreamdevLiveStreamer::Detach()\n"); + if (m_Device) { + if (m_Receiver) + m_Device->Detach(m_Receiver); + if (m_PatFilter) + m_Device->Detach(m_PatFilter); + } } std::string cStreamdevLiveStreamer::Report(void) @@ -296,3 +573,110 @@ std::string cStreamdevLiveStreamer::Report(void) result += "\n"; return result; } + +// --- cStreamdevFilterStreamer ------------------------------------------------- + +#if VDRVERSNUM >= 10300 +cStreamdevFilterStreamer::cStreamdevFilterStreamer(): + cStreamdevStreamer("streamdev-filterstreaming"), + m_Device(NULL), + m_Filter(NULL)/*, + m_Channel(NULL)*/ +{ +} + +cStreamdevFilterStreamer::~cStreamdevFilterStreamer() +{ + Dprintf("Desctructing Filter streamer\n"); + Detach(); + m_Device = NULL; + DELETENULL(m_Filter); + Stop(); +} + +void cStreamdevFilterStreamer::Attach(void) +{ + Dprintf("cStreamdevFilterStreamer::Attach()\n"); + LOCK_THREAD; + if(m_Device && m_Filter) + m_Device->AttachFilter(m_Filter); +} + +void cStreamdevFilterStreamer::Detach(void) +{ + Dprintf("cStreamdevFilterStreamer::Detach()\n"); + LOCK_THREAD; + if(m_Device && m_Filter) + m_Device->Detach(m_Filter); +} + +#if 0 +void cStreamdevFilterStreamer::SetChannel(const cChannel *Channel) +{ + LOCK_THREAD; + Dprintf("cStreamdevFilterStreamer::SetChannel(%s : %s)", Channel?Channel->Name():"", + Channel ? *Channel->GetChannelID().ToString() : ""); + m_Channel = Channel; +} +#endif + +void cStreamdevFilterStreamer::SetDevice(cDevice *Device) +{ + Dprintf("cStreamdevFilterStreamer::SetDevice()\n"); + LOCK_THREAD; + if(Device != m_Device) { + Detach(); + m_Device = Device; + //m_Channel = NULL; + Attach(); + } +} + +bool cStreamdevFilterStreamer::SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On) +{ + Dprintf("cStreamdevFilterStreamer::SetFilter(%u,0x%x,0x%x,%s)\n", Pid, Tid, Mask, On?"On":"Off"); + + if(!m_Device) + return false; + + if (On) { + if (m_Filter == NULL) { + m_Filter = new cStreamdevLiveFilter(this); + Dprintf("attaching filter to device\n"); + Attach(); + } + m_Filter->Set(Pid, Tid, Mask); + } else if (m_Filter != NULL) + m_Filter->Del(Pid, Tid, Mask); + + return true; +} + +#if 0 +void cStreamdevFilterStreamer::ChannelSwitch(const cDevice *Device, int ChannelNumber) { + LOCK_THREAD; + if(Device == m_Device) { + if(ChannelNumber > 0) { + cChannel *ch = Channels.GetByNumber(ChannelNumber); + if(ch != NULL) { + if(m_Filter != NULL && + m_Channel != NULL && + (! TRANSPONDER(ch, m_Channel))) { + + isyslog("***** LiveFilterStreamer: transponder changed ! %s", + *ch->GetChannelID().ToString()); + + uchar buffer[TS_SIZE] = {TS_SYNC_BYTE, 0xff, 0xff, 0xff, 0x7f, 0}; + strcpy((char*)(buffer + 5), ch->GetChannelID().ToString()); + int p = Put(buffer, TS_SIZE); + if (p != TS_SIZE) + ReportOverflow(TS_SIZE - p); + } + m_Channel = ch; + } + } + } +} +#endif + +#endif // if VDRVERSNUM >= 10300 diff --git a/server/livestreamer.h b/server/livestreamer.h index 0c525bf..1973f71 100644 --- a/server/livestreamer.h +++ b/server/livestreamer.h @@ -5,34 +5,14 @@ #include #include "server/streamer.h" -#include "server/livefilter.h" #include "common.h" class cTS2PSRemux; class cTS2ESRemux; class cExternRemux; class cRemux; - -// --- cStreamdevLiveReceiver ------------------------------------------------- - -class cStreamdevLiveReceiver: public cReceiver { - friend class cStreamdevLiveStreamer; - -private: - cStreamdevLiveStreamer *m_Streamer; - -protected: - virtual void Activate(bool On); - virtual void Receive(uchar *Data, int Length); - -public: -#if VDRVERSNUM < 10500 - cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Ca, int Priority, const int *Pids); -#else - cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, tChannelID ChannelID, int Priority, const int *Pids); -#endif - virtual ~cStreamdevLiveReceiver(); -}; +class cStreamdevPatFilter; +class cStreamdevLiveReceiver; // --- cStreamdevLiveStreamer ------------------------------------------------- @@ -45,19 +25,23 @@ private: const cChannel *m_Channel; cDevice *m_Device; cStreamdevLiveReceiver *m_Receiver; + cStreamdevPatFilter *m_PatFilter; cRemux *m_PESRemux; cTS2ESRemux *m_ESRemux; cTS2PSRemux *m_PSRemux; cExternRemux *m_ExtRemux; + void StartReceiver(void); + bool HasPid(int Pid); + public: cStreamdevLiveStreamer(int Priority); virtual ~cStreamdevLiveStreamer(); void SetDevice(cDevice *Device) { m_Device = Device; } bool SetPid(int Pid, bool On); + bool SetPids(int Pid, const int *Pids1 = NULL, const int *Pids2 = NULL, const int *Pids3 = NULL); bool SetChannel(const cChannel *Channel, eStreamType StreamType, int Apid = 0); - bool SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On); virtual int Put(const uchar *Data, int Count); virtual uchar *Get(int &Count); @@ -70,12 +54,36 @@ public: virtual std::string Report(void); }; -// --- cStreamdevLiveReceiver reverse inlines --------------------------------- -inline void cStreamdevLiveReceiver::Activate(bool On) -{ - Dprintf("LiveReceiver->Activate(%d)\n", On); - m_Streamer->Activate(On); -} +// --- cStreamdevFilterStreamer ------------------------------------------------- + +# if VDRVERSNUM >= 10300 + +//#include + +class cStreamdevLiveFilter; + +class cStreamdevFilterStreamer: public cStreamdevStreamer /*, public cStatus*/ { +private: + cDevice *m_Device; + cStreamdevLiveFilter *m_Filter; + //const cChannel *m_Channel; + +public: + cStreamdevFilterStreamer(); + virtual ~cStreamdevFilterStreamer(); + + void SetDevice(cDevice *Device); + //void SetChannel(const cChannel *Channel); + bool SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On); + + virtual void Attach(void); + virtual void Detach(void); + + // cStatus message handlers + //virtual void ChannelSwitch(const cDevice *Device, int ChannelNumber); +}; + +# endif // if VDRVERSNUM >= 10300 #endif // VDR_STREAMDEV_LIVESTREAMER_H diff --git a/streamdev-server.c b/streamdev-server.c index af5f104..1d0c097 100644 --- a/streamdev-server.c +++ b/streamdev-server.c @@ -3,7 +3,7 @@ * * See the README file for copyright information and how to reach the author. * - * $Id: streamdev-server.c,v 1.5 2007/02/19 12:08:16 schmirl Exp $ + * $Id: streamdev-server.c,v 1.6 2007/04/16 11:01:02 schmirl Exp $ */ #include @@ -63,7 +63,7 @@ bool cPluginStreamdevServer::Start(void) if (!StreamdevHosts.Load(STREAMDEVHOSTSPATH, true, true)) { esyslog("streamdev-server: error while loading %s", STREAMDEVHOSTSPATH); - fprintf(stderr, "streamdev-server: error while loading %s\n"); + fprintf(stderr, "streamdev-server: error while loading %s\n", STREAMDEVHOSTSPATH); if (access(STREAMDEVHOSTSPATH, F_OK) != 0) { fprintf(stderr, " Please install streamdevhosts.conf into the path " "printed above. Without it\n" diff --git a/tools/socket.c b/tools/socket.c index 4b5167d..e9266c5 100644 --- a/tools/socket.c +++ b/tools/socket.c @@ -6,6 +6,15 @@ #include #include +// default class: best effort +#define DSCP_BE 0 +// gold class (video): assured forwarding 4 with lowest drop precedence +#define DSCP_AF41 34 << 2 +// premium class (voip): expedited forwarding +#define DSCP_EF 46 << 2 +// actual DSCP value used +#define STREAMDEV_DSCP DSCP_AF41 + cTBSocket::cTBSocket(int Type) { memset(&m_LocalAddr, 0, sizeof(m_LocalAddr)); memset(&m_RemoteAddr, 0, sizeof(m_RemoteAddr)); @@ -141,3 +150,8 @@ bool cTBSocket::Shutdown(int how) { return ::shutdown(*this, how) != -1; } + +bool cTBSocket::SetDSCP(void) { + int dscp = STREAMDEV_DSCP; + return ::setsockopt(*this, SOL_IP, IP_TOS, &dscp, sizeof(dscp)) != -1; +} diff --git a/tools/socket.h b/tools/socket.h index d1a7d62..23272ec 100644 --- a/tools/socket.h +++ b/tools/socket.h @@ -68,6 +68,9 @@ public: an appropriate value. */ virtual bool Accept(const cTBSocket &Listener); + /* Sets DSCP sockopt */ + bool SetDSCP(void); + /* LocalPort() returns the port number this socket is connected to locally. The result is undefined for a non-open socket. */ int LocalPort(void) const { return ntohs(m_LocalAddr.sin_port); } diff --git a/tools/source.c b/tools/source.c index 80625e5..c328d7c 100644 --- a/tools/source.c +++ b/tools/source.c @@ -110,7 +110,7 @@ bool cTBSource::SafeWrite(const void *Buffer, size_t Length) { ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq, uint TimeoutMs) { - int seqlen, ms; + int ms; size_t len; cTBSelect sel;