From 5d6d68d566a0318f4a486fde51d26f8ae72a3ca0 Mon Sep 17 00:00:00 2001 From: Rolf Ahrenberg Date: Thu, 13 Sep 2007 16:58:22 +0000 Subject: [PATCH] Minor refactoring. --- device.c | 6 +- streamer.c | 280 +++++++++++++++++++++++++---------------------------- streamer.h | 22 +++-- 3 files changed, 146 insertions(+), 162 deletions(-) diff --git a/device.c b/device.c index be6f1d3..f307905 100644 --- a/device.c +++ b/device.c @@ -3,7 +3,7 @@ * * See the README file for copyright information and how to reach the author. * - * $Id: device.c,v 1.4 2007/09/12 21:21:55 rahrenbe Exp $ + * $Id: device.c,v 1.5 2007/09/13 16:58:22 rahrenbe Exp $ */ #include "common.h" @@ -140,7 +140,7 @@ bool cIptvDevice::OpenDvr(void) isPacketDelivered = false; tsBuffer->Clear(); mutex.Unlock(); - pIptvStreamer->Activate(); + pIptvStreamer->OpenStream(); isOpenDvr = true; return true; } @@ -148,7 +148,7 @@ bool cIptvDevice::OpenDvr(void) void cIptvDevice::CloseDvr(void) { debug("cIptvDevice::CloseDvr(%d)\n", deviceIndex); - pIptvStreamer->Deactivate(); + pIptvStreamer->CloseStream(); isOpenDvr = false; } diff --git a/streamer.c b/streamer.c index 21facef..ddb9a51 100644 --- a/streamer.c +++ b/streamer.c @@ -3,7 +3,7 @@ * * See the README file for copyright information and how to reach the author. * - * $Id: streamer.c,v 1.9 2007/09/13 14:10:37 ajhseppa Exp $ + * $Id: streamer.c,v 1.10 2007/09/13 16:58:22 rahrenbe Exp $ */ #include @@ -21,20 +21,18 @@ cIptvStreamer::cIptvStreamer(cRingBufferLinear* Buffer, cMutex* Mutex) : cThread("IPTV streamer"), - dataPort(1234), + streamPort(1234), + socketDesc(-1), pRingBuffer(Buffer), bufferSize(TS_SIZE * 7), mutex(Mutex), - socketActive(false), mcastActive(false) { - debug("cIptvStreamer::cIptvStreamer()\n"); - memset(&stream, '\0', strlen(stream)); - + streamAddr = strdup(""); // Create the socket - CheckAndCreateSocket(dataPort); - + OpenSocket(streamPort); + // Allocate receive buffer pReceiveBuffer = MALLOC(unsigned char, bufferSize); if (!pReceiveBuffer) error("ERROR: MALLOC(pReceiveBuffer) failed"); @@ -43,69 +41,65 @@ cIptvStreamer::cIptvStreamer(cRingBufferLinear* Buffer, cMutex* Mutex) cIptvStreamer::~cIptvStreamer() { debug("cIptvStreamer::~cIptvStreamer()\n"); - - Deactivate(); - - if (pReceiveBuffer) - free(pReceiveBuffer); - - // Close the socket + // Drop the multicast group + DropMulticast(); + // Close the stream and socket + CloseStream(); CloseSocket(); + // Free allocated memory + free(streamAddr); + free(pReceiveBuffer); } -void cIptvStreamer::Action() +void cIptvStreamer::Action(void) { debug("cIptvStreamer::Action(): Entering\n"); - - // Create files necessary for selecting I/O from socket. + // Create files necessary for selecting I/O from socket fd_set rfds; FD_ZERO(&rfds); FD_SET(socketDesc, &rfds); - + // Do the thread loop while (Running()) { - socklen_t addrlen = sizeof(sa); - - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 500000; - - // Wait for data - int retval = select(socketDesc + 1, &rfds, NULL, NULL, &tv); - - if (retval < 0) { - char tmp[64]; - error("ERROR: select(): %s", strerror_r(errno, tmp, sizeof(tmp))); - } else if(retval) { - - // Read data from socket - int length = recvfrom(socketDesc, pReceiveBuffer, bufferSize, - MSG_DONTWAIT, (struct sockaddr *)&sa, &addrlen); - mutex->Lock(); - int p = pRingBuffer->Put(pReceiveBuffer, length); - if (p != length && Running()) { - pRingBuffer->ReportOverflow(length - p); - } - mutex->Unlock(); - - } else { - debug("Timeout waiting for data\n"); + if (pRingBuffer && mutex && pReceiveBuffer && (socketDesc >= 0)) { + struct timeval tv; + socklen_t addrlen = sizeof(sockAddr); + // Wait for data + tv.tv_sec = 0; + tv.tv_usec = 500000; + int retval = select(socketDesc + 1, &rfds, NULL, NULL, &tv); + if (retval < 0) { + char tmp[64]; + error("ERROR: select(): %s", strerror_r(errno, tmp, sizeof(tmp))); + } else if (retval) { + // Read data from socket + int length = recvfrom(socketDesc, pReceiveBuffer, bufferSize, MSG_DONTWAIT, + (struct sockaddr *)&sockAddr, &addrlen); + mutex->Lock(); + int p = pRingBuffer->Put(pReceiveBuffer, length); + if (p != length && Running()) + pRingBuffer->ReportOverflow(length - p); + mutex->Unlock(); + } else + debug("cIptvStreamer::Action(): Timeout\n"); + } + else + cCondWait::SleepMs(100); // avoid busy loop } - } debug("cIptvStreamer::Action(): Exiting\n"); } -bool cIptvStreamer::CheckAndCreateSocket(const int port) +bool cIptvStreamer::OpenSocket(const int port) { + debug("cIptvStreamer::OpenSocket()\n"); // If socket is there already and it is bound to a different port, it must // be closed first - if (socketActive && port != dataPort) { - debug("Full tear-down of active socket\n"); + if (port != streamPort) { + debug("cIptvStreamer::OpenSocket(): Socket tear-down\n"); CloseSocket(); } - // Bind to the socket if it is not active already - if (!socketActive) { - + if (socketDesc < 0) { + int yes = 1; // Create socket socketDesc = socket(PF_INET, SOCK_DGRAM, 0); if (socketDesc < 0) { @@ -113,142 +107,130 @@ bool cIptvStreamer::CheckAndCreateSocket(const int port) error("ERROR: socket(): %s", strerror_r(errno, tmp, sizeof(tmp))); return false; } - - // Make it use non-blocking I/O to avoid stuck read -calls. + // Make it use non-blocking I/O to avoid stuck read calls if (fcntl(socketDesc, F_SETFL, O_NONBLOCK)) { char tmp[64]; error("ERROR: fcntl(): %s", strerror_r(errno, tmp, sizeof(tmp))); - close(socketDesc); + CloseSocket(); return false; } - - int yes = 1; - // Allow multiple sockets to use the same PORT number if (setsockopt(socketDesc, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) { char tmp[64]; error("ERROR: setsockopt(): %s", strerror_r(errno, tmp, sizeof(tmp))); - close(socketDesc); + CloseSocket(); return false; } - - memset(&sa, '\0', sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_port = htons(port); - sa.sin_addr.s_addr = htonl(INADDR_ANY); - - int err = bind(socketDesc, (struct sockaddr *)&sa, sizeof(sa)); + // Bind socket + memset(&sockAddr, '\0', sizeof(sockAddr)); + sockAddr.sin_family = AF_INET; + sockAddr.sin_port = htons(port); + sockAddr.sin_addr.s_addr = htonl(INADDR_ANY); + int err = bind(socketDesc, (struct sockaddr *)&sockAddr, sizeof(sockAddr)); if (err < 0) { char tmp[64]; error("ERROR: bind(): %s", strerror_r(errno, tmp, sizeof(tmp))); - close(socketDesc); + CloseSocket(); return false; } - - dataPort = port; - socketActive = true; + // Update stream port + streamPort = port; } - - return true; - -} - -void cIptvStreamer::CloseSocket() -{ - if (socketActive) { - close(socketDesc); - socketActive = false; - mcastActive = false; - } -} - -bool cIptvStreamer::Activate() -{ - struct ip_mreq mreq; - - debug("cIptvStreamer::Activate(): stream = %s\n", stream); - - if (!stream) { - error("No stream set yet, not activating\n"); - return false; - } - - if (mcastActive) { - debug("cIptvStreamer::Activate(): Already active\n"); - return true; - } - - // Ensure that socket is valid - CheckAndCreateSocket(dataPort); - - // Join a new multicast group - mreq.imr_multiaddr.s_addr = inet_addr(stream); - mreq.imr_interface.s_addr = htonl(INADDR_ANY); - - int err = setsockopt(socketDesc, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, - sizeof(mreq)); - if (err < 0) { - char tmp[64]; - error("ERROR: setsockopt(): %s", strerror_r(errno, tmp, sizeof(tmp))); - return false; - } - - // Start thread - if (!Running()) - Start(); - - mcastActive = true; return true; } -bool cIptvStreamer::Deactivate() +void cIptvStreamer::CloseSocket(void) { - debug("cIptvStreamer::Deactivate(): stream = %s\n", stream); + debug("cIptvStreamer::CloseSocket()\n"); + // Check if socket exists + if (socketDesc >= 0) { + close(socketDesc); + socketDesc = -1; + } +} - // Stop thread - if (Running()) - Cancel(3); - - if (stream && mcastActive) { +bool cIptvStreamer::JoinMulticast(void) +{ + debug("cIptvStreamer::JoinMulticast()\n"); + // Check that stream address is valid + if (!mcastActive && !isempty(streamAddr)) { + // Ensure that socket is valid + OpenSocket(streamPort); + // Join a new multicast group struct ip_mreq mreq; - debug("cIptvStreamer::Deactivate(): Deactivating\n"); - mreq.imr_multiaddr.s_addr = inet_addr(stream); + mreq.imr_multiaddr.s_addr = inet_addr(streamAddr); mreq.imr_interface.s_addr = htonl(INADDR_ANY); - - int err = setsockopt(socketDesc, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mreq, + int err = setsockopt(socketDesc, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); if (err < 0) { char tmp[64]; error("ERROR: setsockopt(): %s", strerror_r(errno, tmp, sizeof(tmp))); return false; } - - // Not active any more - mcastActive = false; + // Update multicasting flag + mcastActive = true; } + return true; +} +bool cIptvStreamer::DropMulticast(void) +{ + debug("cIptvStreamer::DropMulticast()\n"); + // Check that stream address is valid + if (mcastActive && !isempty(streamAddr)) { + // Ensure that socket is valid + OpenSocket(streamPort); + // Drop the multicast group + struct ip_mreq mreq; + mreq.imr_multiaddr.s_addr = inet_addr(streamAddr); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + int err = setsockopt(socketDesc, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mreq, + sizeof(mreq)); + if (err < 0) { + char tmp[64]; + error("ERROR: setsockopt(): %s", strerror_r(errno, tmp, sizeof(tmp))); + return false; + } + // Update multicasting flag + mcastActive = false; + } + return true; +} + +bool cIptvStreamer::OpenStream(void) +{ + debug("cIptvStreamer::OpenStream(): streamAddr = %s\n", streamAddr); + // Join a new multicast group + JoinMulticast(); + // Start thread + Start(); + return true; +} + +bool cIptvStreamer::CloseStream(void) +{ + debug("cIptvStreamer::CloseStream(): streamAddr = %s\n", streamAddr); + // Stop thread + if (Running()) + Cancel(3); + // Drop the multicast group + DropMulticast(); return true; } bool cIptvStreamer::SetStream(const char* address, const int port, const char* protocol) { debug("cIptvStreamer::SetStream(): %s://%s:%d\n", protocol, address, port); - - // Deactivate the reception if it is running currently. Otherwise the - // reception stream is overwritten and cannot be un-set after this - Deactivate(); - - // Ensure that the socket is valid - CheckAndCreateSocket(port); - - // Check if the address fits into the buffer - if (strlen(address) > sizeof(stream)) { - error("ERROR: Address too big\n"); - return false; - } - strn0cpy(stream, address, sizeof(stream)); - + if (!isempty(address)) { + // Drop the multicast group + DropMulticast(); + // Update stream address and port + streamAddr = strcpyrealloc(streamAddr, address); + streamPort = port; + // Join a new multicast group + JoinMulticast(); + } return true; } diff --git a/streamer.h b/streamer.h index 75455fc..9d1916f 100644 --- a/streamer.h +++ b/streamer.h @@ -3,7 +3,7 @@ * * See the README file for copyright information and how to reach the author. * - * $Id: streamer.h,v 1.4 2007/09/12 21:55:57 rahrenbe Exp $ + * $Id: streamer.h,v 1.5 2007/09/13 16:58:22 rahrenbe Exp $ */ #ifndef __IPTV_STREAMER_H @@ -16,28 +16,30 @@ class cIptvStreamer : public cThread { private: - char stream[256]; + char* streamAddr; + int streamPort; int socketDesc; - int dataPort; - struct sockaddr_in sa; + struct sockaddr_in sockAddr; cRingBufferLinear* pRingBuffer; unsigned char* pReceiveBuffer; unsigned int bufferSize; cMutex* mutex; - bool socketActive; bool mcastActive; - bool CheckAndCreateSocket(const int port); - void CloseSocket(); +private: + bool OpenSocket(const int port); + void CloseSocket(void); + bool JoinMulticast(void); + bool DropMulticast(void); public: cIptvStreamer(); cIptvStreamer(cRingBufferLinear* Buffer, cMutex* Mutex); virtual ~cIptvStreamer(); - virtual void Action(); + virtual void Action(void); bool SetStream(const char* address, const int port, const char* protocol); - bool Activate(); - bool Deactivate(); + bool OpenStream(void); + bool CloseStream(void); }; #endif // __IPTV_STREAMER_H