diff --git a/server/connectionHTTP.c b/server/connectionHTTP.c index 3fd8c27..914f87d 100644 --- a/server/connectionHTTP.c +++ b/server/connectionHTTP.c @@ -1,5 +1,5 @@ /* - * $Id: connectionHTTP.c,v 1.4 2005/02/08 17:22:35 lordjaxom Exp $ + * $Id: connectionHTTP.c,v 1.5 2005/02/08 19:54:52 lordjaxom Exp $ */ #include "server/connectionHTTP.h" @@ -56,8 +56,8 @@ bool cConnectionHTTP::Command(char *Cmd) { cDevice *device = GetDevice(m_Channel, 0); if (device != NULL) { device->SwitchChannel(m_Channel, false); - m_LiveStreamer->SetDevice(device); if (m_LiveStreamer->SetChannel(m_Channel, m_StreamType)) { + m_LiveStreamer->SetDevice(device); m_Startup = true; if (m_StreamType == stES && (m_Channel->Vpid() == 0 || m_Channel->Vpid() == 1 || m_Channel->Vpid() == 0x1FFF)) { diff --git a/server/livestreamer.c b/server/livestreamer.c index 6beea18..fad227f 100644 --- a/server/livestreamer.c +++ b/server/livestreamer.c @@ -18,23 +18,25 @@ cStreamdevLiveReceiver::~cStreamdevLiveReceiver() Detach(); } +void cStreamdevLiveReceiver::Activate(bool On) +{ + m_Streamer->Activate(On); +} + void cStreamdevLiveReceiver::Receive(uchar *Data, int Length) { - int p = m_Streamer->Put(Data, Length); + int p = m_Streamer->Receive(Data, Length); if (p != Length) m_Streamer->ReportOverflow(Length - p); } cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority): - cStreamdevStreamer("Live streamer") { + cStreamdevStreamer("streamdev-livestreaming") { m_Priority = Priority; m_NumPids = 0; m_Channel = NULL; m_Device = NULL; m_Receiver = NULL; m_Remux = NULL; - m_Buffer = NULL; - m_Sequence = 0; - memset(m_Pids, 0, sizeof(m_Pids)); } cStreamdevLiveStreamer::~cStreamdevLiveStreamer() { @@ -44,7 +46,6 @@ cStreamdevLiveStreamer::~cStreamdevLiveStreamer() { #if VDRVERSNUM >= 10300 //delete m_Filter; TODO #endif - free(m_Buffer); } void cStreamdevLiveStreamer::Detach(void) { @@ -91,7 +92,7 @@ bool cStreamdevLiveStreamer::SetPid(int Pid, bool On) { m_Receiver = new cStreamdevLiveReceiver(this, m_Channel->Ca(), m_Priority, m_Pids); if (m_Device != NULL) { Dprintf("Attaching new receiver\n"); - m_Device->AttachReceiver(m_Receiver); + Attach(); } } return true; @@ -155,6 +156,8 @@ bool cStreamdevLiveStreamer::SetFilter(u_short Pid, u_char Tid, u_char Mask, #endif } +// TODO: Remuxer einbinden +#if 0 uchar *cStreamdevLiveStreamer::Process(const uchar *Data, int &Count, int &Result) { uchar *remuxed = m_Remux != NULL ? m_Remux->Process(Data, Count, Result) : cStreamdevStreamer::Process(Data, Count, Result); @@ -184,6 +187,7 @@ uchar *cStreamdevLiveStreamer::Process(const uchar *Data, int &Count, int &Resul } return NULL; } +#endif std::string cStreamdevLiveStreamer::Report(void) { std::string result; diff --git a/server/livestreamer.h b/server/livestreamer.h index 7cf52c7..fce2daa 100644 --- a/server/livestreamer.h +++ b/server/livestreamer.h @@ -21,6 +21,7 @@ private: cStreamdevLiveStreamer *m_Streamer; protected: + virtual void Activate(bool On); virtual void Receive(uchar *Data, int Length); public: @@ -39,10 +40,9 @@ private: cStreamdevLiveReceiver *m_Receiver; cTSRemux *m_Remux; uchar *m_Buffer; - int m_Sequence; protected: - virtual uchar *Process(const uchar *Data, int &Count, int &Result); + //virtual uchar *Process(const uchar *Data, int &Count, int &Result); public: cStreamdevLiveStreamer(int Priority); diff --git a/server/streamer.c b/server/streamer.c index 465eb88..00ac6cb 100644 --- a/server/streamer.c +++ b/server/streamer.c @@ -1,5 +1,5 @@ /* - * $Id: streamer.c,v 1.3 2005/02/08 17:22:35 lordjaxom Exp $ + * $Id: streamer.c,v 1.4 2005/02/08 19:54:52 lordjaxom Exp $ */ #include @@ -13,34 +13,74 @@ #include "tools/socket.h" #include "common.h" -#define VIDEOBUFSIZE MEGABYTE(4) -#define MAXBLOCKSIZE TS_SIZE*10 +cStreamdevWriter::cStreamdevWriter(cTBSocket *Socket, cStreamdevStreamer *Streamer): + cThread("streamdev-writer"), + m_Streamer(Streamer), + m_Socket(Socket), + m_Active(false) +{ +} + +cStreamdevWriter::~cStreamdevWriter() +{ + m_Active = false; + Cancel(3); +} + +void cStreamdevWriter::Action(void) +{ + int max = 0; + m_Active = true; + while (m_Active) { + int count; + uchar *block = m_Streamer->Get(count); + + if (!m_Socket->SafeWrite(block, count)) { + esyslog("ERROR: streamdev-server: couldn't send data: %m"); + break; + } + m_Streamer->Del(count); + } + m_Active = false; + Dprintf("Max. Transmit Blocksize was: %d\n", max); +} cStreamdevStreamer::cStreamdevStreamer(const char *Name): - cThread(((std::string)"Streamdev: " + Name).c_str()) + cThread(Name), + m_Active(false), + m_Writer(NULL), + m_RingBuffer(new cRingBufferLinear(STREAMERBUFSIZE, TS_SIZE * 2, true, + "streamdev-streamer")), + m_SendBuffer(new cRingBufferLinear(WRITERBUFSIZE, MAXTRANSMITBLOCKSIZE)) { - m_Active = false; - m_Receivers = 0; - m_Buffer = NULL; - m_Name = Name; - m_Socket = NULL; - m_RingBuffer = new cRingBufferLinear(VIDEOBUFSIZE, TS_SIZE * 2, true); + m_RingBuffer->SetTimeouts(0, 100); + m_SendBuffer->SetTimeouts(0, 100); } -cStreamdevStreamer::~cStreamdevStreamer() { +cStreamdevStreamer::~cStreamdevStreamer() +{ Stop(); - if (m_Buffer != NULL) delete[] m_Buffer; delete m_RingBuffer; + delete m_Writer; + delete m_SendBuffer; } -void cStreamdevStreamer::Start(cTBSocket *Socket) { - m_Socket = Socket; +void cStreamdevStreamer::Start(cTBSocket *Socket) +{ + m_Writer = new cStreamdevWriter(Socket, this); Attach(); - if (!m_Active) - cThread::Start(); } -void cStreamdevStreamer::Stop(void) { +void cStreamdevStreamer::Activate(bool On) +{ + if (On && !m_Active) { + m_Writer->Start(); + cThread::Start(); + } +} + +void cStreamdevStreamer::Stop(void) +{ if (m_Active) { Dprintf("stopping live streamer\n"); m_Active = false; @@ -48,50 +88,35 @@ void cStreamdevStreamer::Stop(void) { } } -uchar *cStreamdevStreamer::Process(const uchar *Data, int &Count, int &Result) { - if (m_Buffer == NULL) - m_Buffer = new uchar[MAXBLOCKSIZE]; - - if (Count > MAXBLOCKSIZE) - Count = MAXBLOCKSIZE; - memcpy(m_Buffer, Data, Count); - Result = Count; - return m_Buffer; +int cStreamdevStreamer::Put(const uchar *Data, int Count) +{ + return m_SendBuffer->Put(Data, Count); } -void cStreamdevStreamer::Action(void) { - int max = 0; +uchar *cStreamdevStreamer::Get(int &Count) +{ + return m_SendBuffer->Get(Count); +} -#if VDRVERSNUM < 10300 - isyslog("Streamdev: %s thread started (pid=%d)", m_Name, getpid()); -#endif +void cStreamdevStreamer::Del(int Count) +{ + return m_SendBuffer->Del(Count); +} + +void cStreamdevStreamer::Action(void) +{ + int max = 0; m_Active = true; while (m_Active) { - int recvd; - const uchar *block = m_RingBuffer->Get(recvd); + int got; + uchar *block = m_RingBuffer->Get(got); - if (block && recvd > 0) { - int result = 0; - uchar *sendBlock = Process(block, recvd, result); - - m_RingBuffer->Del(recvd); - if (result > max) max = result; - - if (!m_Socket->TimedWrite(sendBlock, result, 150)) { - if (errno != ETIMEDOUT) { - esyslog("ERROR: Streamdev: Couldn't write data: %s", strerror(errno)); - m_Active = false; - } - } - } else - usleep(1); // this keeps the CPU load low (XXX: waiting buffers) + if (block && got > 0) { + int count = Put(block, got); + if (count) + m_RingBuffer->Del(count); + } } - - Dprintf("Max. Transmit Blocksize was: %d\n", max); - -#if VDRVERSNUM < 10300 - isyslog("Streamdev: %s thread stopped", m_Name); -#endif } diff --git a/server/streamer.h b/server/streamer.h index 0f374b5..ff6411d 100644 --- a/server/streamer.h +++ b/server/streamer.h @@ -1,5 +1,5 @@ /* - * $Id: streamer.h,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ + * $Id: streamer.h,v 1.3 2005/02/08 19:54:52 lordjaxom Exp $ */ #ifndef VDR_STREAMDEV_STREAMER_H @@ -10,21 +10,37 @@ #include class cTBSocket; +class cStreamdevStreamer; + +#define MAXTRANSMITBLOCKSIZE TS_SIZE*10 +#define STREAMERBUFSIZE MEGABYTE(4) +#define WRITERBUFSIZE KILOBYTE(192) + +class cStreamdevWriter: public cThread { +private: + cStreamdevStreamer *m_Streamer; + cTBSocket *m_Socket; + bool m_Active; + +protected: + virtual void Action(void); + +public: + cStreamdevWriter(cTBSocket *Socket, cStreamdevStreamer *Streamer); + virtual ~cStreamdevWriter(); +}; class cStreamdevStreamer: public cThread { private: bool m_Active; - int m_Receivers; - uchar *m_Buffer; - const char *m_Name; - cTBSocket *m_Socket; + cStreamdevWriter *m_Writer; cRingBufferLinear *m_RingBuffer; + cRingBufferLinear *m_SendBuffer; protected: - virtual uchar *Process(const uchar *Data, int &Count, int &Result); virtual void Action(void); - const cTBSocket *Socket(void) const { return m_Socket; } + //const cTBSocket *Socket(void) const { return m_Socket; } public: cStreamdevStreamer(const char *Name); @@ -33,8 +49,13 @@ public: virtual void Start(cTBSocket *Socket); virtual void Stop(void); - int Put(uchar *Data, int Length) { return m_RingBuffer->Put(Data, Length); } + void Activate(bool On); + int Receive(uchar *Data, int Length) { return m_RingBuffer->Put(Data, Length); } void ReportOverflow(int Bytes) { m_RingBuffer->ReportOverflow(Bytes); } + + virtual int Put(const uchar *Data, int Count); + virtual uchar *Get(int &Count); + virtual void Del(int Count); virtual void Detach(void) = 0; virtual void Attach(void) = 0; diff --git a/tools/source.c b/tools/source.c index 3674eff..c832e2f 100644 --- a/tools/source.c +++ b/tools/source.c @@ -85,6 +85,29 @@ bool cTBSource::TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs) { return true; } +bool cTBSource::SafeWrite(const void *Buffer, size_t Length) { + cTBSelect sel; + int offs; + + offs = 0; + while (Length > 0) { + int b; + + sel.Clear(); + sel.Add(m_Filed, true); + if (sel.Select() == -1) + return false; + + if (sel.CanWrite(m_Filed)) { + if ((b = Write((char*)Buffer + offs, Length)) == -1) + return false; + offs += b; + Length -= b; + } + } + return true; +} + ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq, uint TimeoutMs) { int seqlen, ms; diff --git a/tools/source.h b/tools/source.h index d55f3e1..09c4bf3 100644 --- a/tools/source.h +++ b/tools/source.h @@ -78,6 +78,8 @@ public: and errno is set appropriately. TimedRead only works on UNIX file descriptor sources. */ bool TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs); + + bool SafeWrite(const void *Buffer, size_t Length); /* ReadUntil() tries to read at most Length bytes into the storage pointed to by Buffer, which must be at least Length bytes in size, within the