From 186671647147d05821224f2b37b2ad70750f4548 Mon Sep 17 00:00:00 2001 From: Frank Schmirler Date: Sun, 16 Dec 2012 13:21:19 +0100 Subject: [PATCH] Close writer when streamer is finished --- HISTORY | 1 + server/livefilter.c | 16 ++++++++++++++++ server/livefilter.h | 2 ++ server/livestreamer.c | 8 +++++++- server/livestreamer.h | 2 ++ server/streamer.c | 6 ++++++ server/streamer.h | 1 + 7 files changed, 35 insertions(+), 1 deletion(-) diff --git a/HISTORY b/HISTORY index 0d79204..03d3fbe 100644 --- a/HISTORY +++ b/HISTORY @@ -1,6 +1,7 @@ VDR Plugin 'streamdev' Revision History --------------------------------------- +- Close writer when streamer is finished - Don't abort VTP connection if filter stream is broken - Restructured cStreamdevStreamer: Moved inbound buffer into actual subclass. - In cStreamdevStreamer dropped Activate(bool) and moved its code into Start(). diff --git a/server/livefilter.c b/server/livefilter.c index c707b3b..c159602 100644 --- a/server/livefilter.c +++ b/server/livefilter.c @@ -18,13 +18,17 @@ class cStreamdevLiveFilter: public cFilter { private: cStreamdevFilterStreamer *m_Streamer; + bool m_On; protected: virtual void Process(u_short Pid, u_char Tid, const u_char *Data, int Length); + virtual void SetStatus(bool On); public: cStreamdevLiveFilter(cStreamdevFilterStreamer *Streamer); + virtual bool IsAttached(void) const { return m_On; }; + void Set(u_short Pid, u_char Tid, u_char Mask) { cFilter::Set(Pid, Tid, Mask); } @@ -34,9 +38,16 @@ public: }; cStreamdevLiveFilter::cStreamdevLiveFilter(cStreamdevFilterStreamer *Streamer) { + m_On = false; m_Streamer = Streamer; } +void cStreamdevLiveFilter::SetStatus(bool On) +{ + m_On = On; + cFilter::SetStatus(On); +} + void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data, int Length) { uchar buffer[TS_SIZE]; @@ -113,6 +124,11 @@ void cStreamdevFilterStreamer::SetDevice(cDevice *Device) Attach(); } +bool cStreamdevFilterStreamer::IsReceiving(void) const +{ + return m_Filter && m_Filter->IsAttached(); +} + bool cStreamdevFilterStreamer::SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On) { Dprintf("cStreamdevFilterStreamer::SetFilter(%u,0x%x,0x%x,%s)\n", Pid, Tid, Mask, On?"On":"Off"); diff --git a/server/livefilter.h b/server/livefilter.h index fd23931..f3e2d6d 100644 --- a/server/livefilter.h +++ b/server/livefilter.h @@ -26,6 +26,8 @@ public: void SetDevice(cDevice *Device); bool SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On); + + virtual bool IsReceiving(void) const; void Receive(uchar *Data); virtual void Attach(void); diff --git a/server/livestreamer.c b/server/livestreamer.c index c601573..8ae9437 100644 --- a/server/livestreamer.c +++ b/server/livestreamer.c @@ -449,13 +449,19 @@ cString cStreamdevLiveStreamer::ToText() const return cString(""); } +bool cStreamdevLiveStreamer::IsReceiving(void) const +{ + cThreadLock ThreadLock(m_Device); + return m_Receiver && m_Receiver->IsAttached(); +} + void cStreamdevLiveStreamer::StartReceiver(void) { if (m_NumPids > 0) { Dprintf("Creating Receiver to respect changed pids\n"); cReceiver *current = m_Receiver; - m_Receiver = new cStreamdevLiveReceiver(this, m_Channel, m_Priority, m_Pids); cThreadLock ThreadLock(m_Device); + m_Receiver = new cStreamdevLiveReceiver(this, m_Channel, m_Priority, m_Pids); if (IsRunning()) Attach(); delete current; diff --git a/server/livestreamer.h b/server/livestreamer.h index 4af7b11..8c7fc88 100644 --- a/server/livestreamer.h +++ b/server/livestreamer.h @@ -52,6 +52,8 @@ public: cString ToText() const; void Receive(uchar *Data, int Length); + virtual bool IsReceiving(void) const; + virtual uchar *Get(int &Count); virtual void Del(int Count); diff --git a/server/streamer.c b/server/streamer.c index 9a6f43e..ad4dcc5 100644 --- a/server/streamer.c +++ b/server/streamer.c @@ -54,6 +54,11 @@ void cStreamdevWriter::Action(void) if (block == NULL) { block = m_Streamer->Get(count); offset = 0; + // still no data - are we done? + if (block == NULL && !m_Streamer->IsReceiving() && timeout++ > 20) { + esyslog("streamdev-server: streamer done - writer exiting"); + break; + } } if (block != NULL) { @@ -100,6 +105,7 @@ void cStreamdevWriter::Action(void) } } } + m_Socket->Close(); Dprintf("Max. Transmit Blocksize was: %d\n", max); } diff --git a/server/streamer.h b/server/streamer.h index 6bc2247..74c9524 100644 --- a/server/streamer.h +++ b/server/streamer.h @@ -84,6 +84,7 @@ public: virtual void Start(cTBSocket *Socket); virtual void Stop(void); + virtual bool IsReceiving(void) const = 0; bool Abort(void); virtual uchar *Get(int &Count) { return m_SendBuffer->Get(Count); }