From 77441ea504b36b5197426d846942d45967b088a8 Mon Sep 17 00:00:00 2001 From: Rolf Ahrenberg Date: Sat, 8 Nov 2014 22:22:46 +0200 Subject: [PATCH] Separated data thread from tuner (Thanks to Stefan Schallenberg). --- HISTORY | 2 + Makefile | 2 +- common.h | 30 +++ data.c | 158 +++++++++++++ data.h | 55 +++++ device.c | 1 - statistics.h | 1 + tuner.c | 609 +++++++++++++++++++++++++++++---------------------- tuner.h | 32 +-- 9 files changed, 607 insertions(+), 283 deletions(-) create mode 100644 data.c create mode 100644 data.h diff --git a/HISTORY b/HISTORY index c8c3139..aaf3efb 100644 --- a/HISTORY +++ b/HISTORY @@ -80,3 +80,5 @@ VDR Plugin 'satip' Revision History - Added support for SAT>IP frontend selection via Radio ID. - Fixed EIT scan (Thanks to Stefan Schallenberg). +- Separated data thread from tuner (Thanks to + Stefan Schallenberg). diff --git a/Makefile b/Makefile index 524338c..dd6adfe 100644 --- a/Makefile +++ b/Makefile @@ -88,7 +88,7 @@ all-redirect: all ### The object files (add further files here): -OBJS = $(PLUGIN).o common.o config.o device.o discover.o param.o \ +OBJS = $(PLUGIN).o common.o config.o data.o device.o discover.o param.o \ sectionfilter.o server.o setup.o socket.o statistics.o tuner.o ### The main target: diff --git a/common.h b/common.h index fa2ab43..3761006 100644 --- a/common.h +++ b/common.h @@ -109,5 +109,35 @@ extern const section_filter_table_type section_filter_table[SECTION_FILTER_TABLE extern const char VERSION[]; +template class cSatipVector : public cVector { +public: + int IndexOf(const T &Data) + { + for (int i = 0; i < this->Size(); ++i) + if (Data == this->At(i)) + return i; + return -1; + } + + void RemoveElement(const T &Data) + { + int i = IndexOf(Data); + if (i >= 0) + this->Remove(i); + } + + void InsertUnique(T Data, int Before = 0) + { + if (IndexOf(Data) < 0) + this->Insert(Data, Before); + } + + void AppendUnique(T Data) + { + if (IndexOf(Data) < 0) + this->Append(Data); + } +}; + #endif // __SATIP_COMMON_H diff --git a/data.c b/data.c new file mode 100644 index 0000000..e64b9d4 --- /dev/null +++ b/data.c @@ -0,0 +1,158 @@ +/* + * tuner.c: SAT>IP plugin for the Video Disk Recorder + * + * See the README file for copyright information and how to reach the author. + * + */ + +#include "common.h" +#include "config.h" +#include "discover.h" +#include "param.h" +#include "tuner.h" +#include "data.h" + +enum LOGLEVEL { + logFunc = 0x01, + logFuncPerf = 0x02, + logData = 0x10, + logDataDetails = 0x20, + logAll = 0xFFFF + }; + +int logLevel = logFunc | logFuncPerf | logData; + +#define log(lvl) \ + if (logLevel & lvl) \ + debug("%s::%s [device %d]", __CLASS__, __FUNCTION__, deviceM->GetId()) +#define log1(lvl, a) \ + if (logLevel & lvl) \ + debug("%s::%s" a " [device %d]", __CLASS__, __FUNCTION__, deviceM->GetId()) +#define log2(lvl, a, b...) \ + if (logLevel & lvl) \ + debug("%s::%s " a " [device %d]", __CLASS__, __FUNCTION__, b, deviceM->GetId()) +#define __CLASS__ "cSatipTunerDataThread" + +cSatipTunerDataThread::cSatipTunerDataThread(cSatipDeviceIf &deviceP, cSatipTunerStatistics &statisticsP, unsigned int packetLenP) +: cThread("SAT>IP data"), + deviceM(&deviceP), + statisticsM(&statisticsP), + packetBufferLenM(packetLenP), + packetBufferM(NULL), + rtpSocketM(NULL), + timeoutM(-1), + timeoutHandlerM(0), + timeoutFuncM(NULL), + timeoutParamM(NULL), + sleepM(), + mutexM() +{ + log2(logFunc, "(...,...,%d)", packetLenP); + + // Allocate packet buffer + packetBufferM = MALLOC(unsigned char, packetBufferLenM); + if (packetBufferM) + memset(packetBufferM, 0, packetBufferLenM); + else + error("MALLOC() failed for packet buffer [device %d]", deviceM->GetId()); +} + +cSatipTunerDataThread::~cSatipTunerDataThread() +{ + log(logFunc); + cMutexLock MutexLock(&mutexM); + + // Free allocated memory + free(packetBufferM); + packetBufferM = NULL; +} + +void cSatipTunerDataThread::Start(cSatipSocket *rtpSocketP) +{ + log1(logFunc, "(...)"); + cMutexLock MutexLock(&mutexM); + + rtpSocketM = rtpSocketP; + + // Start thread + cThread::Start(); +} + +void cSatipTunerDataThread::SetTimeout(int timeoutP, fCallback callbackP, void *paramP) +{ + log2(logFunc, "(%d, ...)", timeoutP); + cMutexLock MutexLock(&mutexM); + + if (timeoutP > 0) { + timeoutM = timeoutP; + timeoutFuncM = callbackP; + timeoutParamM = paramP; + timeoutHandlerM.Set(timeoutM); + } + else { + timeoutM = -1; + timeoutFuncM = NULL; + timeoutParamM = NULL; + timeoutHandlerM.Set(0); + } +} + +void cSatipTunerDataThread::Cancel(int WaitSeconds) +{ + if (Running()) + cThread::Cancel(WaitSeconds); +} + +void cSatipTunerDataThread::Flush(void) +{ + log(logFunc); + cMutexLock MutexLock(&mutexM); + + if (rtpSocketM) + rtpSocketM->Flush(); +} + +void cSatipTunerDataThread::Action(void) +{ + log(logFunc); + + // Increase priority + SetPriority(-1); + + // Do the thread loop + while (Running() && packetBufferM) { + int length = -1; + unsigned int size = min(deviceM->CheckData(), packetBufferLenM); + + mutexM.Lock(); + + // Read data + if (rtpSocketM && rtpSocketM->IsOpen()) { + length = rtpSocketM->ReadVideo(packetBufferM, size); + log2(logData, "received %d bytes", length); + } + + if (length > 0) { + log2(logDataDetails, "trying to write %d bytes", length); + deviceM->WriteData(packetBufferM, length); + log2(logDataDetails, "wrote %d bytes", length); + + if (statisticsM) + statisticsM->AddTunerStatistic(length); + + timeoutHandlerM.Set(timeoutM); + } + + if (timeoutM > 0 && timeoutFuncM && timeoutHandlerM.TimedOut()) { + error("No Data received for %d ms [device %d], timeout handling started", timeoutM, deviceM->GetId()); + (*timeoutFuncM)(timeoutParamM); + timeoutHandlerM.Set(timeoutM); + } + + mutexM.Unlock(); + + // to avoid busy loop and reduce cpu load + if (length <= 0) + sleepM.Wait(10); + } +} diff --git a/data.h b/data.h new file mode 100644 index 0000000..816cb60 --- /dev/null +++ b/data.h @@ -0,0 +1,55 @@ +/* + * tuner.h: SAT>IP plugin for the Video Disk Recorder + * + * See the README file for copyright information and how to reach the author. + * + */ + +#ifndef __SATIP_DATA_H +#define __SATIP_DATA_H + +#include +#include + +#ifndef CURLOPT_RTSPHEADER +#error "libcurl is missing required RTSP support" +#endif + +#include +#include + +#include "deviceif.h" +#include "statistics.h" +#include "socket.h" + +class cSatipTunerDataThread: public cThread { +public: + typedef void (*fCallback)(void *parm); + + cSatipTunerDataThread(cSatipDeviceIf &deviceP, cSatipTunerStatistics &statisticsP, unsigned int packetLenP); + ~cSatipTunerDataThread(void); + void Start(cSatipSocket *rtpSocketP); + void SetTimeout(int timeoutP, fCallback callbackP, void *parmP); + void Cancel(int WaitSeconds = 0); + void Flush(); + +protected: + void Action(void); + +private: + cSatipTunerDataThread(cSatipTunerDataThread &toCopy); // Prohibit copying + + cSatipDeviceIf *deviceM; + cSatipTunerStatistics *statisticsM; + unsigned int packetBufferLenM; + unsigned char *packetBufferM; + cSatipSocket *rtpSocketM; + int timeoutM; + cTimeMs timeoutHandlerM; + fCallback timeoutFuncM; + void *timeoutParamM; + cCondWait sleepM; + cMutex mutexM; +}; + +#endif // __SATIP_DATA_H diff --git a/device.c b/device.c index 7155468..77ca71a 100644 --- a/device.c +++ b/device.c @@ -287,7 +287,6 @@ bool cSatipDevice::SetChannelDevice(const cChannel *channelP, bool liveViewP) error("Unrecognized SAT>IP channel parameters: %s", channelP->Parameters()); return false; } - cString address; cSatipServer *server = cSatipDiscover::GetInstance()->GetServer(channelP->Source(), channelP->Transponder(), dtp.System()); if (!server) { debug("cSatipDevice::%s(%u): no suitable server found", __FUNCTION__, deviceIndexM); diff --git a/statistics.h b/statistics.h index f7c4907..adb71d7 100644 --- a/statistics.h +++ b/statistics.h @@ -58,6 +58,7 @@ public: cString GetTunerStatistic(); protected: + friend class cSatipTunerDataThread; void AddTunerStatistic(long bytesP); private: diff --git a/tuner.c b/tuner.c index 88023bb..96896d8 100644 --- a/tuner.c +++ b/tuner.c @@ -12,9 +12,9 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) : cThread("SAT>IP tuner"), + dataThreadM(deviceP, *this, packetLenP), sleepM(), deviceM(&deviceP), - packetBufferLenM(packetLenP), rtpSocketM(new cSatipSocket()), rtcpSocketM(new cSatipSocket()), streamAddrM(""), @@ -23,13 +23,12 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) nextServerM(NULL), mutexM(), handleM(NULL), - headerListM(NULL), keepAliveM(), statusUpdateM(), pidUpdateCacheM(), sessionM(""), timeoutM(eMinKeepAliveIntervalMs), - openedM(false), + reconnectM(false), tunedM(false), hasLockM(false), signalStrengthM(-1), @@ -39,27 +38,49 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) delPidsM(), pidsM() { - debug("cSatipTuner::%s(%d) [device %d]", __FUNCTION__, packetBufferLenM, deviceM->GetId()); - // Allocate packet buffer - packetBufferM = MALLOC(unsigned char, packetBufferLenM); - if (packetBufferM) - memset(packetBufferM, 0, packetBufferLenM); - else - error("MALLOC() failed for packet buffer [device %d]", deviceM->GetId()); - // Start thread + debug("cSatipTuner::%s(%d) [device %d]", __FUNCTION__, packetLenP, deviceM->GetId()); + + RtspInitialize(); + + // Open sockets + for (int i = 100; i > 0; --i) { + if (rtpSocketM->Open() && rtcpSocketM->Open(rtpSocketM->Port() + 1)) + break; + rtpSocketM->Close(); + rtcpSocketM->Close(); + } + if ((rtpSocketM->Port() <= 0) || (rtcpSocketM->Port() <= 0)) { + error("Cannot open required RTP/RTCP ports [device %d]", deviceM->GetId()); + } + else { + info("Using RTP/RTCP ports %d-%d [device %d]", rtpSocketM->Port(), rtcpSocketM->Port(), deviceM->GetId()); + } + + // Start threads + dataThreadM.Start(rtpSocketM); Start(); } cSatipTuner::~cSatipTuner() { debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); - // Stop thread + // Stop threads + dataThreadM.Cancel(3); sleepM.Signal(); if (Running()) Cancel(3); Close(); + + // Terminate curl session + RtspTerminate(); + + // Close the listening sockets + if (rtpSocketM) + rtpSocketM->Close(); + if (rtcpSocketM) + rtcpSocketM->Close(); + // Free allocated memory - free(packetBufferM); DELETENULL(rtcpSocketM); DELETENULL(rtpSocketM); } @@ -67,12 +88,12 @@ cSatipTuner::~cSatipTuner() size_t cSatipTuner::HeaderCallback(void *ptrP, size_t sizeP, size_t nmembP, void *dataP) { cSatipTuner *obj = reinterpret_cast(dataP); + //debug("cSatipTuner::%s() [device %d]", __FUNCTION__, obj->deviceM->GetId()); + size_t len = sizeP * nmembP; - //debug("cSatipTuner::%s(%zu)", __FUNCTION__, len); char *s, *p = (char *)ptrP; char *r = strtok_r(p, "\r\n", &s); - while (obj && r) { //debug("cSatipTuner::%s(%zu): %s", __FUNCTION__, len, r); r = skipspace(r); @@ -111,29 +132,77 @@ size_t cSatipTuner::DataCallback(void *ptrP, size_t sizeP, size_t nmembP, void * return len; } +void cSatipTuner::DataTimeoutCallback(void *objP) +{ + cSatipTuner *obj = reinterpret_cast(objP); + //debug("cSatipTuner::%s() [device %d]", __FUNCTION__, obj->deviceM->GetId()); + + if (obj) + obj->reconnectM = true; +} + +int cSatipTuner::RtspDebugCallback(CURL *handleP, curl_infotype typeP, char *dataP, size_t sizeP, void *userPtrP) +{ + //cSatipTuner *obj = reinterpret_cast(userPtrP); + //debug("cSatipTuner::%s(%d) [device %d]", __FUNCTION__, (int)typeP, obj->deviceM->GetId()); + + switch (typeP) { + case CURLINFO_TEXT: + debug("RTSP INFO %.*s", (int)sizeP, dataP); + break; + case CURLINFO_HEADER_IN: + debug("RTSP HEAD< %.*s", (int)sizeP, dataP); + break; + case CURLINFO_HEADER_OUT: + debug("RTSP HEAD> %.*s", (int)sizeP, dataP); + break; + case CURLINFO_DATA_IN: + debug("RTSP DATA< %.*s", (int)sizeP, dataP); + break; + case CURLINFO_DATA_OUT: + debug("RTSP DATA> %.*s", (int)sizeP, dataP); + break; + default: + break; + } + + return 0; +} + void cSatipTuner::Action(void) { debug("cSatipTuner::%s(): entering [device %d]", __FUNCTION__, deviceM->GetId()); - cTimeMs timeout(eReConnectTimeoutMs); + cTimeMs rtcpTimeout(eReConnectTimeoutMs); // Increase priority SetPriority(-1); // Do the thread loop - while (packetBufferM && Running()) { - int length = -1; - unsigned int size = min(deviceM->CheckData(), packetBufferLenM); - if (tunedM && (size > 0)) { + while (Running()) { + if (reconnectM) { + info("SAT>IP Device %d timed out. Reconnecting.", deviceM->GetId()); + cMutexLock MutexLock(&mutexM); + if (tunedM) + Disconnect(); + Connect(); + reconnectM = false; + } + if (tunedM) { // Update pids UpdatePids(); // Remember the heart beat KeepAlive(); - // Read reception statistics via DESCRIBE and RTCP - ReadReceptionStatus(); + // Read reception statistics via DESCRIBE + if (!pidsM.Size() && statusUpdateM.TimedOut() ) { + cMutexLock MutexLock(&mutexM); + if (RtspDescribe()) + statusUpdateM.Set(eStatusUpdateTimeoutMs); + } + // Read reception statistics via RTCP if (rtcpSocketM && rtcpSocketM->IsOpen()) { unsigned char buf[1450]; memset(buf, 0, sizeof(buf)); if (rtcpSocketM->ReadApplication(buf, sizeof(buf)) > 0) { ParseReceptionParameters((const char *)buf); - timeout.Set(eReConnectTimeoutMs); + rtcpTimeout.Set(eReConnectTimeoutMs); } } // Quirk for devices without valid reception data @@ -142,24 +211,10 @@ void cSatipTuner::Action(void) signalStrengthM = eDefaultSignalStrength; signalQualityM = eDefaultSignalQuality; } - // Read data - if (rtpSocketM && rtpSocketM->IsOpen()) - length = rtpSocketM->ReadVideo(packetBufferM, size); + if (rtcpTimeout.TimedOut()) + reconnectM = true; } - if (length > 0) { - AddTunerStatistic(length); - deviceM->WriteData(packetBufferM, length); - timeout.Set(eReConnectTimeoutMs); - } - else { - // Reconnect if necessary - if (openedM && timeout.TimedOut()) { - Disconnect(); - Connect(); - timeout.Set(eReConnectTimeoutMs); - } sleepM.Wait(10); // to avoid busy loop and reduce cpu load - } } debug("cSatipTuner::%s(): exiting [device %d]", __FUNCTION__, deviceM->GetId()); } @@ -181,151 +236,44 @@ bool cSatipTuner::Connect(void) cMutexLock MutexLock(&mutexM); debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); - // Initialize the curl session - if (!handleM) - handleM = curl_easy_init(); - - if (handleM && !isempty(*streamAddrM)) { - cString uri, control, transport, range; - CURLcode res = CURLE_OK; - - // Just retune - if (tunedM && (streamIdM >= 0)) { - debug("cSatipTuner::%s(): retune [device %d]", __FUNCTION__, deviceM->GetId()); - keepAliveM.Set(0); - KeepAlive(); - // Flush any old content - if (rtpSocketM) - rtpSocketM->Flush(); - - uri = cString::sprintf("rtsp://%s/?%s", *streamAddrM, *streamParamM); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_STREAM_URI, *uri); - transport = cString::sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpSocketM->Port(), rtcpSocketM->Port()); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_TRANSPORT, *transport); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_SETUP); - SATIP_CURL_EASY_PERFORM(handleM); - - openedM = true; - return openedM; - } - -#ifdef DEBUG - // Verbose output - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_VERBOSE, 1L); -#endif - - // No progress meter and no signaling - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_NOPROGRESS, 1L); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_NOSIGNAL, 1L); - - // Set timeouts - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_TIMEOUT_MS, (long)eConnectTimeoutMs); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_CONNECTTIMEOUT_MS, (long)eConnectTimeoutMs); - - // Set user-agent - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_USERAGENT, *cString::sprintf("vdr-%s/%s (device %d)", PLUGIN_NAME_I18N, VERSION, deviceM->GetId())); - - // Set URL - char *p = curl_easy_unescape(handleM, *streamAddrM, 0, NULL); - streamAddrM = p; - curl_free(p); - uri = cString::sprintf("rtsp://%s/", *streamAddrM); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_URL, *uri); - - // Open sockets - int i = 100; - while (i-- > 0) { - if (rtpSocketM->Open() && rtcpSocketM->Open(rtpSocketM->Port() + 1)) - break; - rtpSocketM->Close(); - rtcpSocketM->Close(); - } - if ((rtpSocketM->Port() <= 0) || (rtcpSocketM->Port() <= 0)) { - error("Cannot open required RTP/RTCP ports [device %d]", deviceM->GetId()); - openedM = false; - return openedM; - } - - // Request server options - keepAliveM.Set(timeoutM); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_STREAM_URI, *uri); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_OPTIONS); - SATIP_CURL_EASY_PERFORM(handleM); - if (!ValidateLatestResponse()) { - openedM = false; - return openedM; - } - - // Setup media stream: "&pids=all" for the whole mux - uri = cString::sprintf("rtsp://%s/?%s", *streamAddrM, *streamParamM); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_STREAM_URI, *uri); - transport = cString::sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpSocketM->Port(), rtcpSocketM->Port()); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_TRANSPORT, *transport); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_SETUP); - // Set header callback for catching the session and timeout - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_HEADERFUNCTION, cSatipTuner::HeaderCallback); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_WRITEHEADER, this); - SATIP_CURL_EASY_PERFORM(handleM); - // Session id is now known - disable header parsing - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_HEADERFUNCTION, NULL); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_WRITEHEADER, NULL); - if (nextServerM && nextServerM->Quirk(cSatipServer::eSatipQuirkSessionId) && !isempty(*sessionM) && startswith(*sessionM, "0")) { - debug("cSatipTuner::%s(): session id quirk [device %d]", __FUNCTION__, deviceM->GetId()); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_SESSION_ID, SkipZeroes(*sessionM)); - } - if (!ValidateLatestResponse()) { - openedM = false; - return openedM; - } - - // Start playing - tunedM = true; - UpdatePids(true); - if (nextServerM) { - cSatipDiscover::GetInstance()->UseServer(nextServerM, true); - currentServerM = nextServerM; - nextServerM = NULL; - } - openedM = true; - return openedM; + if (isempty(*streamAddrM)) { + if (tunedM) + Disconnect(); + tunedM = false; + return tunedM; } - openedM = false; - return openedM; + // Setup stream + tunedM = RtspSetup(streamParamM, rtpSocketM->Port(), rtcpSocketM->Port()); + if (!tunedM) + return tunedM; + + dataThreadM.Flush(); + keepAliveM.Set(timeoutM); + + // Start playing + UpdatePids(true); + if (nextServerM) { + cSatipDiscover::GetInstance()->UseServer(nextServerM, true); + currentServerM = nextServerM; + nextServerM = NULL; + } + + return tunedM; } bool cSatipTuner::Disconnect(void) { cMutexLock MutexLock(&mutexM); debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); - openedM = false; - // Terminate curl session - if (handleM) { - // Teardown rtsp session - if (!isempty(*streamAddrM) && streamIdM >= 0) { - CURLcode res = CURLE_OK; - - cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_STREAM_URI, *uri); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_TEARDOWN); - SATIP_CURL_EASY_PERFORM(handleM); - ValidateLatestResponse(); - } - - // Cleanup curl stuff - if (headerListM) { - curl_slist_free_all(headerListM); - headerListM = NULL; - } - curl_easy_cleanup(handleM); - handleM = NULL; + // Teardown rtsp session + if (tunedM) { + RtspTeardown(); + streamIdM = -1; + tunedM = false; } - // Close the listening sockets - rtpSocketM->Close(); - rtcpSocketM->Close(); - // Reset signal parameters hasLockM = false; signalStrengthM = -1; @@ -333,7 +281,6 @@ bool cSatipTuner::Disconnect(void) if (currentServerM) cSatipDiscover::GetInstance()->UseServer(currentServerM, false); - tunedM = false; statusUpdateM.Set(0); timeoutM = eMinKeepAliveIntervalMs; addPidsM.Clear(); @@ -388,7 +335,10 @@ void cSatipTuner::ParseReceptionParameters(const char *paramP) // "0" the frontend is not locked // "1" the frontend is locked c = strstr(c, ","); - hasLockM = atoi(++c); + value = !!atoi(++c); + if (value != hasLockM) + info("Device %d %s lock", deviceM->GetId(), hasLockM ? "gained" : "lost"); + hasLockM = value; // quality: // Numerical value between 0 and 15 @@ -424,6 +374,7 @@ bool cSatipTuner::SetSource(cSatipServer *serverP, const char *parameterP, const { debug("cSatipTuner::%s(%s, %d) [device %d]", __FUNCTION__, parameterP, indexP, deviceM->GetId()); cMutexLock MutexLock(&mutexM); + if (serverP) { nextServerM = cSatipDiscover::GetInstance()->GetServer(serverP); if (nextServerM && !isempty(nextServerM->Address()) && !isempty(parameterP)) { @@ -436,103 +387,83 @@ bool cSatipTuner::SetSource(cSatipServer *serverP, const char *parameterP, const } else Disconnect(); + return true; } bool cSatipTuner::SetPid(int pidP, int typeP, bool onP) { - //debug("cSatipTuner::%s(%d, %d, %d) [device %d]", __FUNCTION__, pidP, typeP, onP, deviceM->GetId()); + debug("cSatipTuner::%s(%d, %d, %d) [device %d]", __FUNCTION__, pidP, typeP, onP, deviceM->GetId()); cMutexLock MutexLock(&mutexM); - bool found = false; - for (int i = 0; i < pidsM.Size(); ++i) { - if (pidsM[i] == pidP) { - found = true; - if (!onP) - pidsM.Remove(i); - break; - } - } - if (onP && !found) - pidsM.Append(pidP); - // Generate deltas - found = false; + if (onP) { - for (int i = 0; i < addPidsM.Size(); ++i) { - if (addPidsM[i] == pidP) { - found = true; - break; - } - } - if (!found) - addPidsM.Append(pidP); - for (int i = 0; i < delPidsM.Size(); ++i) { - if (delPidsM[i] == pidP) { - delPidsM.Remove(i); - break; - } - } + pidsM.AppendUnique(pidP); + addPidsM.AppendUnique(pidP); + delPidsM.RemoveElement(pidP); } else { - for (int i = 0; i < delPidsM.Size(); ++i) { - if (delPidsM[i] == pidP) { - found = true; - break; - } - } - if (!found) - delPidsM.Append(pidP); - for (int i = 0; i < addPidsM.Size(); ++i) { - if (addPidsM[i] == pidP) { - addPidsM.Remove(i); - break; - } - } + pidsM.RemoveElement(pidP); + delPidsM.AppendUnique(pidP); + addPidsM.RemoveElement(pidP); } + pidUpdateCacheM.Set(ePidUpdateIntervalMs); + return true; } +cString cSatipTuner::GeneratePidParameter(bool allPidsP) +{ + debug("cSatipTuner::%s(%d) [device %d]", __FUNCTION__, (int)allPidsP, deviceM->GetId()); + cMutexLock MutexLock(&mutexM); + + bool usedummy = !!(currentServerM && currentServerM->Quirk(cSatipServer::eSatipQuirkPlayPids)); + cString param = ""; + + if (allPidsP || usedummy) { + if (usedummy && (pidsM.Size() == 1) && (pidsM[0] < 0x20)) + param = cString::sprintf("%s,%d", *param, eDummyPid); + else if (pidsM.Size()) { + param = "pids="; + for (int i = 0; i < pidsM.Size(); ++i) + param = cString::sprintf("%s%s%d", *param, (i == 0 ? "" : ","), pidsM[i]); + } + } + else { + if (addPidsM.Size()) { + param = "addpids="; + for (int i = 0; i < addPidsM.Size(); ++i) + param = cString::sprintf("%s%s%d", *param, (i == 0 ? "" : ","), addPidsM[i]); + addPidsM.Clear(); + } + if (delPidsM.Size()) { + param = cString::sprintf("%s%sdelpids=", *param, (isempty(*param) ? "" : "&")); + for (int i = 0; i < delPidsM.Size(); ++i) + param = cString::sprintf("%s%s%d", *param, (i == 0 ? "" : ","), delPidsM[i]); + delPidsM.Clear(); + } + } + + return param; +} + bool cSatipTuner::UpdatePids(bool forceP) { cMutexLock MutexLock(&mutexM); - if (((forceP && pidsM.Size()) || (pidUpdateCacheM.TimedOut() && (addPidsM.Size() || delPidsM.Size()))) && - tunedM && handleM && !isempty(*streamAddrM) && (streamIdM > 0)) { - CURLcode res = CURLE_OK; - cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM); - bool usedummy = !!(currentServerM && currentServerM->Quirk(cSatipServer::eSatipQuirkPlayPids)); - if (forceP || usedummy) { - if (pidsM.Size()) { - uri = cString::sprintf("%s?pids=", *uri); - for (int i = 0; i < pidsM.Size(); ++i) - uri = cString::sprintf("%s%d%s", *uri, pidsM[i], (i == (pidsM.Size() - 1)) ? "" : ","); - } - if (usedummy && (pidsM.Size() == 1) && (pidsM[0] < 0x20)) - uri = cString::sprintf("%s,%d", *uri, eDummyPid); - } - else { - if (addPidsM.Size()) { - uri = cString::sprintf("%s?addpids=", *uri); - for (int i = 0; i < addPidsM.Size(); ++i) - uri = cString::sprintf("%s%d%s", *uri, addPidsM[i], (i == (addPidsM.Size() - 1)) ? "" : ","); - } - if (delPidsM.Size()) { - uri = cString::sprintf("%s%sdelpids=", *uri, addPidsM.Size() ? "&" : "?"); - for (int i = 0; i < delPidsM.Size(); ++i) - uri = cString::sprintf("%s%d%s", *uri, delPidsM[i], (i == (delPidsM.Size() - 1)) ? "" : ","); - } - } - //debug("cSatipTuner::%s(): %s [device %d]", __FUNCTION__, *uri, deviceM->GetId()); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_STREAM_URI, *uri); - SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_PLAY); - SATIP_CURL_EASY_PERFORM(handleM); - if (ValidateLatestResponse()) { + //debug("cSatipTuner::%s(%d) [device %d]", __FUNCTION__, (int)forceP, deviceM->GetId()); + + if (((forceP && pidsM.Size()) || + (pidUpdateCacheM.TimedOut() && (addPidsM.Size() || delPidsM.Size())) ) && + tunedM && handleM && !isempty(*streamAddrM)) { + // Disable RTP Timeout while sending PLAY Command + dataThreadM.SetTimeout(-1, &DataTimeoutCallback, this); + if (RtspPlay(*GeneratePidParameter(forceP))) { addPidsM.Clear(); delPidsM.Clear(); + dataThreadM.SetTimeout(eReConnectTimeoutMs, &DataTimeoutCallback, this); + return true; } - else - Disconnect(); - - return true; + Disconnect(); } return false; @@ -543,28 +474,175 @@ bool cSatipTuner::KeepAlive(void) cMutexLock MutexLock(&mutexM); if (tunedM && handleM && keepAliveM.TimedOut()) { debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); + if (RtspOptions()) { + keepAliveM.Set(timeoutM); + return true; + } + Disconnect(); + } + + return false; +} + +bool cSatipTuner::RtspInitialize() +{ + // Initialize the curl session + if (!handleM) { + debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); + + handleM = curl_easy_init(); + CURLcode res = CURLE_OK; + +#ifdef DEBUG + // Verbose output + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_DEBUGFUNCTION, RtspDebugCallback); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_VERBOSE, 1L); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_DEBUGDATA, this); +#endif + + // No progress meter and no signaling + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_NOPROGRESS, 1L); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_NOSIGNAL, 1L); + + // Set timeouts + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_TIMEOUT_MS, (long)eConnectTimeoutMs); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_CONNECTTIMEOUT_MS, (long)eConnectTimeoutMs); + + // Set user-agent + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_USERAGENT, *cString::sprintf("vdr-%s/%s (device %d)", PLUGIN_NAME_I18N, VERSION, deviceM->GetId())); + + return !!handleM; + } + + return false; +} + +bool cSatipTuner::RtspTerminate() +{ + debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); + if (handleM) { + curl_easy_cleanup(handleM); + handleM = NULL; + } + + return true; +} + +bool cSatipTuner::RtspSetup(const char *paramP, int rtpPortP, int rtcpPortP) +{ + debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); + if (handleM) { + cString uri; + CURLcode res = CURLE_OK; + + // set URL, this will not change + // Note that we are unescaping the adress here and do NOT store it for future use. + char *p = curl_easy_unescape(handleM, *streamAddrM, 0, NULL); + cString url = cString::sprintf("rtsp://%s/", p); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_URL, *url); + curl_free(p); + + if (streamIdM >= 0) + uri = cString::sprintf("rtsp://%s/stream=%d?%s", *streamAddrM, streamIdM, paramP); + else + uri = cString::sprintf("rtsp://%s/?%s", *streamAddrM, paramP); + + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_STREAM_URI, *uri); + cString transport = cString::sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPortP, rtcpPortP); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_TRANSPORT, *transport); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_SETUP); + // Set header callback for catching the session and timeout + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_HEADERFUNCTION, cSatipTuner::HeaderCallback); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_WRITEHEADER, this); + SATIP_CURL_EASY_PERFORM(handleM); + // Session id is now known - disable header parsing + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_HEADERFUNCTION, NULL); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_WRITEHEADER, NULL); + + // Validate session id + if (streamIdM < 0) { + error("Internal Error: No session id received [device %d]", deviceM->GetId()); + return false; + } + // For some SATIP boxes e.g. GSSBOX and Triax TSS 400 we need to strip the + // leading '0' of the sessionID. + if (nextServerM && nextServerM->Quirk(cSatipServer::eSatipQuirkSessionId) && !isempty(*sessionM) && startswith(*sessionM, "0")) { + debug("cSatipTuner::%s(): session id quirk [device %d]", __FUNCTION__, deviceM->GetId()); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_SESSION_ID, SkipZeroes(*sessionM)); + } + + return ValidateLatestResponse(); + } + + return false; +} + +bool cSatipTuner::RtspTeardown(void) +{ + debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); + if (handleM) { + CURLcode res = CURLE_OK; + cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM); + + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_STREAM_URI, *uri); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_TEARDOWN); + SATIP_CURL_EASY_PERFORM(handleM); + + // Reset data we have about the session + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_SESSION_ID, NULL); + streamIdM = -1; + sessionM = ""; + timeoutM = eMinKeepAliveIntervalMs; + + return ValidateLatestResponse(); + } + + return false; +} + +bool cSatipTuner::RtspPlay(const char *paramP) +{ + debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); + if (handleM) { + cString uri; + CURLcode res = CURLE_OK; + + if (paramP) + uri = cString::sprintf("rtsp://%s/stream=%d?%s", *streamAddrM, streamIdM, paramP); + else + uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM); + + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_STREAM_URI, *uri); + SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_PLAY); + SATIP_CURL_EASY_PERFORM(handleM); + + return ValidateLatestResponse(); + } + + return false; +} + +bool cSatipTuner::RtspOptions(void) +{ + debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); + if (handleM) { CURLcode res = CURLE_OK; cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM); SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_STREAM_URI, *uri); SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_RTSP_REQUEST, (long)CURL_RTSPREQ_OPTIONS); SATIP_CURL_EASY_PERFORM(handleM); - if (ValidateLatestResponse()) - keepAliveM.Set(timeoutM); - else - Disconnect(); - return true; + return ValidateLatestResponse(); } return false; } -bool cSatipTuner::ReadReceptionStatus(void) +bool cSatipTuner::RtspDescribe(void) { - cMutexLock MutexLock(&mutexM); - if (tunedM && handleM && !pidsM.Size() && statusUpdateM.TimedOut() ) { - debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); + debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceM->GetId()); + if (handleM) { CURLcode res = CURLE_OK; cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM); @@ -575,12 +653,11 @@ bool cSatipTuner::ReadReceptionStatus(void) SATIP_CURL_EASY_PERFORM(handleM); SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_WRITEFUNCTION, NULL); SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_WRITEDATA, NULL); - if (ValidateLatestResponse()) - statusUpdateM.Set(eStatusUpdateTimeoutMs); - else - Disconnect(); - return true; + if (ValidateLatestResponse()) + return true; + + Disconnect(); } return false; diff --git a/tuner.h b/tuner.h index 4a3201b..46f5940 100644 --- a/tuner.h +++ b/tuner.h @@ -8,13 +8,6 @@ #ifndef __SATIP_TUNER_H #define __SATIP_TUNER_H -#include -#include - -#ifndef CURLOPT_RTSPHEADER -#error "libcurl is missing required RTSP support" -#endif - #include #include @@ -22,6 +15,7 @@ #include "server.h" #include "statistics.h" #include "socket.h" +#include "data.h" class cSatipTuner : public cThread, public cSatipTunerStatistics { private: @@ -38,11 +32,12 @@ private: static size_t HeaderCallback(void *ptrP, size_t sizeP, size_t nmembP, void *dataP); static size_t DataCallback(void *ptrP, size_t sizeP, size_t nmembP, void *dataP); + static void DataTimeoutCallback(void *objP); + static int RtspDebugCallback(CURL *handleP, curl_infotype typeP, char *dataP, size_t sizeP, void *userPtrP); + cSatipTunerDataThread dataThreadM; cCondWait sleepM; cSatipDeviceIf* deviceM; - unsigned char* packetBufferM; - unsigned int packetBufferLenM; cSatipSocket *rtpSocketM; cSatipSocket *rtcpSocketM; cString streamAddrM; @@ -51,22 +46,21 @@ private: cSatipServer *nextServerM; cMutex mutexM; CURL *handleM; - struct curl_slist *headerListM; cTimeMs keepAliveM; cTimeMs statusUpdateM; cTimeMs signalInfoCacheM; cTimeMs pidUpdateCacheM; cString sessionM; int timeoutM; - bool openedM; + bool reconnectM; bool tunedM; bool hasLockM; int signalStrengthM; int signalQualityM; int streamIdM; - cVector addPidsM; - cVector delPidsM; - cVector pidsM; + cSatipVector addPidsM; + cSatipVector delPidsM; + cSatipVector pidsM; bool Connect(void); bool Disconnect(void); @@ -75,9 +69,17 @@ private: void SetStreamId(int streamIdP); void SetSessionTimeout(const char *sessionP, int timeoutP = 0); bool KeepAlive(void); - bool ReadReceptionStatus(void); bool UpdateSignalInfoCache(void); bool UpdatePids(bool forceP = false); + cString GeneratePidParameter(bool allPidsP = false); + + bool RtspInitialize(void); + bool RtspTerminate(void); + bool RtspOptions(void); + bool RtspSetup(const char *paramP, int rtpPortP, int rtcpPortP); + bool RtspDescribe(void); + bool RtspPlay(const char *paramP = NULL); + bool RtspTeardown(void); protected: virtual void Action(void);