/* * $Id: streamer.c,v 1.10 2005/04/27 17:55:43 lordjaxom Exp $ */ #include #include #include #include #include "server/streamer.h" #include "server/suspend.h" #include "server/setup.h" #include "tools/socket.h" #include "tools/select.h" #include "common.h" // --- cStreamdevWriter ------------------------------------------------------- cStreamdevWriter::cStreamdevWriter(cTBSocket *Socket, cStreamdevStreamer *Streamer): cThread("streamdev-writer"), m_Streamer(Streamer), m_Socket(Socket), m_Active(false) { } cStreamdevWriter::~cStreamdevWriter() { Dprintf("destructing writer\n"); m_Active = false; Cancel(3); } void cStreamdevWriter::Action(void) { cTBSelect sel; Dprintf("Writer start\n"); int max = 0; m_Active = true; while (m_Active) { int count; uchar *block = m_Streamer->Get(count); if (block) { sel.Clear(); sel.Add(*m_Socket, true); if (sel.Select(500) == -1) { esyslog("ERROR: streamdev-server: couldn't send data: %m"); break; } if (sel.CanWrite(*m_Socket)) { int written; if ((written = m_Socket->Write(block, count)) == -1) { esyslog("ERROR: streamdev-server: couldn't send data: %m"); break; } if (count > max) max = count; m_Streamer->Del(written); } } } m_Active = false; Dprintf("Max. Transmit Blocksize was: %d\n", max); m_Streamer->Stop(); } // --- cStreamdevStreamer ----------------------------------------------------- cStreamdevStreamer::cStreamdevStreamer(const char *Name): cThread(Name), m_Active(false), m_Running(false), m_Writer(NULL), m_RingBuffer(new cRingBufferLinear(STREAMERBUFSIZE, TS_SIZE * 2, true, "streamdev-streamer")), m_SendBuffer(new cRingBufferLinear(WRITERBUFSIZE, TS_SIZE * 2)) { m_RingBuffer->SetTimeouts(0, 100); m_SendBuffer->SetTimeouts(0, 100); } cStreamdevStreamer::~cStreamdevStreamer() { Dprintf("Desctructing streamer\n"); delete m_RingBuffer; delete m_SendBuffer; } void cStreamdevStreamer::Start(cTBSocket *Socket) { Dprintf("start streamer\n"); m_Writer = new cStreamdevWriter(Socket, this); m_Running = true; Attach(); } void cStreamdevStreamer::Activate(bool On) { if (On && !m_Active) { Dprintf("activate streamer\n"); m_Writer->Start(); cThread::Start(); } } void cStreamdevStreamer::Stop(void) { Lock(); if (m_Active) { Dprintf("stopping streamer\n"); m_Active = false; Cancel(3); } if (m_Running) { Detach(); m_Running = false; DELETENULL(m_Writer); } Unlock(); } void cStreamdevStreamer::Action(void) { int max = 0; m_Active = true; while (m_Active) { int got; uchar *block = m_RingBuffer->Get(got); if (block) { int count = Put(block, got); if (count) m_RingBuffer->Del(count); } } }