diff --git a/protocolif.h b/protocolif.h index 2089fea..925aaac 100644 --- a/protocolif.h +++ b/protocolif.h @@ -3,7 +3,7 @@ * * See the README file for copyright information and how to reach the author. * - * $Id: protocolif.h,v 1.1 2007/09/14 15:44:25 rahrenbe Exp $ + * $Id: protocolif.h,v 1.2 2007/09/15 20:33:15 rahrenbe Exp $ */ #ifndef __IPTV_PROTOCOLIF_H @@ -13,7 +13,7 @@ class cIptvProtocolIf { public: cIptvProtocolIf() {} virtual ~cIptvProtocolIf() {} - virtual int Read(unsigned char *Buffer, int Len) = 0; + virtual int Read(unsigned char *Buffer) = 0; virtual bool Set(const char* Address, const int Port) = 0; virtual bool Open(void) = 0; virtual bool Close(void) = 0; diff --git a/protocoludp.c b/protocoludp.c index 2f7b679..a1d8776 100644 --- a/protocoludp.c +++ b/protocoludp.c @@ -3,7 +3,7 @@ * * See the README file for copyright information and how to reach the author. * - * $Id: protocoludp.c,v 1.1 2007/09/14 15:44:25 rahrenbe Exp $ + * $Id: protocoludp.c,v 1.2 2007/09/15 20:33:15 rahrenbe Exp $ */ #include @@ -12,16 +12,23 @@ #include #include +#include + #include "common.h" #include "protocoludp.h" cIptvProtocolUdp::cIptvProtocolUdp() : streamPort(1234), socketDesc(-1), + readBufferLen(TS_SIZE * 7), mcastActive(false) { debug("cIptvProtocolUdp::cIptvProtocolUdp()\n"); streamAddr = strdup(""); + // Allocate receive buffer + readBuffer = MALLOC(unsigned char, readBufferLen); + if (!readBuffer) + error("ERROR: MALLOC() failed in ProtocolUdp()"); } cIptvProtocolUdp::~cIptvProtocolUdp() @@ -31,6 +38,7 @@ cIptvProtocolUdp::~cIptvProtocolUdp() Close(); // Free allocated memory free(streamAddr); + free(readBuffer); } bool cIptvProtocolUdp::OpenSocket(const int Port) @@ -143,10 +151,11 @@ bool cIptvProtocolUdp::DropMulticast(void) return true; } -int cIptvProtocolUdp::Read(unsigned char *Buffer, int Len) +int cIptvProtocolUdp::Read(unsigned char *Buffer) { //debug("cIptvProtocolUdp::Read()\n"); socklen_t addrlen = sizeof(sockAddr); + Buffer = readBuffer; // Wait for data struct timeval tv; tv.tv_sec = 0; @@ -165,7 +174,7 @@ int cIptvProtocolUdp::Read(unsigned char *Buffer, int Len) // Check if data available else if (retval) { // Read data from socket - return recvfrom(socketDesc, Buffer, Len, MSG_DONTWAIT, + return recvfrom(socketDesc, readBuffer, readBufferLen, MSG_DONTWAIT, (struct sockaddr *)&sockAddr, &addrlen); } return 0; @@ -203,4 +212,3 @@ bool cIptvProtocolUdp::Set(const char* Address, const int Port) } return true; } - diff --git a/protocoludp.h b/protocoludp.h index ce3593a..3de5cb7 100644 --- a/protocoludp.h +++ b/protocoludp.h @@ -3,7 +3,7 @@ * * See the README file for copyright information and how to reach the author. * - * $Id: protocoludp.h,v 1.1 2007/09/14 15:44:25 rahrenbe Exp $ + * $Id: protocoludp.h,v 1.2 2007/09/15 20:33:15 rahrenbe Exp $ */ #ifndef __IPTV_PROTOCOLUDP_H @@ -17,6 +17,8 @@ private: char* streamAddr; int streamPort; int socketDesc; + unsigned char* readBuffer; + unsigned int readBufferLen; struct sockaddr_in sockAddr; bool mcastActive; @@ -29,7 +31,7 @@ private: public: cIptvProtocolUdp(); virtual ~cIptvProtocolUdp(); - virtual int Read(unsigned char *Buffer, int Len); + virtual int Read(unsigned char *Buffer); virtual bool Set(const char* Address, const int Port); virtual bool Open(void); virtual bool Close(void); diff --git a/streamer.c b/streamer.c index a664693..5b3890d 100644 --- a/streamer.c +++ b/streamer.c @@ -3,16 +3,9 @@ * * See the README file for copyright information and how to reach the author. * - * $Id: streamer.c,v 1.11 2007/09/14 15:44:25 rahrenbe Exp $ + * $Id: streamer.c,v 1.12 2007/09/15 20:33:15 rahrenbe Exp $ */ -#include -#include -#include -#include -#include - -#include #include #include @@ -23,14 +16,9 @@ cIptvStreamer::cIptvStreamer(cRingBufferLinear* RingBuffer, cMutex* Mutex) : cThread("IPTV streamer"), ringBuffer(RingBuffer), mutex(Mutex), - readBufferLen(TS_SIZE * 7), protocol(NULL) { debug("cIptvStreamer::cIptvStreamer()\n"); - // Allocate receive buffer - readBuffer = MALLOC(unsigned char, readBufferLen); - if (!readBuffer) - error("ERROR: MALLOC(readBuffer) failed"); } cIptvStreamer::~cIptvStreamer() @@ -38,8 +26,6 @@ cIptvStreamer::~cIptvStreamer() debug("cIptvStreamer::~cIptvStreamer()\n"); // Close the protocol Close(); - // Free allocated memory - free(readBuffer); } void cIptvStreamer::Action(void) @@ -47,20 +33,21 @@ void cIptvStreamer::Action(void) debug("cIptvStreamer::Action(): Entering\n"); // Do the thread loop while (Running()) { - if (ringBuffer && mutex && readBuffer && protocol) { - int length = protocol->Read(readBuffer, readBufferLen); + if (ringBuffer && mutex && protocol) { + unsigned char *buffer = NULL; + int length = protocol->Read(buffer); if (length >= 0) { mutex->Lock(); - int p = ringBuffer->Put(readBuffer, length); + int p = ringBuffer->Put(buffer, length); if (p != length && Running()) ringBuffer->ReportOverflow(length - p); mutex->Unlock(); } else - cCondWait::SleepMs(3); // reduce cpu load + cCondWait::SleepMs(100); // to reduce cpu load } else - cCondWait::SleepMs(100); // avoid busy loop + cCondWait::SleepMs(100); // and avoid busy loop } debug("cIptvStreamer::Action(): Exiting\n"); } @@ -92,18 +79,18 @@ bool cIptvStreamer::Set(const char* Address, const int Port, cIptvProtocolIf* Pr { debug("cIptvStreamer::Set(): %s:%d\n", Address, Port); if (!isempty(Address)) { - // Update protocol; Close the existing one if changed - if (protocol != Protocol) { - if (protocol) - protocol->Close(); - protocol = Protocol; - if (protocol) - protocol->Open(); - } - // Set protocol address and port - if (protocol) + // Update protocol; Close the existing one if changed + if (protocol != Protocol) { + if (protocol) + protocol->Close(); + protocol = Protocol; + if (protocol) + protocol->Open(); + } + // Set protocol address and port + if (protocol) protocol->Set(Address, Port); - } + } return true; }