Restructured cStreamdevStreamer: Moved inbound buffer into actual subclass.

This commit is contained in:
Frank Schmirler 2012-12-16 12:40:44 +01:00
parent 0677f48329
commit f5da0ea1fc
7 changed files with 52 additions and 30 deletions

View File

@ -1,6 +1,7 @@
VDR Plugin 'streamdev' Revision History VDR Plugin 'streamdev' Revision History
--------------------------------------- ---------------------------------------
- Restructured cStreamdevStreamer: Moved inbound buffer into actual subclass.
- In cStreamdevStreamer dropped Activate(bool) and moved its code into Start(). - In cStreamdevStreamer dropped Activate(bool) and moved its code into Start().
- Moved cStreamdevFilterStreamer to livefilter.[hc] - Moved cStreamdevFilterStreamer to livefilter.[hc]
- Return HTTP/1.1 compliant response headers plus some always useful headers - Return HTTP/1.1 compliant response headers plus some always useful headers

View File

@ -11,18 +11,19 @@
# define TS_SYNC_BYTE 0x47 # define TS_SYNC_BYTE 0x47
#endif #endif
#define FILTERBUFSIZE (1000 * TS_SIZE)
// --- cStreamdevLiveFilter ------------------------------------------------- // --- cStreamdevLiveFilter -------------------------------------------------
class cStreamdevLiveFilter: public cFilter { class cStreamdevLiveFilter: public cFilter {
private: private:
cStreamdevStreamer *m_Streamer; cStreamdevFilterStreamer *m_Streamer;
protected: protected:
virtual void Process(u_short Pid, u_char Tid, const u_char *Data, int Length); virtual void Process(u_short Pid, u_char Tid, const u_char *Data, int Length);
public: public:
cStreamdevLiveFilter(cStreamdevStreamer *Streamer); cStreamdevLiveFilter(cStreamdevFilterStreamer *Streamer);
void Set(u_short Pid, u_char Tid, u_char Mask) { void Set(u_short Pid, u_char Tid, u_char Mask) {
cFilter::Set(Pid, Tid, Mask); cFilter::Set(Pid, Tid, Mask);
@ -32,7 +33,7 @@ public:
} }
}; };
cStreamdevLiveFilter::cStreamdevLiveFilter(cStreamdevStreamer *Streamer) { cStreamdevLiveFilter::cStreamdevLiveFilter(cStreamdevFilterStreamer *Streamer) {
m_Streamer = Streamer; m_Streamer = Streamer;
} }
@ -54,9 +55,7 @@ void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data,
length -= chunk; length -= chunk;
pos += chunk; pos += chunk;
int p = m_Streamer->Put(buffer, TS_SIZE); m_Streamer->Receive(buffer);
if (p != TS_SIZE)
m_Streamer->ReportOverflow(TS_SIZE - p);
} }
} }
@ -68,6 +67,8 @@ cStreamdevFilterStreamer::cStreamdevFilterStreamer():
m_Filter(NULL)/*, m_Filter(NULL)/*,
m_Channel(NULL)*/ m_Channel(NULL)*/
{ {
m_ReceiveBuffer = new cStreamdevBuffer(FILTERBUFSIZE, TS_SIZE);
m_ReceiveBuffer->SetTimeouts(0, 500);
} }
cStreamdevFilterStreamer::~cStreamdevFilterStreamer() cStreamdevFilterStreamer::~cStreamdevFilterStreamer()
@ -77,6 +78,14 @@ cStreamdevFilterStreamer::~cStreamdevFilterStreamer()
m_Device = NULL; m_Device = NULL;
DELETENULL(m_Filter); DELETENULL(m_Filter);
Stop(); Stop();
delete m_ReceiveBuffer;
}
void cStreamdevFilterStreamer::Receive(uchar *Data)
{
int p = m_ReceiveBuffer->PutTS(Data, TS_SIZE);
if (p != TS_SIZE)
m_ReceiveBuffer->ReportOverflow(TS_SIZE - p);
} }
void cStreamdevFilterStreamer::Attach(void) void cStreamdevFilterStreamer::Attach(void)

View File

@ -14,6 +14,11 @@ class cStreamdevFilterStreamer: public cStreamdevStreamer {
private: private:
cDevice *m_Device; cDevice *m_Device;
cStreamdevLiveFilter *m_Filter; cStreamdevLiveFilter *m_Filter;
cStreamdevBuffer *m_ReceiveBuffer;
protected:
virtual uchar* GetFromReceiver(int &Count) { return m_ReceiveBuffer->Get(Count); }
virtual void DelFromReceiver(int Count) { m_ReceiveBuffer->Del(Count); }
public: public:
cStreamdevFilterStreamer(); cStreamdevFilterStreamer();
@ -21,6 +26,7 @@ public:
void SetDevice(cDevice *Device); void SetDevice(cDevice *Device);
bool SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On); bool SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On);
void Receive(uchar *Data);
virtual void Attach(void); virtual void Attach(void);
virtual void Detach(void); virtual void Detach(void);

View File

@ -21,18 +21,17 @@ class cStreamdevLiveReceiver: public cReceiver {
friend class cStreamdevStreamer; friend class cStreamdevStreamer;
private: private:
cStreamdevStreamer *m_Streamer; cStreamdevLiveStreamer *m_Streamer;
protected: protected:
virtual void Receive(uchar *Data, int Length); virtual void Receive(uchar *Data, int Length);
public: public:
cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, const cChannel *Channel, int Priority, const int *Pids); cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, const cChannel *Channel, int Priority, const int *Pids);
virtual ~cStreamdevLiveReceiver(); virtual ~cStreamdevLiveReceiver();
}; };
cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, const cChannel *Channel, cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, const cChannel *Channel, int Priority, const int *Pids):
int Priority, const int *Pids):
cReceiver(Channel, Priority), cReceiver(Channel, Priority),
m_Streamer(Streamer) m_Streamer(Streamer)
{ {
@ -48,9 +47,7 @@ cStreamdevLiveReceiver::~cStreamdevLiveReceiver()
} }
void cStreamdevLiveReceiver::Receive(uchar *Data, int Length) { void cStreamdevLiveReceiver::Receive(uchar *Data, int Length) {
int p = m_Streamer->Receive(Data, Length); m_Streamer->Receive(Data, Length);
if (p != Length)
m_Streamer->ReportOverflow(Length - p);
} }
// --- cStreamdevPatFilter ---------------------------------------------------- // --- cStreamdevPatFilter ----------------------------------------------------
@ -341,18 +338,18 @@ cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority, const cServerConnec
m_PatFilter(NULL), m_PatFilter(NULL),
m_Remux(NULL) m_Remux(NULL)
{ {
m_ReceiveBuffer = new cStreamdevBuffer(LIVEBUFSIZE, TS_SIZE *2, true, "streamdev-livestreamer"),
m_ReceiveBuffer->SetTimeouts(0, 100);
} }
cStreamdevLiveStreamer::~cStreamdevLiveStreamer() cStreamdevLiveStreamer::~cStreamdevLiveStreamer()
{ {
Dprintf("Desctructing Live streamer\n"); Dprintf("Desctructing Live streamer\n");
Stop(); Stop();
if(m_PatFilter) {
Detach();
DELETENULL(m_PatFilter); DELETENULL(m_PatFilter);
}
DELETENULL(m_Receiver); DELETENULL(m_Receiver);
delete m_Remux; delete m_Remux;
delete m_ReceiveBuffer;
} }
bool cStreamdevLiveStreamer::HasPid(int Pid) bool cStreamdevLiveStreamer::HasPid(int Pid)
@ -522,6 +519,13 @@ bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, eStreamType Str
} }
} }
void cStreamdevLiveStreamer::Receive(uchar *Data, int Length)
{
int p = m_ReceiveBuffer->PutTS(Data, Length);
if (p != Length)
m_ReceiveBuffer->ReportOverflow(Length - p);
}
int cStreamdevLiveStreamer::Put(const uchar *Data, int Count) int cStreamdevLiveStreamer::Put(const uchar *Data, int Count)
{ {
// insert si data // insert si data

View File

@ -7,6 +7,8 @@
#include "server/streamer.h" #include "server/streamer.h"
#include "common.h" #include "common.h"
#define LIVEBUFSIZE (20000 * TS_SIZE)
namespace Streamdev { namespace Streamdev {
class cTSRemux; class cTSRemux;
} }
@ -24,12 +26,19 @@ private:
const cChannel *m_Channel; const cChannel *m_Channel;
cDevice *m_Device; cDevice *m_Device;
cStreamdevLiveReceiver *m_Receiver; cStreamdevLiveReceiver *m_Receiver;
cStreamdevBuffer *m_ReceiveBuffer;
cStreamdevPatFilter *m_PatFilter; cStreamdevPatFilter *m_PatFilter;
Streamdev::cTSRemux *m_Remux; Streamdev::cTSRemux *m_Remux;
void StartReceiver(void); void StartReceiver(void);
bool HasPid(int Pid); bool HasPid(int Pid);
protected:
virtual uchar* GetFromReceiver(int &Count) { return m_ReceiveBuffer->Get(Count); }
virtual void DelFromReceiver(int Count) { m_ReceiveBuffer->Del(Count); }
virtual int Put(const uchar *Data, int Count);
public: public:
cStreamdevLiveStreamer(int Priority, const cServerConnection *Connection); cStreamdevLiveStreamer(int Priority, const cServerConnection *Connection);
virtual ~cStreamdevLiveStreamer(); virtual ~cStreamdevLiveStreamer();
@ -42,7 +51,7 @@ public:
void GetSignal(int *DevNum, int *Strength, int *Quality) const; void GetSignal(int *DevNum, int *Strength, int *Quality) const;
cString ToText() const; cString ToText() const;
virtual int Put(const uchar *Data, int Count); void Receive(uchar *Data, int Length);
virtual uchar *Get(int &Count); virtual uchar *Get(int &Count);
virtual void Del(int Count); virtual void Del(int Count);

View File

@ -109,18 +109,14 @@ cStreamdevStreamer::cStreamdevStreamer(const char *Name, const cServerConnection
cThread(Name), cThread(Name),
m_Connection(Connection), m_Connection(Connection),
m_Writer(NULL), m_Writer(NULL),
m_RingBuffer(new cStreamdevBuffer(STREAMERBUFSIZE, TS_SIZE * 2,
true, "streamdev-streamer")),
m_SendBuffer(new cStreamdevBuffer(WRITERBUFSIZE, TS_SIZE * 2)) m_SendBuffer(new cStreamdevBuffer(WRITERBUFSIZE, TS_SIZE * 2))
{ {
m_RingBuffer->SetTimeouts(0, 100);
m_SendBuffer->SetTimeouts(100, 100); m_SendBuffer->SetTimeouts(100, 100);
} }
cStreamdevStreamer::~cStreamdevStreamer() cStreamdevStreamer::~cStreamdevStreamer()
{ {
Dprintf("Desctructing streamer\n"); Dprintf("Desctructing streamer\n");
delete m_RingBuffer;
delete m_SendBuffer; delete m_SendBuffer;
} }
@ -152,12 +148,12 @@ void cStreamdevStreamer::Action(void)
SetPriority(-3); SetPriority(-3);
while (Running()) { while (Running()) {
int got; int got;
uchar *block = m_RingBuffer->Get(got); uchar *block = GetFromReceiver(got);
if (block) { if (block) {
int count = Put(block, got); int count = Put(block, got);
if (count) if (count)
m_RingBuffer->Del(count); DelFromReceiver(count);
} }
} }
} }

View File

@ -17,7 +17,6 @@ class cServerConnection;
#define TS_SIZE 188 #define TS_SIZE 188
#endif #endif
#define STREAMERBUFSIZE (20000 * TS_SIZE)
#define WRITERBUFSIZE (20000 * TS_SIZE) #define WRITERBUFSIZE (20000 * TS_SIZE)
// --- cStreamdevBuffer ------------------------------------------------------- // --- cStreamdevBuffer -------------------------------------------------------
@ -67,10 +66,12 @@ class cStreamdevStreamer: public cThread {
private: private:
const cServerConnection *m_Connection; const cServerConnection *m_Connection;
cStreamdevWriter *m_Writer; cStreamdevWriter *m_Writer;
cStreamdevBuffer *m_RingBuffer;
cStreamdevBuffer *m_SendBuffer; cStreamdevBuffer *m_SendBuffer;
protected: protected:
virtual uchar* GetFromReceiver(int &Count) = 0;
virtual void DelFromReceiver(int Count) = 0;
virtual int Put(const uchar *Data, int Count) { return m_SendBuffer->PutTS(Data, Count); }
virtual void Action(void); virtual void Action(void);
bool IsRunning(void) const { return m_Writer; } bool IsRunning(void) const { return m_Writer; }
@ -85,10 +86,6 @@ public:
virtual void Stop(void); virtual void Stop(void);
bool Abort(void); bool Abort(void);
int Receive(uchar *Data, int Length) { return m_RingBuffer->PutTS(Data, Length); }
void ReportOverflow(int Bytes) { m_RingBuffer->ReportOverflow(Bytes); }
virtual int Put(const uchar *Data, int Count) { return m_SendBuffer->PutTS(Data, Count); }
virtual uchar *Get(int &Count) { return m_SendBuffer->Get(Count); } virtual uchar *Get(int &Count) { return m_SendBuffer->Get(Count); }
virtual void Del(int Count) { m_SendBuffer->Del(Count); } virtual void Del(int Count) { m_SendBuffer->Del(Count); }