diff --git a/HISTORY b/HISTORY index c8c3139..717aeb1 100644 --- a/HISTORY +++ b/HISTORY @@ -80,3 +80,4 @@ VDR Plugin 'satip' Revision History - Added support for SAT>IP frontend selection via Radio ID. - Fixed EIT scan (Thanks to Stefan Schallenberg). +- Refactored input thread to increase performance. diff --git a/discover.c b/discover.c index bee329a..33b393d 100644 --- a/discover.c +++ b/discover.c @@ -94,6 +94,35 @@ size_t cSatipDiscover::WriteCallback(char *ptrP, size_t sizeP, size_t nmembP, vo return len; } +int cSatipDiscover::DebugCallback(CURL *handleP, curl_infotype typeP, char *dataP, size_t sizeP, void *userPtrP) +{ + cSatipDiscover *obj = reinterpret_cast(userPtrP); + + if (obj) { + switch (typeP) { + case CURLINFO_TEXT: + debug("cSatipDiscover::%s(): HTTP INFO %.*s", __FUNCTION__, (int)sizeP, dataP); + break; + case CURLINFO_HEADER_IN: + debug("cSatipDiscover::%s(): HTTP HEAD <<< %.*s", __FUNCTION__, (int)sizeP, dataP); + break; + case CURLINFO_HEADER_OUT: + debug("cSatipDiscover::%s(): HTTP HEAD >>>\n%.*s", __FUNCTION__, (int)sizeP, dataP); + break; + case CURLINFO_DATA_IN: + debug("cSatipDiscover::%s(): HTTP DATA <<< %.*s", __FUNCTION__, (int)sizeP, dataP); + break; + case CURLINFO_DATA_OUT: + debug("cSatipDiscover::%s(): HTTP DATA >>>\n%.*s", __FUNCTION__, (int)sizeP, dataP); + break; + default: + break; + } + } + + return 0; +} + cSatipDiscover::cSatipDiscover() : cThread("SAT>IP discover"), mutexM(), @@ -220,6 +249,8 @@ void cSatipDiscover::Read(void) #ifdef DEBUG // Verbose output SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_VERBOSE, 1L); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_DEBUGFUNCTION, cSatipDiscover::DebugCallback); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_DEBUGDATA, this); #endif // Set callback SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_WRITEFUNCTION, cSatipDiscover::WriteCallback); diff --git a/discover.h b/discover.h index 1d38b71..29b63e2 100644 --- a/discover.h +++ b/discover.h @@ -47,6 +47,7 @@ private: static const char *bcastAddressS; static const char *bcastMessageS; static size_t WriteCallback(char *ptrP, size_t sizeP, size_t nmembP, void *dataP); + static int DebugCallback(CURL *handleP, curl_infotype typeP, char *dataP, size_t sizeP, void *userPtrP); cMutex mutexM; CURL *handleM; cSatipSocket *socketM; diff --git a/rtsp.h b/rtsp.h index 75b77e7..81d152d 100644 --- a/rtsp.h +++ b/rtsp.h @@ -24,7 +24,7 @@ private: static int DebugCallback(CURL *handleP, curl_infotype typeP, char *dataP, size_t sizeP, void *userPtrP); enum { - eConnectTimeoutMs = 1500, // in milliseconds + eConnectTimeoutMs = 1500, // in milliseconds }; cSatipTunerIf* tunerM; diff --git a/socket.h b/socket.h index 8b97bde..aa082c7 100644 --- a/socket.h +++ b/socket.h @@ -27,6 +27,7 @@ public: ~cSatipSocket(); bool Open(const int portP = 0); void Close(void); + int Fd(void) { return socketDescM; } int Port(void) { return socketPortM; } bool IsOpen(void) { return (socketDescM >= 0); } bool Flush(void); diff --git a/tuner.c b/tuner.c index 3619d17..abe4a85 100644 --- a/tuner.c +++ b/tuner.c @@ -5,6 +5,8 @@ * */ +#include + #include "common.h" #include "config.h" #include "discover.h" @@ -28,6 +30,7 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) statusUpdateM(), pidUpdateCacheM(), sessionM(""), + fdM(epoll_create(eMaxFileDescriptors)), timeoutM(eMinKeepAliveIntervalMs), openedM(false), tunedM(false), @@ -50,7 +53,7 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) // Open sockets int i = 100; while (i-- > 0) { - if (rtpSocketM->Open() && rtcpSocketM->Open(rtpSocketM->Port() + 1)) + if (rtpSocketM->Open(0) && rtcpSocketM->Open(rtpSocketM->Port() + 1)) break; rtpSocketM->Close(); rtcpSocketM->Close(); @@ -59,6 +62,26 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) error("Cannot open required RTP/RTCP ports [device %d]", deviceIdM); } + // Setup epoll + if (fdM >= 0) { + if (rtpSocketM->Fd() >= 0) { + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLET; + ev.data.fd = rtpSocketM->Fd(); + if (epoll_ctl(fdM, EPOLL_CTL_ADD, rtpSocketM->Fd(), &ev) == -1) { + error("Cannot add RTP socket into epoll [device %d]", deviceIdM); + } + } + if (rtcpSocketM->Fd() >= 0) { + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLET; + ev.data.fd = rtcpSocketM->Fd(); + if (epoll_ctl(fdM, EPOLL_CTL_ADD, rtcpSocketM->Fd(), &ev) == -1) { + error("Cannot add RTP socket into epoll [device %d]", deviceIdM); + } + } + } + // Start thread Start(); } @@ -72,6 +95,17 @@ cSatipTuner::~cSatipTuner() Cancel(3); Close(); + // Cleanup epoll + if (fdM >= 0) { + if ((rtpSocketM->Fd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, rtpSocketM->Fd(), NULL) == -1)) { + error("Cannot remove RTP socket from epoll [device %d]", deviceIdM); + } + if ((rtcpSocketM->Fd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, rtcpSocketM->Fd(), NULL) == -1)) { + error("Cannot remove RTP socket from epoll [device %d]", deviceIdM); + } + close(fdM); + } + // Close the listening sockets rtpSocketM->Close(); rtcpSocketM->Close(); @@ -89,47 +123,53 @@ void cSatipTuner::Action(void) // Increase priority SetPriority(-1); // Do the thread loop - while (packetBufferM && Running()) { - int length = -1; - unsigned int size = min(deviceM->CheckData(), packetBufferLenM); - if (tunedM && (size > 0)) { - // Update pids - UpdatePids(); - // Remember the heart beat - KeepAlive(); - // Read reception statistics via DESCRIBE and RTCP - ReadReceptionStatus(); - if (rtcpSocketM && rtcpSocketM->IsOpen()) { - unsigned char buf[1450]; - memset(buf, 0, sizeof(buf)); - if (rtcpSocketM->ReadApplication(buf, sizeof(buf)) > 0) { - ParseReceptionParameters((const char *)buf); - timeout.Set(eReConnectTimeoutMs); - } - } - // Quirk for devices without valid reception data - if (currentServerM && currentServerM->Quirk(cSatipServer::eSatipQuirkForceLock)) { - hasLockM = true; - signalStrengthM = eDefaultSignalStrength; - signalQualityM = eDefaultSignalQuality; - } - // Read data - if (rtpSocketM && rtpSocketM->IsOpen()) - length = rtpSocketM->ReadVideo(packetBufferM, size); - } - if (length > 0) { - AddTunerStatistic(length); - deviceM->WriteData(packetBufferM, length); - timeout.Set(eReConnectTimeoutMs); - } - else { - // Reconnect if necessary - if (openedM && timeout.TimedOut()) { - Disconnect(); - Connect(); - timeout.Set(eReConnectTimeoutMs); - } - sleepM.Wait(10); // to avoid busy loop and reduce cpu load + while (packetBufferM && rtpSocketM && rtcpSocketM && Running()) { + struct epoll_event events[eMaxFileDescriptors]; + int nfds = epoll_wait(fdM, events, eMaxFileDescriptors, eReadTimeoutMs); + switch (nfds) { + default: + for (int i = 0; i < nfds; ++i) { + timeout.Set(eReConnectTimeoutMs); + if (events[i].data.fd == rtpSocketM->Fd()) { + // Read data + int length = rtpSocketM->ReadVideo(packetBufferM, min(deviceM->CheckData(), packetBufferLenM)); + if (length > 0) { + AddTunerStatistic(length); + deviceM->WriteData(packetBufferM, length); + } + } + else if (events[i].data.fd == rtcpSocketM->Fd()) { + unsigned char buf[1450]; + memset(buf, 0, sizeof(buf)); + if (rtcpSocketM->ReadApplication(buf, sizeof(buf)) > 0) + ParseReceptionParameters((const char *)buf); + } + } + // fall through! + case 0: + // Update pids + UpdatePids(); + // Remember the heart beat + KeepAlive(); + // Read reception statistics via DESCRIBE and RTCP + if (ReadReceptionStatus()) { + // Quirk for devices without valid reception data + if (currentServerM && currentServerM->Quirk(cSatipServer::eSatipQuirkForceLock)) { + hasLockM = true; + signalStrengthM = eDefaultSignalStrength; + signalQualityM = eDefaultSignalQuality; + } + } + // Reconnect if necessary + if (openedM && timeout.TimedOut()) { + Disconnect(); + Connect(); + timeout.Set(eReConnectTimeoutMs); + } + break; + case -1: + error("epoll_wait() failed"); + break; } } debug("cSatipTuner::%s(): exiting [device %d]", __FUNCTION__, deviceIdM); @@ -167,17 +207,21 @@ bool cSatipTuner::Connect(void) return openedM; } keepAliveM.Set(timeoutM); - openedM = rtspM->Options(*connectionUri) && rtspM->Setup(*uri, rtpSocketM->Port(), rtcpSocketM->Port()); + openedM = rtspM->Options(*connectionUri); if (openedM) { if (nextServerM && nextServerM->Quirk(cSatipServer::eSatipQuirkSessionId)) rtspM->SetSession(SkipZeroes(*sessionM)); - tunedM = true; - UpdatePids(true); - if (nextServerM) { - cSatipDiscover::GetInstance()->UseServer(nextServerM, true); - currentServerM = nextServerM; - nextServerM = NULL; + if (rtspM->Setup(*uri, rtpSocketM->Port(), rtcpSocketM->Port())) { + tunedM = true; + UpdatePids(true); + if (nextServerM) { + cSatipDiscover::GetInstance()->UseServer(nextServerM, true); + currentServerM = nextServerM; + nextServerM = NULL; + } } + else + openedM = false; } return openedM; @@ -288,10 +332,10 @@ bool cSatipTuner::SetSource(cSatipServer *serverP, const char *parameterP, const cMutexLock MutexLock(&mutexM); if (serverP) { nextServerM = cSatipDiscover::GetInstance()->GetServer(serverP); - if (nextServerM && !isempty(nextServerM->Address()) && !isempty(parameterP)) { + if (rtspM && nextServerM && !isempty(nextServerM->Address()) && !isempty(parameterP)) { // Update stream address and parameter streamAddrM = rtspM->RtspUnescapeString(nextServerM->Address()); - streamParamM = parameterP; + streamParamM = rtspM->RtspUnescapeString(parameterP); // Reconnect Connect(); } @@ -363,10 +407,9 @@ bool cSatipTuner::KeepAlive(void) cMutexLock MutexLock(&mutexM); if (tunedM && keepAliveM.TimedOut()) { cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM); - if (rtspM->Options(*uri)) { - keepAliveM.Set(timeoutM); + keepAliveM.Set(timeoutM); + if (rtspM->Options(*uri)) return true; - } Disconnect(); } diff --git a/tuner.h b/tuner.h index b08ce16..f223af7 100644 --- a/tuner.h +++ b/tuner.h @@ -46,9 +46,11 @@ public: class cSatipTuner : public cThread, public cSatipTunerStatistics, public cSatipTunerIf { private: enum { + eMaxFileDescriptors = 2, // RTP + RTCP eDummyPid = 100, eDefaultSignalStrength = 15, eDefaultSignalQuality = 224, + eReadTimeoutMs = 500, // in milliseconds eStatusUpdateTimeoutMs = 1000, // in milliseconds eConnectTimeoutMs = 1500, // in milliseconds ePidUpdateIntervalMs = 250, // in milliseconds @@ -74,6 +76,7 @@ private: cTimeMs signalInfoCacheM; cTimeMs pidUpdateCacheM; cString sessionM; + int fdM; int timeoutM; bool openedM; bool tunedM;