From c2ef6d593e6d08565c80e45d89b6b8051d028264 Mon Sep 17 00:00:00 2001 From: nafets227 Date: Sun, 9 Nov 2014 22:56:15 +0100 Subject: [PATCH] Tuner Bugfix, Blocking socket Preparation (Replay of Subversion #54 and #55) --- config.c | 3 ++- config.h | 2 ++ data.c | 13 +++++++++---- socket.c | 21 ++++++++++++++------- socket.h | 3 ++- tuner.c | 19 ++++++++++++------- 6 files changed, 41 insertions(+), 20 deletions(-) diff --git a/config.c b/config.c index 4825a81..501c49a 100644 --- a/config.c +++ b/config.c @@ -13,7 +13,8 @@ cSatipConfig SatipConfig; cSatipConfig::cSatipConfig(void) : operatingModeM(eOperatingModeLow), eitScanM(1), - useBytesM(1) + useBytesM(1), + usePollingM(true) { for (unsigned int i = 0; i < ARRAY_SIZE(disabledSourcesM); ++i) disabledSourcesM[i] = cSource::stNone; diff --git a/config.h b/config.h index 9555480..b0564cf 100644 --- a/config.h +++ b/config.h @@ -20,6 +20,7 @@ private: int disabledSourcesM[MAX_DISABLED_SOURCES_COUNT]; int disabledFiltersM[SECTION_FILTER_TABLE_SIZE]; char configDirectoryM[PATH_MAX]; + bool usePollingM; public: enum { @@ -50,6 +51,7 @@ public: void SetConfigDirectory(const char *directoryP); void SetDisabledSources(unsigned int indexP, int sourceP); void SetDisabledFilters(unsigned int indexP, int numberP); + bool IsPolling() {return usePollingM;} }; extern cSatipConfig SatipConfig; diff --git a/data.c b/data.c index 221395c..99f476b 100644 --- a/data.c +++ b/data.c @@ -20,7 +20,7 @@ enum LOGLEVEL { logAll = 0xFFFF }; -int logLevel = logFunc | logFuncPerf | logData; +int logLevel = logFunc /*| logFuncPerf | logData*/; #define log(lvl) \ if (logLevel & lvl) \ @@ -79,7 +79,9 @@ int cSatipTunerDataThread::LastReceivedMs() { int rc = timeDataReceivedM.Elapsed(); - log2(logFunc, "returning %d", rc); + log2(logFuncPerf, "returning %d", rc); + + return rc; } void cSatipTunerDataThread::ResetLastReceivedMs() @@ -107,6 +109,7 @@ void cSatipTunerDataThread::Flush(void) void cSatipTunerDataThread::Action(void) { log(logFunc); + bool polling = SatipConfig.IsPolling(); // Increase priority SetPriority(-1); @@ -121,6 +124,9 @@ void cSatipTunerDataThread::Action(void) // Read data if (rtpSocketM && rtpSocketM->IsOpen()) { length = rtpSocketM->ReadVideo(packetBufferM, size); + if (!polling || length > 0) + timeDataReceivedM.Set(); + log2(logData, "received %d bytes", length); } @@ -132,13 +138,12 @@ void cSatipTunerDataThread::Action(void) if (statisticsM) statisticsM->AddTunerStatistic(length); - timeDataReceivedM.Set(); } mutexM.Unlock(); // to avoid busy loop and reduce cpu load - if (length <= 0) + if (polling && length <= 0) sleepM.Wait(10); } } diff --git a/socket.c b/socket.c index 3ec2979..fe72fe8 100644 --- a/socket.c +++ b/socket.c @@ -23,7 +23,8 @@ cSatipSocket::cSatipSocket() socketDescM(-1), lastErrorReportM(0), packetErrorsM(0), - sequenceNumberM(-1) + sequenceNumberM(-1), + waitM(false) { debug("cSatipSocket::%s()", __FUNCTION__); memset(&sockAddrM, 0, sizeof(sockAddrM)); @@ -36,17 +37,22 @@ cSatipSocket::~cSatipSocket() Close(); } -bool cSatipSocket::Open(const int portP) +bool cSatipSocket::Open(const int portP, bool waitP) { // Bind to the socket if it is not active already if (socketDescM < 0) { + waitM = waitP; + socklen_t len = sizeof(sockAddrM); // Create socket socketDescM = socket(PF_INET, SOCK_DGRAM, 0); ERROR_IF_RET(socketDescM < 0, "socket()", return false); - // Make it use non-blocking I/O to avoid stuck read calls - ERROR_IF_FUNC(fcntl(socketDescM, F_SETFL, O_NONBLOCK), "fcntl(O_NONBLOCK)", - Close(), return false); + + if (!waitP) + // Make it use non-blocking I/O to avoid stuck read calls + ERROR_IF_FUNC(fcntl(socketDescM, F_SETFL, O_NONBLOCK), "fcntl(O_NONBLOCK)", + Close(), return false); + // Allow multiple sockets to use the same PORT number int yes = 1; ERROR_IF_FUNC(setsockopt(socketDescM, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0, @@ -63,7 +69,8 @@ bool cSatipSocket::Open(const int portP) "getsockname()", Close(), return false); socketPortM = ntohs(sockAddrM.sin_port); } - debug("cSatipSocket::%s(%d): socketPort=%d", __FUNCTION__, portP, socketPortM); + debug("cSatipSocket::%s(%d,%d): socketPort=%d", + __FUNCTION__, portP, socketPortM, waitP); return true; } @@ -133,7 +140,7 @@ int cSatipSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP) msgh.msg_flags = 0; if (socketDescM && bufferAddrP && (bufferLenP > 0)) - len = (int)recvmsg(socketDescM, &msgh, MSG_DONTWAIT); + len = (int)recvmsg(socketDescM, &msgh, waitM ? 0 : MSG_DONTWAIT); if (len > 0) return len; } while (len > 0); diff --git a/socket.h b/socket.h index 8b97bde..0a7a6b3 100644 --- a/socket.h +++ b/socket.h @@ -21,11 +21,12 @@ private: time_t lastErrorReportM; int packetErrorsM; int sequenceNumberM; + bool waitM; public: cSatipSocket(); ~cSatipSocket(); - bool Open(const int portP = 0); + bool Open(const int portP = 0, bool waitP = false); void Close(void); int Port(void) { return socketPortM; } bool IsOpen(void) { return (socketDescM >= 0); } diff --git a/tuner.c b/tuner.c index 14ee9b5..8956b6f 100644 --- a/tuner.c +++ b/tuner.c @@ -43,9 +43,12 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP) RtspInitialize(); + bool waitSocket = false; + // Open sockets for (int i = 100; i > 0; --i) { - if (rtpSocketM->Open() && rtcpSocketM->Open(rtpSocketM->Port() + 1)) + if (rtpSocketM->Open(0, waitSocket) && + rtcpSocketM->Open(rtpSocketM->Port() + 1), waitSocket) break; rtpSocketM->Close(); rtcpSocketM->Close(); @@ -94,7 +97,7 @@ void cSatipTuner::Action(void) // Do the thread loop while (Running()) { if (reconnectM) { - info("SAT>IP Device %d timed out. Reconnecting.", deviceM->GetId()); + info("SAT>IP Device %d reconnecting.", deviceM->GetId()); cMutexLock MutexLock(&mutexM); if (tunedM) Disconnect(); @@ -130,20 +133,22 @@ void cSatipTuner::Action(void) signalQualityM = eDefaultSignalQuality; } - if (rtcpTimeout.TimedOut()) + if (rtcpTimeout.TimedOut()) { + error("No RTP Data received for %d ms [device %d], Reconnect initiated", + (int)rtcpTimeout.Elapsed(), deviceM->GetId()); + rtcpTimeout.Set(eReConnectTimeoutMs); reconnectM = true; + } int passedMs = dataThreadM.LastReceivedMs(); if (passedMs >= eReConnectTimeoutMs) { - error("No Data received for %d ms [device %d], Reconnect initiated", + error("No RTP Data received for %d ms [device %d], Reconnect initiated", (int)passedMs, deviceM->GetId()); dataThreadM.ResetLastReceivedMs(); reconnectM = true; } - - } - sleepM.Wait(10); // to avoid busy loop and reduce cpu load + sleepM.Wait(50); // to avoid busy loop and reduce cpu load } debug("cSatipTuner::%s(): exiting [device %d]", __FUNCTION__, deviceIdM); }