Switched to epoll().

This commit is contained in:
Rolf Ahrenberg 2014-11-09 20:32:08 +02:00
parent 2f723e0f66
commit ac4ab867d9
7 changed files with 135 additions and 55 deletions

View File

@ -80,3 +80,4 @@ VDR Plugin 'satip' Revision History
- Added support for SAT>IP frontend selection via - Added support for SAT>IP frontend selection via
Radio ID. Radio ID.
- Fixed EIT scan (Thanks to Stefan Schallenberg). - Fixed EIT scan (Thanks to Stefan Schallenberg).
- Refactored input thread to increase performance.

View File

@ -94,6 +94,35 @@ size_t cSatipDiscover::WriteCallback(char *ptrP, size_t sizeP, size_t nmembP, vo
return len; return len;
} }
int cSatipDiscover::DebugCallback(CURL *handleP, curl_infotype typeP, char *dataP, size_t sizeP, void *userPtrP)
{
cSatipDiscover *obj = reinterpret_cast<cSatipDiscover *>(userPtrP);
if (obj) {
switch (typeP) {
case CURLINFO_TEXT:
debug("cSatipDiscover::%s(): HTTP INFO %.*s", __FUNCTION__, (int)sizeP, dataP);
break;
case CURLINFO_HEADER_IN:
debug("cSatipDiscover::%s(): HTTP HEAD <<< %.*s", __FUNCTION__, (int)sizeP, dataP);
break;
case CURLINFO_HEADER_OUT:
debug("cSatipDiscover::%s(): HTTP HEAD >>>\n%.*s", __FUNCTION__, (int)sizeP, dataP);
break;
case CURLINFO_DATA_IN:
debug("cSatipDiscover::%s(): HTTP DATA <<< %.*s", __FUNCTION__, (int)sizeP, dataP);
break;
case CURLINFO_DATA_OUT:
debug("cSatipDiscover::%s(): HTTP DATA >>>\n%.*s", __FUNCTION__, (int)sizeP, dataP);
break;
default:
break;
}
}
return 0;
}
cSatipDiscover::cSatipDiscover() cSatipDiscover::cSatipDiscover()
: cThread("SAT>IP discover"), : cThread("SAT>IP discover"),
mutexM(), mutexM(),
@ -220,6 +249,8 @@ void cSatipDiscover::Read(void)
#ifdef DEBUG #ifdef DEBUG
// Verbose output // Verbose output
SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_VERBOSE, 1L); SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_VERBOSE, 1L);
SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_DEBUGFUNCTION, cSatipDiscover::DebugCallback);
SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_DEBUGDATA, this);
#endif #endif
// Set callback // Set callback
SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_WRITEFUNCTION, cSatipDiscover::WriteCallback); SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_WRITEFUNCTION, cSatipDiscover::WriteCallback);

View File

@ -47,6 +47,7 @@ private:
static const char *bcastAddressS; static const char *bcastAddressS;
static const char *bcastMessageS; static const char *bcastMessageS;
static size_t WriteCallback(char *ptrP, size_t sizeP, size_t nmembP, void *dataP); static size_t WriteCallback(char *ptrP, size_t sizeP, size_t nmembP, void *dataP);
static int DebugCallback(CURL *handleP, curl_infotype typeP, char *dataP, size_t sizeP, void *userPtrP);
cMutex mutexM; cMutex mutexM;
CURL *handleM; CURL *handleM;
cSatipSocket *socketM; cSatipSocket *socketM;

2
rtsp.h
View File

@ -24,7 +24,7 @@ private:
static int DebugCallback(CURL *handleP, curl_infotype typeP, char *dataP, size_t sizeP, void *userPtrP); static int DebugCallback(CURL *handleP, curl_infotype typeP, char *dataP, size_t sizeP, void *userPtrP);
enum { enum {
eConnectTimeoutMs = 1500, // in milliseconds eConnectTimeoutMs = 1500, // in milliseconds
}; };
cSatipTunerIf* tunerM; cSatipTunerIf* tunerM;

View File

@ -27,6 +27,7 @@ public:
~cSatipSocket(); ~cSatipSocket();
bool Open(const int portP = 0); bool Open(const int portP = 0);
void Close(void); void Close(void);
int Fd(void) { return socketDescM; }
int Port(void) { return socketPortM; } int Port(void) { return socketPortM; }
bool IsOpen(void) { return (socketDescM >= 0); } bool IsOpen(void) { return (socketDescM >= 0); }
bool Flush(void); bool Flush(void);

151
tuner.c
View File

@ -5,6 +5,8 @@
* *
*/ */
#include <sys/epoll.h>
#include "common.h" #include "common.h"
#include "config.h" #include "config.h"
#include "discover.h" #include "discover.h"
@ -28,6 +30,7 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP)
statusUpdateM(), statusUpdateM(),
pidUpdateCacheM(), pidUpdateCacheM(),
sessionM(""), sessionM(""),
fdM(epoll_create(eMaxFileDescriptors)),
timeoutM(eMinKeepAliveIntervalMs), timeoutM(eMinKeepAliveIntervalMs),
openedM(false), openedM(false),
tunedM(false), tunedM(false),
@ -50,7 +53,7 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP)
// Open sockets // Open sockets
int i = 100; int i = 100;
while (i-- > 0) { while (i-- > 0) {
if (rtpSocketM->Open() && rtcpSocketM->Open(rtpSocketM->Port() + 1)) if (rtpSocketM->Open(0) && rtcpSocketM->Open(rtpSocketM->Port() + 1))
break; break;
rtpSocketM->Close(); rtpSocketM->Close();
rtcpSocketM->Close(); rtcpSocketM->Close();
@ -59,6 +62,26 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP)
error("Cannot open required RTP/RTCP ports [device %d]", deviceIdM); error("Cannot open required RTP/RTCP ports [device %d]", deviceIdM);
} }
// Setup epoll
if (fdM >= 0) {
if (rtpSocketM->Fd() >= 0) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = rtpSocketM->Fd();
if (epoll_ctl(fdM, EPOLL_CTL_ADD, rtpSocketM->Fd(), &ev) == -1) {
error("Cannot add RTP socket into epoll [device %d]", deviceIdM);
}
}
if (rtcpSocketM->Fd() >= 0) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = rtcpSocketM->Fd();
if (epoll_ctl(fdM, EPOLL_CTL_ADD, rtcpSocketM->Fd(), &ev) == -1) {
error("Cannot add RTP socket into epoll [device %d]", deviceIdM);
}
}
}
// Start thread // Start thread
Start(); Start();
} }
@ -72,6 +95,17 @@ cSatipTuner::~cSatipTuner()
Cancel(3); Cancel(3);
Close(); Close();
// Cleanup epoll
if (fdM >= 0) {
if ((rtpSocketM->Fd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, rtpSocketM->Fd(), NULL) == -1)) {
error("Cannot remove RTP socket from epoll [device %d]", deviceIdM);
}
if ((rtcpSocketM->Fd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, rtcpSocketM->Fd(), NULL) == -1)) {
error("Cannot remove RTP socket from epoll [device %d]", deviceIdM);
}
close(fdM);
}
// Close the listening sockets // Close the listening sockets
rtpSocketM->Close(); rtpSocketM->Close();
rtcpSocketM->Close(); rtcpSocketM->Close();
@ -89,47 +123,53 @@ void cSatipTuner::Action(void)
// Increase priority // Increase priority
SetPriority(-1); SetPriority(-1);
// Do the thread loop // Do the thread loop
while (packetBufferM && Running()) { while (packetBufferM && rtpSocketM && rtcpSocketM && Running()) {
int length = -1; struct epoll_event events[eMaxFileDescriptors];
unsigned int size = min(deviceM->CheckData(), packetBufferLenM); int nfds = epoll_wait(fdM, events, eMaxFileDescriptors, eReadTimeoutMs);
if (tunedM && (size > 0)) { switch (nfds) {
// Update pids default:
UpdatePids(); for (int i = 0; i < nfds; ++i) {
// Remember the heart beat timeout.Set(eReConnectTimeoutMs);
KeepAlive(); if (events[i].data.fd == rtpSocketM->Fd()) {
// Read reception statistics via DESCRIBE and RTCP // Read data
ReadReceptionStatus(); int length = rtpSocketM->ReadVideo(packetBufferM, min(deviceM->CheckData(), packetBufferLenM));
if (rtcpSocketM && rtcpSocketM->IsOpen()) { if (length > 0) {
unsigned char buf[1450]; AddTunerStatistic(length);
memset(buf, 0, sizeof(buf)); deviceM->WriteData(packetBufferM, length);
if (rtcpSocketM->ReadApplication(buf, sizeof(buf)) > 0) { }
ParseReceptionParameters((const char *)buf); }
timeout.Set(eReConnectTimeoutMs); else if (events[i].data.fd == rtcpSocketM->Fd()) {
} unsigned char buf[1450];
} memset(buf, 0, sizeof(buf));
// Quirk for devices without valid reception data if (rtcpSocketM->ReadApplication(buf, sizeof(buf)) > 0)
if (currentServerM && currentServerM->Quirk(cSatipServer::eSatipQuirkForceLock)) { ParseReceptionParameters((const char *)buf);
hasLockM = true; }
signalStrengthM = eDefaultSignalStrength; }
signalQualityM = eDefaultSignalQuality; // fall through!
} case 0:
// Read data // Update pids
if (rtpSocketM && rtpSocketM->IsOpen()) UpdatePids();
length = rtpSocketM->ReadVideo(packetBufferM, size); // Remember the heart beat
} KeepAlive();
if (length > 0) { // Read reception statistics via DESCRIBE and RTCP
AddTunerStatistic(length); if (ReadReceptionStatus()) {
deviceM->WriteData(packetBufferM, length); // Quirk for devices without valid reception data
timeout.Set(eReConnectTimeoutMs); if (currentServerM && currentServerM->Quirk(cSatipServer::eSatipQuirkForceLock)) {
} hasLockM = true;
else { signalStrengthM = eDefaultSignalStrength;
// Reconnect if necessary signalQualityM = eDefaultSignalQuality;
if (openedM && timeout.TimedOut()) { }
Disconnect(); }
Connect(); // Reconnect if necessary
timeout.Set(eReConnectTimeoutMs); if (openedM && timeout.TimedOut()) {
} Disconnect();
sleepM.Wait(10); // to avoid busy loop and reduce cpu load Connect();
timeout.Set(eReConnectTimeoutMs);
}
break;
case -1:
error("epoll_wait() failed");
break;
} }
} }
debug("cSatipTuner::%s(): exiting [device %d]", __FUNCTION__, deviceIdM); debug("cSatipTuner::%s(): exiting [device %d]", __FUNCTION__, deviceIdM);
@ -167,17 +207,21 @@ bool cSatipTuner::Connect(void)
return openedM; return openedM;
} }
keepAliveM.Set(timeoutM); keepAliveM.Set(timeoutM);
openedM = rtspM->Options(*connectionUri) && rtspM->Setup(*uri, rtpSocketM->Port(), rtcpSocketM->Port()); openedM = rtspM->Options(*connectionUri);
if (openedM) { if (openedM) {
if (nextServerM && nextServerM->Quirk(cSatipServer::eSatipQuirkSessionId)) if (nextServerM && nextServerM->Quirk(cSatipServer::eSatipQuirkSessionId))
rtspM->SetSession(SkipZeroes(*sessionM)); rtspM->SetSession(SkipZeroes(*sessionM));
tunedM = true; if (rtspM->Setup(*uri, rtpSocketM->Port(), rtcpSocketM->Port())) {
UpdatePids(true); tunedM = true;
if (nextServerM) { UpdatePids(true);
cSatipDiscover::GetInstance()->UseServer(nextServerM, true); if (nextServerM) {
currentServerM = nextServerM; cSatipDiscover::GetInstance()->UseServer(nextServerM, true);
nextServerM = NULL; currentServerM = nextServerM;
nextServerM = NULL;
}
} }
else
openedM = false;
} }
return openedM; return openedM;
@ -288,10 +332,10 @@ bool cSatipTuner::SetSource(cSatipServer *serverP, const char *parameterP, const
cMutexLock MutexLock(&mutexM); cMutexLock MutexLock(&mutexM);
if (serverP) { if (serverP) {
nextServerM = cSatipDiscover::GetInstance()->GetServer(serverP); nextServerM = cSatipDiscover::GetInstance()->GetServer(serverP);
if (nextServerM && !isempty(nextServerM->Address()) && !isempty(parameterP)) { if (rtspM && nextServerM && !isempty(nextServerM->Address()) && !isempty(parameterP)) {
// Update stream address and parameter // Update stream address and parameter
streamAddrM = rtspM->RtspUnescapeString(nextServerM->Address()); streamAddrM = rtspM->RtspUnescapeString(nextServerM->Address());
streamParamM = parameterP; streamParamM = rtspM->RtspUnescapeString(parameterP);
// Reconnect // Reconnect
Connect(); Connect();
} }
@ -363,10 +407,9 @@ bool cSatipTuner::KeepAlive(void)
cMutexLock MutexLock(&mutexM); cMutexLock MutexLock(&mutexM);
if (tunedM && keepAliveM.TimedOut()) { if (tunedM && keepAliveM.TimedOut()) {
cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM); cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM);
if (rtspM->Options(*uri)) { keepAliveM.Set(timeoutM);
keepAliveM.Set(timeoutM); if (rtspM->Options(*uri))
return true; return true;
}
Disconnect(); Disconnect();
} }

View File

@ -46,9 +46,11 @@ public:
class cSatipTuner : public cThread, public cSatipTunerStatistics, public cSatipTunerIf { class cSatipTuner : public cThread, public cSatipTunerStatistics, public cSatipTunerIf {
private: private:
enum { enum {
eMaxFileDescriptors = 2, // RTP + RTCP
eDummyPid = 100, eDummyPid = 100,
eDefaultSignalStrength = 15, eDefaultSignalStrength = 15,
eDefaultSignalQuality = 224, eDefaultSignalQuality = 224,
eReadTimeoutMs = 500, // in milliseconds
eStatusUpdateTimeoutMs = 1000, // in milliseconds eStatusUpdateTimeoutMs = 1000, // in milliseconds
eConnectTimeoutMs = 1500, // in milliseconds eConnectTimeoutMs = 1500, // in milliseconds
ePidUpdateIntervalMs = 250, // in milliseconds ePidUpdateIntervalMs = 250, // in milliseconds
@ -74,6 +76,7 @@ private:
cTimeMs signalInfoCacheM; cTimeMs signalInfoCacheM;
cTimeMs pidUpdateCacheM; cTimeMs pidUpdateCacheM;
cString sessionM; cString sessionM;
int fdM;
int timeoutM; int timeoutM;
bool openedM; bool openedM;
bool tunedM; bool tunedM;