From ede02949431440d7a988786625a050627fddc2c7 Mon Sep 17 00:00:00 2001 From: Nafets227 Date: Fri, 14 Nov 2014 21:28:27 +0100 Subject: [PATCH] Refactored polling. The original patch is polished and tweaked by Rolf Ahrenberg. --- .gitignore | 3 ++ Makefile | 3 +- device.c | 1 + device.h | 2 +- poller.c | 90 ++++++++++++------------------------------------------ poller.h | 20 ------------ pollerif.h | 7 ++--- rtcp.c | 40 ++++++++++++++++++++++++ rtcp.h | 32 +++++++++++++++++++ rtp.c | 40 ++++++++++++++++++++++++ rtp.h | 31 +++++++++++++++++++ rtsp.c | 7 ++--- tuner.c | 77 ++++++++++++++-------------------------------- tuner.h | 21 +++++-------- tunerif.h | 2 +- 15 files changed, 204 insertions(+), 172 deletions(-) create mode 100644 rtcp.c create mode 100644 rtcp.h create mode 100644 rtp.c create mode 100644 rtp.h diff --git a/.gitignore b/.gitignore index 13412ee..4e74255 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,6 @@ *~ po/*.pot po/*.mo +.settings +.cproject +.project diff --git a/Makefile b/Makefile index dbf0f74..1287ca1 100644 --- a/Makefile +++ b/Makefile @@ -89,7 +89,8 @@ all-redirect: all ### The object files (add further files here): OBJS = $(PLUGIN).o common.o config.o device.o discover.o param.o poller.o \ - rtsp.o sectionfilter.o server.o setup.o socket.o statistics.o tuner.o + rtp.o rtcp.o rtsp.o sectionfilter.o server.o setup.o socket.o \ + statistics.o tuner.o ### The main target: diff --git a/device.c b/device.c index 2b543ae..83f2c3a 100644 --- a/device.c +++ b/device.c @@ -407,6 +407,7 @@ bool cSatipDevice::HasInternalCam(void) void cSatipDevice::WriteData(uchar *bufferP, int lengthP) { //debug("cSatipDevice::%s(%u)", __FUNCTION__, deviceIndexM); + AddTunerStatistic(lengthP); // Fill up TS buffer if (tsBufferM) { int len = tsBufferM->Put(bufferP, lengthP); diff --git a/device.h b/device.h index b852c01..370dcd8 100644 --- a/device.h +++ b/device.h @@ -15,7 +15,7 @@ #include "sectionfilter.h" #include "statistics.h" -class cSatipDevice : public cDevice, public cSatipPidStatistics, public cSatipBufferStatistics, public cSatipDeviceIf { +class cSatipDevice : public cDevice, public cSatipPidStatistics, public cSatipBufferStatistics, public cSatipTunerStatistics, public cSatipDeviceIf { // static ones public: static unsigned int deviceCount; diff --git a/poller.c b/poller.c index fc9b71d..07302b6 100644 --- a/poller.c +++ b/poller.c @@ -37,7 +37,6 @@ void cSatipPoller::Destroy(void) cSatipPoller::cSatipPoller() : cThread("SAT>IP poller"), mutexM(), - tunersM(new cSatipPollerTuners()), fdM(epoll_create(eMaxFileDescriptors)) { debug("cSatipPoller::%s()", __FUNCTION__); @@ -50,7 +49,6 @@ cSatipPoller::~cSatipPoller() cMutexLock MutexLock(&mutexM); close(fdM); // Free allocated memory - DELETENULL(tunersM); } void cSatipPoller::Activate(void) @@ -76,22 +74,11 @@ void cSatipPoller::Action(void) // Do the thread loop while (Running()) { int nfds = epoll_wait(fdM, events, eMaxFileDescriptors, -1); - if (nfds == -1) { - error("epoll_wait() failed"); - } - else if (nfds > 0) { - for (int i = 0; i < nfds; ++i) { - for (cSatipPollerTuner *tuner = tunersM->First(); tuner; tuner = tunersM->Next(tuner)) { - if (events[i].data.fd == tuner->VideoFd()) { - tuner->Poller()->ReadVideo(); - break; - } - else if (events[i].data.fd == tuner->ApplicationFd()) { - tuner->Poller()->ReadApplication(); - break; - } - } - } + ERROR_IF_FUNC((nfds == -1), "epoll_wait() failed", break, ;); + for (int i = 0; i < nfds; ++i) { + cSatipPollerIf* poll = reinterpret_cast(events[i].data.ptr); + if (poll) + poll->Action(events[i].events); } } debug("cSatipPoller::%s(): exiting", __FUNCTION__); @@ -99,63 +86,24 @@ void cSatipPoller::Action(void) bool cSatipPoller::Register(cSatipPollerIf &pollerP) { - debug("cSatipPoller::%s(%d)", __FUNCTION__, pollerP.GetPollerId()); + debug("cSatipPoller::%s(%d)", __FUNCTION__, pollerP.GetFd()); cMutexLock MutexLock(&mutexM); - if (tunersM && (fdM >= 0)) { - bool found = false; - for (cSatipPollerTuner *tuner = tunersM->First(); tuner; tuner = tunersM->Next(tuner)) { - if (tuner->Poller() == &pollerP) { - found = true; - break; - } - } - if (!found) { - cSatipPollerTuner *tmp = new cSatipPollerTuner(pollerP, pollerP.GetVideoFd(), pollerP.GetApplicationFd()); - if (tmp) { - tunersM->Add(tmp); - if (tmp->VideoFd() >= 0) { - struct epoll_event ev; - ev.events = EPOLLIN | EPOLLET; - ev.data.fd = tmp->VideoFd(); - if (epoll_ctl(fdM, EPOLL_CTL_ADD, pollerP.GetVideoFd(), &ev) == -1) { - error("Cannot add video socket into epoll [device %d]", pollerP.GetPollerId()); - } - } - if (tmp->ApplicationFd() >= 0) { - struct epoll_event ev; - ev.events = EPOLLIN | EPOLLET; - ev.data.fd = tmp->ApplicationFd(); - if (epoll_ctl(fdM, EPOLL_CTL_ADD, tmp->ApplicationFd(), &ev) == -1) { - error("Cannot add application socket into epoll [device %d]", pollerP.GetPollerId()); - } - } - debug("cSatipPoller::%s(%d): Added interface", __FUNCTION__, pollerP.GetPollerId()); - } - } - return true; - } - return false; + + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLET; + ev.data.ptr = &pollerP; + ERROR_IF_RET(epoll_ctl(fdM, EPOLL_CTL_ADD, pollerP.GetFd(), &ev) == -1, "epoll_ctl(EPOLL_CTL_ADD) failed", return false); + debug("cSatipPoller::%s(%d): Added interface", __FUNCTION__, pollerP.GetFd()); + + return true; } bool cSatipPoller::Unregister(cSatipPollerIf &pollerP) { - debug("cSatipPoller::%s(%d)", __FUNCTION__, pollerP.GetPollerId()); + debug("cSatipPoller::%s(%d)", __FUNCTION__, pollerP.GetFd()); cMutexLock MutexLock(&mutexM); - if (tunersM && (fdM >= 0)) { - for (cSatipPollerTuner *tuner = tunersM->First(); tuner; tuner = tunersM->Next(tuner)) { - if (tuner->Poller() == &pollerP) { - if ((tuner->VideoFd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, tuner->VideoFd(), NULL) == -1)) { - error("Cannot remove video socket from epoll [device %d]", pollerP.GetPollerId()); - } - if ((tuner->ApplicationFd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, tuner->ApplicationFd(), NULL) == -1)) { - error("Cannot remove application socket from epoll [device %d]", pollerP.GetPollerId()); - } - tunersM->Del(tuner); - debug("cSatipPoller::%s(%d): Removed interface", __FUNCTION__, pollerP.GetPollerId()); - return true; - } - } - } - return false; -} + ERROR_IF_RET((epoll_ctl(fdM, EPOLL_CTL_DEL, pollerP.GetFd(), NULL) == -1), "epoll_ctl(EPOLL_CTL_DEL) failed", return false); + debug("cSatipPoller::%s(%d): Removed interface", __FUNCTION__, pollerP.GetFd()); + return true; +} diff --git a/poller.h b/poller.h index 53b1b44..32216e9 100644 --- a/poller.h +++ b/poller.h @@ -13,25 +13,6 @@ #include "pollerif.h" -class cSatipPollerTuner : public cListObject { -private: - cSatipPollerIf* pollerM; - int videoFdM; - int applicationFdM; - -public: - cSatipPollerTuner(cSatipPollerIf &pollerP, int videoFdP, int applicationFdP) - { - pollerM = &pollerP; videoFdM = videoFdP; applicationFdM = applicationFdP; - } - cSatipPollerIf* Poller(void) { return pollerM; } - int VideoFd(void) { return videoFdM; } - int ApplicationFd(void) { return applicationFdM; } -}; - -class cSatipPollerTuners : public cList { -}; - class cSatipPoller : public cThread { private: enum { @@ -39,7 +20,6 @@ private: }; static cSatipPoller *instanceS; cMutex mutexM; - cSatipPollerTuners *tunersM; int fdM; void Activate(void); void Deactivate(void); diff --git a/pollerif.h b/pollerif.h index 9e04e51..cea1da5 100644 --- a/pollerif.h +++ b/pollerif.h @@ -12,11 +12,8 @@ class cSatipPollerIf { public: cSatipPollerIf() {} virtual ~cSatipPollerIf() {} - virtual void ReadVideo(void) = 0; - virtual void ReadApplication(void) = 0; - virtual int GetPollerId(void) = 0; - virtual int GetVideoFd(void) = 0; - virtual int GetApplicationFd(void) = 0; + virtual int GetFd(void) = 0; + virtual void Action(int fd) = 0; private: cSatipPollerIf(const cSatipPollerIf&); diff --git a/rtcp.c b/rtcp.c new file mode 100644 index 0000000..57ffc4b --- /dev/null +++ b/rtcp.c @@ -0,0 +1,40 @@ +/* + * rtcp.c: SAT>IP plugin for the Video Disk Recorder + * + * See the README file for copyright information and how to reach the author. + * + */ + +#include "common.h" +#include "rtcp.h" + +cSatipRtcp::cSatipRtcp(cSatipTunerIf &tunerP, unsigned int bufferLenP) +: tunerM(&tunerP), + bufferLenM(bufferLenP), + bufferM(MALLOC(unsigned char, bufferLenM)) +{ + if (bufferM) + memset(bufferM, 0, bufferLenM); + else + error("Cannot create RTCP buffer!"); +} + +cSatipRtcp::~cSatipRtcp() +{ + DELETE_POINTER(bufferM); +} + +int cSatipRtcp::GetFd(void) +{ + return Fd(); +} + +void cSatipRtcp::Action(int fdP) +{ + //debug("cSatipRtcp::%s(%d)", __FUNCTION__, fdP); + if (bufferM) { + int length = ReadApplication(bufferM, bufferLenM); + if (length > 0) + tunerM->ParseReceptionParameters(bufferM, length); + } +} diff --git a/rtcp.h b/rtcp.h new file mode 100644 index 0000000..4ea624d --- /dev/null +++ b/rtcp.h @@ -0,0 +1,32 @@ +/* + * rtcp.h: SAT>IP plugin for the Video Disk Recorder + * + * See the README file for copyright information and how to reach the author. + * + */ + +#ifndef __SATIP_RTCP_H_ +#define __SATIP_RTCP_H_ + +#include "common.h" +#include "socket.h" +#include "tunerif.h" +#include "pollerif.h" + +class cSatipRtcp : public cSatipSocket, public cSatipPollerIf +{ +private: + cSatipTunerIf *tunerM; + unsigned int bufferLenM; + unsigned char *bufferM; + +protected: + virtual int GetFd(void); + virtual void Action(int fdP); + +public: + cSatipRtcp(cSatipTunerIf &tunerP, unsigned int bufferLenP); + virtual ~cSatipRtcp(); +}; + +#endif /* __SATIP_RTCP_H_ */ diff --git a/rtp.c b/rtp.c new file mode 100644 index 0000000..c756edd --- /dev/null +++ b/rtp.c @@ -0,0 +1,40 @@ +/* + * rtp.c: SAT>IP plugin for the Video Disk Recorder + * + * See the README file for copyright information and how to reach the author. + * + */ + +#include "common.h" +#include "rtp.h" + +cSatipRtp::cSatipRtp(cSatipDeviceIf &deviceP, unsigned int bufferLenP) +: deviceM(&deviceP), + bufferLenM(bufferLenP), + bufferM(MALLOC(unsigned char, bufferLenM)) +{ + if (bufferM) + memset(bufferM, 0, bufferLenM); + else + error("Cannot create RTP buffer!"); +} + +cSatipRtp::~cSatipRtp() +{ + DELETE_POINTER(bufferM); +} + +int cSatipRtp::GetFd(void) +{ + return Fd(); +} + +void cSatipRtp::Action(int fdP) +{ + //debug("cSatipRtp::%s(%d)", __FUNCTION__, fdP); + if (bufferM) { + int length = ReadVideo(bufferM, min(deviceM->CheckData(), bufferLenM)); + if (length > 0) + deviceM->WriteData(bufferM, length); + } +} diff --git a/rtp.h b/rtp.h new file mode 100644 index 0000000..0321538 --- /dev/null +++ b/rtp.h @@ -0,0 +1,31 @@ +/* + * rtp.h: SAT>IP plugin for the Video Disk Recorder + * + * See the README file for copyright information and how to reach the author. + * + */ + +#ifndef __SATIP_RTP_H_ +#define __SATIP_RTP_H_ + +#include "socket.h" +#include "deviceif.h" +#include "pollerif.h" +#include "statistics.h" + +class cSatipRtp : public cSatipSocket, public cSatipPollerIf { +private: + cSatipDeviceIf *deviceM; + unsigned int bufferLenM; + unsigned char *bufferM; + +protected: + virtual int GetFd(void); + virtual void Action(int fdP); + +public: + cSatipRtp(cSatipDeviceIf &deviceP, unsigned int bufferLenP); + virtual ~cSatipRtp(); +}; + +#endif /* __SATIP_RTP_H_ */ diff --git a/rtsp.c b/rtsp.c index 826c511..61baa97 100644 --- a/rtsp.c +++ b/rtsp.c @@ -88,11 +88,8 @@ size_t cSatipRtsp::WriteCallback(void *ptrP, size_t sizeP, size_t nmembP, void * size_t len = sizeP * nmembP; //debug("cSatipRtsp::%s(%zu)", __FUNCTION__, len); - if (obj && obj->tunerM && (len > 0)) { - char *data = strndup((char*)ptrP, len); - obj->tunerM->ParseReceptionParameters(data); - FREE_POINTER(data); - } + if (obj && obj->tunerM && (len > 0)) + obj->tunerM->ParseReceptionParameters((u_char*)ptrP, len); return len; } diff --git a/tuner.c b/tuner.c index 0eed0f8..a05c16b 100644 --- a/tuner.c +++ b/tuner.c @@ -16,10 +16,9 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) sleepM(), deviceM(&deviceP), deviceIdM(deviceP.GetId()), - packetBufferLenM(packetLenP), rtspM(new cSatipRtsp(*this)), - rtpSocketM(new cSatipSocket()), - rtcpSocketM(new cSatipSocket()), + rtpM(new cSatipRtp(deviceP, packetLenP)), + rtcpM(new cSatipRtcp(*this, 1500)), streamAddrM(""), streamParamM(""), currentServerM(NULL), @@ -40,27 +39,22 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) delPidsM(), pidsM() { - debug("cSatipTuner::%s(%d) [device %d]", __FUNCTION__, packetBufferLenM, deviceIdM); - // Allocate packet buffer - packetBufferM = MALLOC(unsigned char, packetBufferLenM); - if (packetBufferM) - memset(packetBufferM, 0, packetBufferLenM); - else - error("MALLOC() failed for packet buffer [device %d]", deviceIdM); + debug("cSatipTuner::%s(%d) [device %d]", __FUNCTION__, packetLenP, deviceIdM); // Open sockets int i = 100; while (i-- > 0) { - if (rtpSocketM->Open(0) && rtcpSocketM->Open(rtpSocketM->Port() + 1)) + if (rtpM->Open(0) && rtcpM->Open(rtpM->Port() + 1)) break; - rtpSocketM->Close(); - rtcpSocketM->Close(); + rtpM->Close(); + rtcpM->Close(); } - if ((rtpSocketM->Port() <= 0) || (rtcpSocketM->Port() <= 0)) { + if ((rtpM->Port() <= 0) || (rtcpM->Port() <= 0)) { error("Cannot open required RTP/RTCP ports [device %d]", deviceIdM); } // Must be done after socket initialization! - cSatipPoller::GetInstance()->Register(*this); + cSatipPoller::GetInstance()->Register(*rtpM); + cSatipPoller::GetInstance()->Register(*rtcpM); // Start thread Start(); @@ -69,6 +63,7 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) cSatipTuner::~cSatipTuner() { debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM); + // Stop thread sleepM.Signal(); if (Running()) @@ -76,14 +71,14 @@ cSatipTuner::~cSatipTuner() Close(); // Close the listening sockets - cSatipPoller::GetInstance()->Unregister(*this); - rtpSocketM->Close(); - rtcpSocketM->Close(); + cSatipPoller::GetInstance()->Unregister(*rtcpM); + cSatipPoller::GetInstance()->Unregister(*rtpM); + rtcpM->Close(); + rtpM->Close(); // Free allocated memory - free(packetBufferM); - DELETENULL(rtcpSocketM); - DELETENULL(rtpSocketM); + DELETENULL(rtpM); + DELETENULL(rtcpM); DELETENULL(rtspM); } @@ -143,9 +138,8 @@ bool cSatipTuner::Connect(void) keepAliveM.Set(0); KeepAlive(); // Flush any old content - if (rtpSocketM) - rtpSocketM->Flush(); - openedM = rtspM->Setup(*uri, rtpSocketM->Port(), rtcpSocketM->Port()); + rtpM->Flush(); + openedM = rtspM->Setup(*uri, rtpM->Port(), rtcpM->Port()); return openedM; } keepAliveM.Set(timeoutM); @@ -153,7 +147,7 @@ bool cSatipTuner::Connect(void) if (openedM) { if (nextServerM && nextServerM->Quirk(cSatipServer::eSatipQuirkSessionId)) rtspM->SetSession(SkipZeroes(*sessionM)); - if (rtspM->Setup(*uri, rtpSocketM->Port(), rtcpSocketM->Port())) { + if (rtspM->Setup(*uri, rtpM->Port(), rtcpM->Port())) { tunedM = true; UpdatePids(true); if (nextServerM) { @@ -201,15 +195,15 @@ bool cSatipTuner::Disconnect(void) return true; } -void cSatipTuner::ParseReceptionParameters(const char *paramP) +void cSatipTuner::ParseReceptionParameters(u_char *bufferP, int lengthP) { - //debug("cSatipTuner::%s(%s) [device %d]", __FUNCTION__, paramP, deviceIdM); + //debug("cSatipTuner::%s(%s, %d) [device %d]", __FUNCTION__, bufferP, lengthP, deviceIdM); // DVB-S2: // ver=.;src=;tuner=,,,,,,,,,,,;pids=,..., // DVB-T2: // ver=1.1;tuner=,,,,,,,,,,,,,;pids=,..., - if (!isempty(paramP)) { - char *s = strdup(paramP); + if (lengthP > 0) { + char *s = strdup((char *)bufferP); char *c = strstr(s, ";tuner="); if (c) { int value; @@ -404,28 +398,3 @@ cString cSatipTuner::GetInformation(void) //debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM); return tunedM ? cString::sprintf("rtsp://%s/?%s [stream=%d]", *streamAddrM, *streamParamM, streamIdM) : "connection failed"; } - -void cSatipTuner::ReadVideo(void) -{ - //debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM); - //cMutexLock MutexLock(&mutexM); - if (deviceM && packetBufferM && rtpSocketM) { - int length = rtpSocketM->ReadVideo(packetBufferM, min(deviceM->CheckData(), packetBufferLenM)); - if (length > 0) { - AddTunerStatistic(length); - deviceM->WriteData(packetBufferM, length); - } - } -} - -void cSatipTuner::ReadApplication(void) -{ - //debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM); - //cMutexLock MutexLock(&mutexM); - if (deviceM && packetBufferM && rtcpSocketM) { - unsigned char buf[1450]; - memset(buf, 0, sizeof(buf)); - if (rtcpSocketM->ReadApplication(buf, sizeof(buf)) > 0) - ParseReceptionParameters((const char *)buf); - } -} diff --git a/tuner.h b/tuner.h index 180aa10..282f32a 100644 --- a/tuner.h +++ b/tuner.h @@ -12,7 +12,8 @@ #include #include "deviceif.h" -#include "pollerif.h" +#include "rtp.h" +#include "rtcp.h" #include "rtsp.h" #include "server.h" #include "statistics.h" @@ -44,7 +45,8 @@ public: } }; -class cSatipTuner : public cThread, public cSatipTunerStatistics, public cSatipTunerIf, public cSatipPollerIf { +class cSatipTuner : public cThread, public cSatipTunerStatistics, public cSatipTunerIf +{ private: enum { eDummyPid = 100, @@ -60,11 +62,9 @@ private: cCondWait sleepM; cSatipDeviceIf* deviceM; int deviceIdM; - unsigned char* packetBufferM; - unsigned int packetBufferLenM; cSatipRtsp *rtspM; - cSatipSocket *rtpSocketM; - cSatipSocket *rtcpSocketM; + cSatipRtp *rtpM; + cSatipRtcp *rtcpM; cString streamAddrM; cString streamParamM; cSatipServer *currentServerM; @@ -112,17 +112,10 @@ public: // for internal tuner interface public: - virtual void ParseReceptionParameters(const char *paramP); + virtual void ParseReceptionParameters(u_char *bufferP, int lengthP); virtual void SetStreamId(int streamIdP); virtual void SetSessionTimeout(const char *sessionP, int timeoutP); virtual int GetId(void); - - // for internal poller interface - virtual void ReadVideo(void); - virtual void ReadApplication(void); - virtual int GetPollerId(void) { return GetId(); } - virtual int GetVideoFd(void) { return rtpSocketM ? rtpSocketM->Fd() : -1; }; - virtual int GetApplicationFd(void) { return rtcpSocketM ? rtcpSocketM->Fd() : -1; } }; #endif // __SATIP_TUNER_H diff --git a/tunerif.h b/tunerif.h index 6d2585a..6a716fb 100644 --- a/tunerif.h +++ b/tunerif.h @@ -12,7 +12,7 @@ class cSatipTunerIf { public: cSatipTunerIf() {} virtual ~cSatipTunerIf() {} - virtual void ParseReceptionParameters(const char *paramP) = 0; + virtual void ParseReceptionParameters(u_char *bufferP, int lenghtP) = 0; virtual void SetStreamId(int streamIdP) = 0; virtual void SetSessionTimeout(const char *sessionP, int timeoutP) = 0; virtual int GetId(void) = 0;