From e05801b464a0cf956214684d3557b611396d46ec Mon Sep 17 00:00:00 2001 From: Rolf Ahrenberg Date: Tue, 11 Nov 2014 20:23:41 +0200 Subject: [PATCH] Refactored epoll() to use only one thread. --- Makefile | 4 +- common.h | 3 + device.c | 2 - poller.c | 161 +++++++++++++++++++++++++++++++++++++++++++++++++++++ poller.h | 64 +++++++++++++++++++++ pollerif.h | 26 +++++++++ satip.c | 3 + tuner.c | 134 +++++++++++++++++--------------------------- tuner.h | 13 +++-- 9 files changed, 319 insertions(+), 91 deletions(-) create mode 100644 poller.c create mode 100644 poller.h create mode 100644 pollerif.h diff --git a/Makefile b/Makefile index aba1c46..dbf0f74 100644 --- a/Makefile +++ b/Makefile @@ -88,8 +88,8 @@ all-redirect: all ### The object files (add further files here): -OBJS = $(PLUGIN).o common.o config.o device.o discover.o param.o rtsp.o \ - sectionfilter.o server.o setup.o socket.o statistics.o tuner.o +OBJS = $(PLUGIN).o common.o config.o device.o discover.o param.o poller.o \ + rtsp.o sectionfilter.o server.o setup.o socket.o statistics.o tuner.o ### The main target: diff --git a/common.h b/common.h index fa2ab43..08ea6e0 100644 --- a/common.h +++ b/common.h @@ -8,6 +8,7 @@ #ifndef __SATIP_COMMON_H #define __SATIP_COMMON_H +#include #include #include #include @@ -24,6 +25,8 @@ #define ELEMENTS(x) (sizeof(x) / sizeof(x[0])) +#define SATIP_MAX_DEVICES MAXDEVICES + #define SATIP_BUFFER_SIZE KILOBYTE(512) #define SATIP_DEVICE_INFO_ALL 0 diff --git a/device.c b/device.c index 5f732aa..2b543ae 100644 --- a/device.c +++ b/device.c @@ -12,8 +12,6 @@ #include "param.h" #include "device.h" -#define SATIP_MAX_DEVICES MAXDEVICES - static cSatipDevice * SatipDevicesS[SATIP_MAX_DEVICES] = { NULL }; cSatipDevice::cSatipDevice(unsigned int indexP) diff --git a/poller.c b/poller.c new file mode 100644 index 0000000..fc9b71d --- /dev/null +++ b/poller.c @@ -0,0 +1,161 @@ +/* + * poller.c: SAT>IP plugin for the Video Disk Recorder + * + * See the README file for copyright information and how to reach the author. + * + */ + +#include + +#include "common.h" +#include "poller.h" + +cSatipPoller *cSatipPoller::instanceS = NULL; + +cSatipPoller *cSatipPoller::GetInstance(void) +{ + if (!instanceS) + instanceS = new cSatipPoller(); + return instanceS; +} + +bool cSatipPoller::Initialize(void) +{ + debug("cSatipPoller::%s()", __FUNCTION__); + if (instanceS) + instanceS->Activate(); + return true; +} + +void cSatipPoller::Destroy(void) +{ + debug("cSatipPoller::%s()", __FUNCTION__); + if (instanceS) + instanceS->Deactivate(); +} + +cSatipPoller::cSatipPoller() +: cThread("SAT>IP poller"), + mutexM(), + tunersM(new cSatipPollerTuners()), + fdM(epoll_create(eMaxFileDescriptors)) +{ + debug("cSatipPoller::%s()", __FUNCTION__); +} + +cSatipPoller::~cSatipPoller() +{ + debug("cSatipPoller::%s()", __FUNCTION__); + Deactivate(); + cMutexLock MutexLock(&mutexM); + close(fdM); + // Free allocated memory + DELETENULL(tunersM); +} + +void cSatipPoller::Activate(void) +{ + // Start the thread + Start(); +} + +void cSatipPoller::Deactivate(void) +{ + debug("cSatipPoller::%s()", __FUNCTION__); + cMutexLock MutexLock(&mutexM); + if (Running()) + Cancel(3); +} + +void cSatipPoller::Action(void) +{ + debug("cSatipPoller::%s(): entering", __FUNCTION__); + struct epoll_event events[eMaxFileDescriptors]; + // Increase priority + SetPriority(-1); + // Do the thread loop + while (Running()) { + int nfds = epoll_wait(fdM, events, eMaxFileDescriptors, -1); + if (nfds == -1) { + error("epoll_wait() failed"); + } + else if (nfds > 0) { + for (int i = 0; i < nfds; ++i) { + for (cSatipPollerTuner *tuner = tunersM->First(); tuner; tuner = tunersM->Next(tuner)) { + if (events[i].data.fd == tuner->VideoFd()) { + tuner->Poller()->ReadVideo(); + break; + } + else if (events[i].data.fd == tuner->ApplicationFd()) { + tuner->Poller()->ReadApplication(); + break; + } + } + } + } + } + debug("cSatipPoller::%s(): exiting", __FUNCTION__); +} + +bool cSatipPoller::Register(cSatipPollerIf &pollerP) +{ + debug("cSatipPoller::%s(%d)", __FUNCTION__, pollerP.GetPollerId()); + cMutexLock MutexLock(&mutexM); + if (tunersM && (fdM >= 0)) { + bool found = false; + for (cSatipPollerTuner *tuner = tunersM->First(); tuner; tuner = tunersM->Next(tuner)) { + if (tuner->Poller() == &pollerP) { + found = true; + break; + } + } + if (!found) { + cSatipPollerTuner *tmp = new cSatipPollerTuner(pollerP, pollerP.GetVideoFd(), pollerP.GetApplicationFd()); + if (tmp) { + tunersM->Add(tmp); + if (tmp->VideoFd() >= 0) { + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLET; + ev.data.fd = tmp->VideoFd(); + if (epoll_ctl(fdM, EPOLL_CTL_ADD, pollerP.GetVideoFd(), &ev) == -1) { + error("Cannot add video socket into epoll [device %d]", pollerP.GetPollerId()); + } + } + if (tmp->ApplicationFd() >= 0) { + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLET; + ev.data.fd = tmp->ApplicationFd(); + if (epoll_ctl(fdM, EPOLL_CTL_ADD, tmp->ApplicationFd(), &ev) == -1) { + error("Cannot add application socket into epoll [device %d]", pollerP.GetPollerId()); + } + } + debug("cSatipPoller::%s(%d): Added interface", __FUNCTION__, pollerP.GetPollerId()); + } + } + return true; + } + return false; +} + +bool cSatipPoller::Unregister(cSatipPollerIf &pollerP) +{ + debug("cSatipPoller::%s(%d)", __FUNCTION__, pollerP.GetPollerId()); + cMutexLock MutexLock(&mutexM); + if (tunersM && (fdM >= 0)) { + for (cSatipPollerTuner *tuner = tunersM->First(); tuner; tuner = tunersM->Next(tuner)) { + if (tuner->Poller() == &pollerP) { + if ((tuner->VideoFd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, tuner->VideoFd(), NULL) == -1)) { + error("Cannot remove video socket from epoll [device %d]", pollerP.GetPollerId()); + } + if ((tuner->ApplicationFd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, tuner->ApplicationFd(), NULL) == -1)) { + error("Cannot remove application socket from epoll [device %d]", pollerP.GetPollerId()); + } + tunersM->Del(tuner); + debug("cSatipPoller::%s(%d): Removed interface", __FUNCTION__, pollerP.GetPollerId()); + return true; + } + } + } + return false; +} + diff --git a/poller.h b/poller.h new file mode 100644 index 0000000..53b1b44 --- /dev/null +++ b/poller.h @@ -0,0 +1,64 @@ +/* + * poller.h: SAT>IP plugin for the Video Disk Recorder + * + * See the README file for copyright information and how to reach the author. + * + */ + +#ifndef __SATIP_POLLER_H +#define __SATIP_POLLER_H + +#include +#include + +#include "pollerif.h" + +class cSatipPollerTuner : public cListObject { +private: + cSatipPollerIf* pollerM; + int videoFdM; + int applicationFdM; + +public: + cSatipPollerTuner(cSatipPollerIf &pollerP, int videoFdP, int applicationFdP) + { + pollerM = &pollerP; videoFdM = videoFdP; applicationFdM = applicationFdP; + } + cSatipPollerIf* Poller(void) { return pollerM; } + int VideoFd(void) { return videoFdM; } + int ApplicationFd(void) { return applicationFdM; } +}; + +class cSatipPollerTuners : public cList { +}; + +class cSatipPoller : public cThread { +private: + enum { + eMaxFileDescriptors = SATIP_MAX_DEVICES * 2, // Data + Application + }; + static cSatipPoller *instanceS; + cMutex mutexM; + cSatipPollerTuners *tunersM; + int fdM; + void Activate(void); + void Deactivate(void); + // constructor + cSatipPoller(); + // to prevent copy constructor and assignment + cSatipPoller(const cSatipPoller&); + cSatipPoller& operator=(const cSatipPoller&); + +protected: + virtual void Action(void); + +public: + static cSatipPoller *GetInstance(void); + static bool Initialize(void); + static void Destroy(void); + virtual ~cSatipPoller(); + bool Register(cSatipPollerIf &pollerP); + bool Unregister(cSatipPollerIf &pollerP); +}; + +#endif // __SATIP_POLLER_H diff --git a/pollerif.h b/pollerif.h new file mode 100644 index 0000000..9e04e51 --- /dev/null +++ b/pollerif.h @@ -0,0 +1,26 @@ +/* + * pollerif.h: SAT>IP plugin for the Video Disk Recorder + * + * See the README file for copyright information and how to reach the author. + * + */ + +#ifndef __SATIP_POLLERIF_H +#define __SATIP_POLLERIF_H + +class cSatipPollerIf { +public: + cSatipPollerIf() {} + virtual ~cSatipPollerIf() {} + virtual void ReadVideo(void) = 0; + virtual void ReadApplication(void) = 0; + virtual int GetPollerId(void) = 0; + virtual int GetVideoFd(void) = 0; + virtual int GetApplicationFd(void) = 0; + +private: + cSatipPollerIf(const cSatipPollerIf&); + cSatipPollerIf& operator=(const cSatipPollerIf&); +}; + +#endif // __SATIP_POLLERIF_H diff --git a/satip.c b/satip.c index c0847e9..6b1255d 100644 --- a/satip.c +++ b/satip.c @@ -11,6 +11,7 @@ #include "config.h" #include "device.h" #include "discover.h" +#include "poller.h" #include "setup.h" #if defined(LIBCURL_VERSION_NUM) && LIBCURL_VERSION_NUM < 0x072400 @@ -117,6 +118,7 @@ bool cPluginSatip::Initialize(void) error("Unable to initialize CURL"); SatipConfig.SetConfigDirectory(cPlugin::ResourceDirectory(PLUGIN_NAME_I18N)); cSatipDiscover::GetInstance()->Initialize(serversM); + cSatipPoller::GetInstance()->Initialize(); return cSatipDevice::Initialize(deviceCountM); } @@ -140,6 +142,7 @@ void cPluginSatip::Stop(void) debug("cPluginSatip::%s()", __FUNCTION__); // Stop any background activities the plugin is performing. cSatipDevice::Shutdown(); + cSatipPoller::GetInstance()->Destroy(); cSatipDiscover::GetInstance()->Destroy(); curl_global_cleanup(); } diff --git a/tuner.c b/tuner.c index 7c85431..0eed0f8 100644 --- a/tuner.c +++ b/tuner.c @@ -5,11 +5,10 @@ * */ -#include - #include "common.h" #include "config.h" #include "discover.h" +#include "poller.h" #include "tuner.h" cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) @@ -30,7 +29,6 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) statusUpdateM(), pidUpdateCacheM(), sessionM(""), - fdM(epoll_create(eMaxFileDescriptors)), timeoutM(eMinKeepAliveIntervalMs), openedM(false), tunedM(false), @@ -61,26 +59,8 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) if ((rtpSocketM->Port() <= 0) || (rtcpSocketM->Port() <= 0)) { error("Cannot open required RTP/RTCP ports [device %d]", deviceIdM); } - - // Setup epoll - if (fdM >= 0) { - if (rtpSocketM->Fd() >= 0) { - struct epoll_event ev; - ev.events = EPOLLIN | EPOLLET; - ev.data.fd = rtpSocketM->Fd(); - if (epoll_ctl(fdM, EPOLL_CTL_ADD, rtpSocketM->Fd(), &ev) == -1) { - error("Cannot add RTP socket into epoll [device %d]", deviceIdM); - } - } - if (rtcpSocketM->Fd() >= 0) { - struct epoll_event ev; - ev.events = EPOLLIN | EPOLLET; - ev.data.fd = rtcpSocketM->Fd(); - if (epoll_ctl(fdM, EPOLL_CTL_ADD, rtcpSocketM->Fd(), &ev) == -1) { - error("Cannot add RTP socket into epoll [device %d]", deviceIdM); - } - } - } + // Must be done after socket initialization! + cSatipPoller::GetInstance()->Register(*this); // Start thread Start(); @@ -95,18 +75,8 @@ cSatipTuner::~cSatipTuner() Cancel(3); Close(); - // Cleanup epoll - if (fdM >= 0) { - if ((rtpSocketM->Fd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, rtpSocketM->Fd(), NULL) == -1)) { - error("Cannot remove RTP socket from epoll [device %d]", deviceIdM); - } - if ((rtcpSocketM->Fd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, rtcpSocketM->Fd(), NULL) == -1)) { - error("Cannot remove RTP socket from epoll [device %d]", deviceIdM); - } - close(fdM); - } - // Close the listening sockets + cSatipPoller::GetInstance()->Unregister(*this); rtpSocketM->Close(); rtcpSocketM->Close(); @@ -121,57 +91,28 @@ void cSatipTuner::Action(void) { debug("cSatipTuner::%s(): entering [device %d]", __FUNCTION__, deviceIdM); cTimeMs timeout(eReConnectTimeoutMs); - // Increase priority - SetPriority(-1); // Do the thread loop - while (packetBufferM && rtpSocketM && rtcpSocketM && Running()) { - struct epoll_event events[eMaxFileDescriptors]; - int nfds = epoll_wait(fdM, events, eMaxFileDescriptors, eReadTimeoutMs); - switch (nfds) { - default: - for (int i = 0; i < nfds; ++i) { - timeout.Set(eReConnectTimeoutMs); - if (events[i].data.fd == rtpSocketM->Fd()) { - // Read data - int length = rtpSocketM->ReadVideo(packetBufferM, min(deviceM->CheckData(), packetBufferLenM)); - if (length > 0) { - AddTunerStatistic(length); - deviceM->WriteData(packetBufferM, length); - } - } - else if (events[i].data.fd == rtcpSocketM->Fd()) { - unsigned char buf[1450]; - memset(buf, 0, sizeof(buf)); - if (rtcpSocketM->ReadApplication(buf, sizeof(buf)) > 0) - ParseReceptionParameters((const char *)buf); - } - } - // fall through! - case 0: - // Update pids - UpdatePids(); - // Remember the heart beat - KeepAlive(); - // Read reception statistics via DESCRIBE and RTCP - if (ReadReceptionStatus()) { - // Quirk for devices without valid reception data - if (currentServerM && currentServerM->Quirk(cSatipServer::eSatipQuirkForceLock)) { - hasLockM = true; - signalStrengthM = eDefaultSignalStrength; - signalQualityM = eDefaultSignalQuality; - } - } - // Reconnect if necessary - if (openedM && timeout.TimedOut()) { - Disconnect(); - Connect(); - timeout.Set(eReConnectTimeoutMs); - } - break; - case -1: - error("epoll_wait() failed"); - break; + while (Running()) { + // Update pids + UpdatePids(); + // Remember the heart beat + KeepAlive(); + // Read reception statistics via DESCRIBE and RTCP + if (ReadReceptionStatus()) { + // Quirk for devices without valid reception data + if (currentServerM && currentServerM->Quirk(cSatipServer::eSatipQuirkForceLock)) { + hasLockM = true; + signalStrengthM = eDefaultSignalStrength; + signalQualityM = eDefaultSignalQuality; + } } + // Reconnect if necessary + if (openedM && timeout.TimedOut()) { + Disconnect(); + Connect(); + timeout.Set(eReConnectTimeoutMs); + } + sleepM.Wait(100); // to avoid busy loop and reduce cpu load } debug("cSatipTuner::%s(): exiting [device %d]", __FUNCTION__, deviceIdM); } @@ -343,6 +284,7 @@ bool cSatipTuner::SetSource(cSatipServer *serverP, const char *parameterP, const } else Disconnect(); + sleepM.Signal(); return true; } @@ -361,6 +303,7 @@ bool cSatipTuner::SetPid(int pidP, int typeP, bool onP) addPidsM.RemovePid(pidP); } pidUpdateCacheM.Set(ePidUpdateIntervalMs); + sleepM.Signal(); return true; } @@ -461,3 +404,28 @@ cString cSatipTuner::GetInformation(void) //debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM); return tunedM ? cString::sprintf("rtsp://%s/?%s [stream=%d]", *streamAddrM, *streamParamM, streamIdM) : "connection failed"; } + +void cSatipTuner::ReadVideo(void) +{ + //debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM); + //cMutexLock MutexLock(&mutexM); + if (deviceM && packetBufferM && rtpSocketM) { + int length = rtpSocketM->ReadVideo(packetBufferM, min(deviceM->CheckData(), packetBufferLenM)); + if (length > 0) { + AddTunerStatistic(length); + deviceM->WriteData(packetBufferM, length); + } + } +} + +void cSatipTuner::ReadApplication(void) +{ + //debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM); + //cMutexLock MutexLock(&mutexM); + if (deviceM && packetBufferM && rtcpSocketM) { + unsigned char buf[1450]; + memset(buf, 0, sizeof(buf)); + if (rtcpSocketM->ReadApplication(buf, sizeof(buf)) > 0) + ParseReceptionParameters((const char *)buf); + } +} diff --git a/tuner.h b/tuner.h index f223af7..180aa10 100644 --- a/tuner.h +++ b/tuner.h @@ -12,6 +12,7 @@ #include #include "deviceif.h" +#include "pollerif.h" #include "rtsp.h" #include "server.h" #include "statistics.h" @@ -43,14 +44,12 @@ public: } }; -class cSatipTuner : public cThread, public cSatipTunerStatistics, public cSatipTunerIf { +class cSatipTuner : public cThread, public cSatipTunerStatistics, public cSatipTunerIf, public cSatipPollerIf { private: enum { - eMaxFileDescriptors = 2, // RTP + RTCP eDummyPid = 100, eDefaultSignalStrength = 15, eDefaultSignalQuality = 224, - eReadTimeoutMs = 500, // in milliseconds eStatusUpdateTimeoutMs = 1000, // in milliseconds eConnectTimeoutMs = 1500, // in milliseconds ePidUpdateIntervalMs = 250, // in milliseconds @@ -76,7 +75,6 @@ private: cTimeMs signalInfoCacheM; cTimeMs pidUpdateCacheM; cString sessionM; - int fdM; int timeoutM; bool openedM; bool tunedM; @@ -118,6 +116,13 @@ public: virtual void SetStreamId(int streamIdP); virtual void SetSessionTimeout(const char *sessionP, int timeoutP); virtual int GetId(void); + + // for internal poller interface + virtual void ReadVideo(void); + virtual void ReadApplication(void); + virtual int GetPollerId(void) { return GetId(); } + virtual int GetVideoFd(void) { return rtpSocketM ? rtpSocketM->Fd() : -1; }; + virtual int GetApplicationFd(void) { return rtcpSocketM ? rtcpSocketM->Fd() : -1; } }; #endif // __SATIP_TUNER_H