1
0
mirror of https://github.com/rofafor/vdr-plugin-satip.git synced 2023-10-10 13:37:42 +02:00

Merge pull request #3 from rofafor/poller

Refactored poller implementation.
This commit is contained in:
Rolf Ahrenberg 2014-11-19 22:17:48 +02:00
commit 0896df33e6
28 changed files with 890 additions and 480 deletions

3
.gitignore vendored
View File

@ -4,3 +4,6 @@
*~
po/*.pot
po/*.mo
.settings
.cproject
.project

View File

@ -65,7 +65,7 @@ VDR Plugin 'satip' Revision History
- Added a validity check for the session member.
- Added a session id quirk for Triax TSS 400.
2014-11-11: Version 0.3.4
2014-11-22: Version 1.0.0
- Fixed the cable only device detection.
- Added support for blacklisted sources.

View File

@ -88,8 +88,9 @@ all-redirect: all
### The object files (add further files here):
OBJS = $(PLUGIN).o common.o config.o device.o discover.o param.o rtsp.o \
sectionfilter.o server.o setup.o socket.o statistics.o tuner.o
OBJS = $(PLUGIN).o common.o config.o device.o discover.o msearch.o param.o \
poller.o rtp.o rtcp.o rtsp.o sectionfilter.o server.o setup.o socket.o \
statistics.o tuner.o
### The main target:

View File

@ -8,6 +8,7 @@
#ifndef __SATIP_COMMON_H
#define __SATIP_COMMON_H
#include <vdr/device.h>
#include <vdr/tools.h>
#include <vdr/config.h>
#include <vdr/i18n.h>
@ -24,6 +25,8 @@
#define ELEMENTS(x) (sizeof(x) / sizeof(x[0]))
#define SATIP_MAX_DEVICES MAXDEVICES
#define SATIP_BUFFER_SIZE KILOBYTE(512)
#define SATIP_DEVICE_INFO_ALL 0

View File

@ -12,8 +12,6 @@
#include "param.h"
#include "device.h"
#define SATIP_MAX_DEVICES MAXDEVICES
static cSatipDevice * SatipDevicesS[SATIP_MAX_DEVICES] = { NULL };
cSatipDevice::cSatipDevice(unsigned int indexP)

View File

@ -18,13 +18,6 @@
cSatipDiscover *cSatipDiscover::instanceS = NULL;
const char *cSatipDiscover::bcastAddressS = "239.255.255.250";
const char *cSatipDiscover::bcastMessageS = "M-SEARCH * HTTP/1.1\r\n" \
"HOST: 239.255.255.250:1900\r\n" \
"MAN: \"ssdp:discover\"\r\n" \
"ST: urn:ses-com:device:SatIPServer:1\r\n" \
"MX: 2\r\n\r\n";
cSatipDiscover *cSatipDiscover::GetInstance(void)
{
if (!instanceS)
@ -126,11 +119,12 @@ int cSatipDiscover::DebugCallback(CURL *handleP, curl_infotype typeP, char *data
cSatipDiscover::cSatipDiscover()
: cThread("SAT>IP discover"),
mutexM(),
msearchM(),
probeUrlListM(),
handleM(curl_easy_init()),
socketM(new cSatipSocket()),
sleepM(),
probeIntervalM(0),
serversM(new cSatipServers())
serversM()
{
debug("cSatipDiscover::%s()", __FUNCTION__);
}
@ -141,11 +135,10 @@ cSatipDiscover::~cSatipDiscover()
Deactivate();
cMutexLock MutexLock(&mutexM);
// Free allocated memory
DELETENULL(socketM);
DELETENULL(serversM);
if (handleM)
curl_easy_cleanup(handleM);
handleM = NULL;
probeUrlListM.Clear();
}
void cSatipDiscover::Activate(void)
@ -166,84 +159,49 @@ void cSatipDiscover::Deactivate(void)
void cSatipDiscover::Action(void)
{
debug("cSatipDiscover::%s(): entering", __FUNCTION__);
probeIntervalM.Set(eProbeIntervalMs);
msearchM.Probe();
// Do the thread loop
while (Running()) {
cStringList tmp;
if (probeIntervalM.TimedOut()) {
probeIntervalM.Set(eProbeIntervalMs);
Probe();
Janitor();
msearchM.Probe();
mutexM.Lock();
serversM.Cleanup(eProbeIntervalMs * 2);
mutexM.Unlock();
}
mutexM.Lock();
if (probeUrlListM.Size()) {
for (int i = 0; i < probeUrlListM.Size(); ++i)
tmp.Insert(strdup(probeUrlListM.At(i)));
probeUrlListM.Clear();
}
mutexM.Unlock();
if (tmp.Size()) {
for (int i = 0; i < tmp.Size(); ++i)
Fetch(tmp.At(i));
tmp.Clear();
}
// to avoid busy loop and reduce cpu load
sleepM.Wait(10);
sleepM.Wait(eSleepTimeoutMs);
}
debug("cSatipDiscover::%s(): exiting", __FUNCTION__);
}
void cSatipDiscover::Janitor(void)
void cSatipDiscover::Probe(const char *urlP)
{
debug("cSatipDiscover::%s()", __FUNCTION__);
debug("cSatipDiscover::%s(%s)", __FUNCTION__, urlP);
cMutexLock MutexLock(&mutexM);
if (serversM)
serversM->Cleanup(eProbeIntervalMs * 2);
probeUrlListM.Insert(strdup(urlP));
sleepM.Signal();
}
void cSatipDiscover::Probe(void)
void cSatipDiscover::Fetch(const char *urlP)
{
debug("cSatipDiscover::%s()", __FUNCTION__);
if (socketM && socketM->Open(eDiscoveryPort)) {
cTimeMs timeout(eProbeTimeoutMs);
socketM->Write(bcastAddressS, reinterpret_cast<const unsigned char *>(bcastMessageS), strlen(bcastMessageS));
while (Running() && !timeout.TimedOut()) {
Read();
// to avoid busy loop and reduce cpu load
sleepM.Wait(100);
}
socketM->Close();
}
}
void cSatipDiscover::Read(void)
{
//debug("cSatipDiscover::%s()", __FUNCTION__);
if (socketM) {
unsigned char *buf = MALLOC(unsigned char, eProbeBufferSize + 1);
if (buf) {
memset(buf, 0, eProbeBufferSize + 1);
int len = socketM->Read(buf, eProbeBufferSize);
if (len > 0) {
//debug("cSatipDiscover::%s(): len=%d", __FUNCTION__, len);
bool status = false, valid = false;
char *s, *p = reinterpret_cast<char *>(buf), *location = NULL;
char *r = strtok_r(p, "\r\n", &s);
while (r) {
//debug("cSatipDiscover::%s(): %s", __FUNCTION__, r);
// Check the status code
// HTTP/1.1 200 OK
if (!status && startswith(r, "HTTP/1.1 200 OK")) {
status = true;
}
if (status) {
// Check the location data
// LOCATION: http://192.168.0.115:8888/octonet.xml
if (startswith(r, "LOCATION:")) {
location = compactspace(r + 9);
debug("cSatipDiscover::%s(): location='%s'", __FUNCTION__, location);
}
// Check the source type
// ST: urn:ses-com:device:SatIPServer:1
else if (startswith(r, "ST:")) {
char *st = compactspace(r + 3);
if (strstr(st, "urn:ses-com:device:SatIPServer:1"))
valid = true;
debug("cSatipDiscover::%s(): st='%s'", __FUNCTION__, st);
}
// Check whether all the required data is found
if (valid && !isempty(location))
break;
}
r = strtok_r(NULL, "\r\n", &s);
}
if (handleM && valid && !isempty(location)) {
debug("cSatipDiscover::%s(%s)", __FUNCTION__, urlP);
if (handleM && !isempty(urlP)) {
long rc = 0;
CURLcode res = CURLE_OK;
#ifdef DEBUG
@ -268,7 +226,7 @@ void cSatipDiscover::Read(void)
SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_USERAGENT, *cString::sprintf("vdr-%s/%s", PLUGIN_NAME_I18N, VERSION));
// Set URL
SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_URL, location);
SATIP_CURL_EASY_SETOPT(handleM, CURLOPT_URL, urlP);
// Fetch the data
SATIP_CURL_EASY_PERFORM(handleM);
@ -277,88 +235,80 @@ void cSatipDiscover::Read(void)
error("Discovery detected invalid status code: %ld", rc);
}
}
free(buf);
}
}
}
void cSatipDiscover::AddServer(const char *addrP, const char *modelP, const char * descP)
{
debug("cSatipDiscover::%s(%s, %s, %s)", __FUNCTION__, addrP, modelP, descP);
cMutexLock MutexLock(&mutexM);
if (serversM) {
cSatipServer *tmp = new cSatipServer(addrP, modelP, descP);
// Validate against existing servers
if (!serversM->Update(tmp)) {
if (!serversM.Update(tmp)) {
info("Adding device '%s|%s|%s'", tmp->Address(), tmp->Model(), tmp->Description());
serversM->Add(tmp);
serversM.Add(tmp);
}
else
DELETENULL(tmp);
}
}
int cSatipDiscover::GetServerCount(void)
{
//debug("cSatipDiscover::%s()", __FUNCTION__);
cMutexLock MutexLock(&mutexM);
return serversM ? serversM->Count() : -1;
return serversM.Count();
}
cSatipServer *cSatipDiscover::GetServer(int sourceP, int transponderP, int systemP)
{
//debug("cSatipDiscover::%s(%d, %d, %d)", __FUNCTION__, sourceP, transponderP, systemP);
cMutexLock MutexLock(&mutexM);
return serversM ? serversM->Find(sourceP, transponderP, systemP) : NULL;
return serversM.Find(sourceP, transponderP, systemP);
}
cSatipServer *cSatipDiscover::GetServer(cSatipServer *serverP)
{
//debug("cSatipDiscover::%s()", __FUNCTION__);
cMutexLock MutexLock(&mutexM);
return serversM ? serversM->Find(serverP) : NULL;
return serversM.Find(serverP);
}
cSatipServers *cSatipDiscover::GetServers(void)
{
//debug("cSatipDiscover::%s()", __FUNCTION__);
cMutexLock MutexLock(&mutexM);
return serversM;
return &serversM;
}
cString cSatipDiscover::GetServerString(cSatipServer *serverP)
{
//debug("cSatipDiscover::%s()", __FUNCTION__);
cMutexLock MutexLock(&mutexM);
return serversM ? serversM->GetString(serverP) : "";
return serversM.GetString(serverP);
}
cString cSatipDiscover::GetServerList(void)
{
//debug("cSatipDiscover::%s()", __FUNCTION__);
cMutexLock MutexLock(&mutexM);
return serversM ? serversM->List() : "";
return serversM.List();
}
void cSatipDiscover::SetTransponder(cSatipServer *serverP, int transponderP)
{
//debug("cSatipDiscover::%s(%d)", __FUNCTION__, transponderP);
cMutexLock MutexLock(&mutexM);
if (serversM)
serversM->SetTransponder(serverP, transponderP);
serversM.SetTransponder(serverP, transponderP);
}
void cSatipDiscover::UseServer(cSatipServer *serverP, bool onOffP)
{
//debug("cSatipDiscover::%s(%d)", __FUNCTION__, onOffP);
cMutexLock MutexLock(&mutexM);
if (serversM)
serversM->Use(serverP, onOffP);
serversM.Use(serverP, onOffP);
}
int cSatipDiscover::NumProvidedSystems(void)
{
//debug("cSatipDiscover::%s()", __FUNCTION__);
cMutexLock MutexLock(&mutexM);
return serversM ? serversM->NumProvidedSystems() : 0;
return serversM.NumProvidedSystems();
}

View File

@ -13,6 +13,7 @@
#include <vdr/thread.h>
#include <vdr/tools.h>
#include "msearch.h"
#include "server.h"
#include "socket.h"
@ -37,29 +38,25 @@ class cSatipDiscoverServers : public cList<cSatipDiscoverServer> {
class cSatipDiscover : public cThread {
private:
enum {
eSleepTimeoutMs = 500, // in milliseconds
eConnectTimeoutMs = 1500, // in milliseconds
eDiscoveryPort = 1900,
eProbeBufferSize = 1024, // in bytes
eProbeTimeoutMs = 2000, // in milliseconds
eProbeIntervalMs = 60000 // in milliseconds
};
static cSatipDiscover *instanceS;
static const char *bcastAddressS;
static const char *bcastMessageS;
static size_t WriteCallback(char *ptrP, size_t sizeP, size_t nmembP, void *dataP);
static int DebugCallback(CURL *handleP, curl_infotype typeP, char *dataP, size_t sizeP, void *userPtrP);
cMutex mutexM;
cSatipMsearch msearchM;
cStringList probeUrlListM;
CURL *handleM;
cSatipSocket *socketM;
cCondWait sleepM;
cTimeMs probeIntervalM;
cSatipServers *serversM;
cSatipServers serversM;
void Activate(void);
void Deactivate(void);
void Janitor(void);
void Probe(void);
void Read(void);
void AddServer(const char *addrP, const char *modelP, const char *descP);
void Fetch(const char *urlP);
// constructor
cSatipDiscover();
// to prevent copy constructor and assignment
@ -74,6 +71,7 @@ public:
static bool Initialize(cSatipDiscoverServers *serversP);
static void Destroy(void);
virtual ~cSatipDiscover();
void Probe(const char *urlP);
void TriggerScan(void) { probeIntervalM.Set(0); }
int GetServerCount(void);
cSatipServer *GetServer(int sourceP, int transponderP = 0, int systemP = -1);

94
msearch.c Normal file
View File

@ -0,0 +1,94 @@
/*
* msearch.c: SAT>IP plugin for the Video Disk Recorder
*
* See the README file for copyright information and how to reach the author.
*
*/
#include "common.h"
#include "discover.h"
#include "poller.h"
#include "msearch.h"
const char *cSatipMsearch::bcastAddressS = "239.255.255.250";
const char *cSatipMsearch::bcastMessageS = "M-SEARCH * HTTP/1.1\r\n" \
"HOST: 239.255.255.250:1900\r\n" \
"MAN: \"ssdp:discover\"\r\n" \
"ST: urn:ses-com:device:SatIPServer:1\r\n" \
"MX: 2\r\n\r\n";
cSatipMsearch::cSatipMsearch(void)
: bufferLenM(eProbeBufferSize),
bufferM(MALLOC(unsigned char, bufferLenM)),
registeredM(false)
{
if (bufferM)
memset(bufferM, 0, bufferLenM);
else
error("Cannot create Msearch buffer!");
if (!Open(eDiscoveryPort))
error("Cannot open Msearch port!");
}
cSatipMsearch::~cSatipMsearch()
{
}
void cSatipMsearch::Probe(void)
{
debug("cSatipMsearch::%s()", __FUNCTION__);
if (!registeredM) {
cSatipPoller::GetInstance()->Register(*this);
registeredM = true;
}
Write(bcastAddressS, reinterpret_cast<const unsigned char *>(bcastMessageS), strlen(bcastMessageS));
}
int cSatipMsearch::GetFd(void)
{
return Fd();
}
void cSatipMsearch::Process(int fdP)
{
//debug("cSatipMsearch::%s()", __FUNCTION__);
if (bufferM) {
int length = Read(bufferM, bufferLenM);
if (length > 0) {
bufferM[min(length, int(bufferLenM - 1))] = 0;
//debug("cSatipMsearch::%s(): len=%d buf=%s", __FUNCTION__, length, bufferM);
bool status = false, valid = false;
char *s, *p = reinterpret_cast<char *>(bufferM), *location = NULL;
char *r = strtok_r(p, "\r\n", &s);
while (r) {
//debug("cSatipMsearch::%s(): %s", __FUNCTION__, r);
// Check the status code
// HTTP/1.1 200 OK
if (!status && startswith(r, "HTTP/1.1 200 OK"))
status = true;
if (status) {
// Check the location data
// LOCATION: http://192.168.0.115:8888/octonet.xml
if (startswith(r, "LOCATION:")) {
location = compactspace(r + 9);
debug("cSatipMsearch::%s(): location='%s'", __FUNCTION__, location);
}
// Check the source type
// ST: urn:ses-com:device:SatIPServer:1
else if (startswith(r, "ST:")) {
char *st = compactspace(r + 3);
if (strstr(st, "urn:ses-com:device:SatIPServer:1"))
valid = true;
debug("cSatipMsearch::%s(): st='%s'", __FUNCTION__, st);
}
// Check whether all the required data is found
if (valid && !isempty(location)) {
cSatipDiscover::GetInstance()->Probe(location);
break;
}
}
r = strtok_r(NULL, "\r\n", &s);
}
}
}
}

37
msearch.h Normal file
View File

@ -0,0 +1,37 @@
/*
* msearch.h: SAT>IP plugin for the Video Disk Recorder
*
* See the README file for copyright information and how to reach the author.
*
*/
#ifndef __SATIP_MSEARCH_H_
#define __SATIP_MSEARCH_H_
#include "socket.h"
#include "pollerif.h"
class cSatipMsearch : public cSatipSocket, public cSatipPollerIf {
private:
enum {
eProbeBufferSize = 1024, // in bytes
eDiscoveryPort = 1900,
};
static const char *bcastAddressS;
static const char *bcastMessageS;
unsigned int bufferLenM;
unsigned char *bufferM;
bool registeredM;
public:
cSatipMsearch(void);
virtual ~cSatipMsearch();
void Probe(void);
// for internal poller interface
public:
virtual int GetFd(void);
virtual void Process(int fdP);
};
#endif /* __SATIP_MSEARCH_H_ */

View File

@ -5,10 +5,10 @@
#
msgid ""
msgstr ""
"Project-Id-Version: vdr-satip 0.3.4\n"
"Project-Id-Version: vdr-satip 1.0.0\n"
"Report-Msgid-Bugs-To: <see README>\n"
"POT-Creation-Date: 2014-11-11 11:11+0200\n"
"PO-Revision-Date: 2014-11-11 11:11+0200\n"
"POT-Creation-Date: 2014-11-22 11:22+0200\n"
"PO-Revision-Date: 2014-11-22 11:22+0200\n"
"Last-Translator: Gabriel Bonich <gbonich@gmail.com>\n"
"Language-Team: Catalan <vdr@linuxtv.org>\n"
"Language: ca\n"

View File

@ -5,10 +5,10 @@
#
msgid ""
msgstr ""
"Project-Id-Version: vdr-satip 0.3.4\n"
"Project-Id-Version: vdr-satip 1.0.0\n"
"Report-Msgid-Bugs-To: <see README>\n"
"POT-Creation-Date: 2014-11-11 11:11+0200\n"
"PO-Revision-Date: 2014-11-11 11:11+0200\n"
"POT-Creation-Date: 2014-11-22 11:22+0200\n"
"PO-Revision-Date: 2014-11-22 11:22+0200\n"
"Last-Translator: Frank Neumann <fnu@yavdr.org>\n"
"Language-Team: German <vdr@linuxtv.org>\n"
"Language: de\n"

View File

@ -5,10 +5,10 @@
#
msgid ""
msgstr ""
"Project-Id-Version: vdr-satip 0.3.4\n"
"Project-Id-Version: vdr-satip 1.0.0\n"
"Report-Msgid-Bugs-To: <see README>\n"
"POT-Creation-Date: 2014-11-11 11:11+0200\n"
"PO-Revision-Date: 2014-11-11 11:11+0200\n"
"POT-Creation-Date: 2014-11-22 11:22+0200\n"
"PO-Revision-Date: 2014-11-22 11:22+0200\n"
"Last-Translator: Gabriel Bonich <gbonich@gmail.com>\n"
"Language-Team: Spanish <vdr@linuxtv.org>\n"
"Language: es\n"

View File

@ -5,10 +5,10 @@
#
msgid ""
msgstr ""
"Project-Id-Version: vdr-satip 0.3.4\n"
"Project-Id-Version: vdr-satip 1.0.0\n"
"Report-Msgid-Bugs-To: <see README>\n"
"POT-Creation-Date: 2014-11-11 11:11+0200\n"
"PO-Revision-Date: 2014-11-11 11:11+0200\n"
"POT-Creation-Date: 2014-11-22 11:22+0200\n"
"PO-Revision-Date: 2014-11-22 11:22+0200\n"
"Last-Translator: Rolf Ahrenberg\n"
"Language-Team: Finnish <vdr@linuxtv.org>\n"
"Language: fi\n"

109
poller.c Normal file
View File

@ -0,0 +1,109 @@
/*
* poller.c: SAT>IP plugin for the Video Disk Recorder
*
* See the README file for copyright information and how to reach the author.
*
*/
#include <sys/epoll.h>
#include "common.h"
#include "poller.h"
cSatipPoller *cSatipPoller::instanceS = NULL;
cSatipPoller *cSatipPoller::GetInstance(void)
{
if (!instanceS)
instanceS = new cSatipPoller();
return instanceS;
}
bool cSatipPoller::Initialize(void)
{
debug("cSatipPoller::%s()", __FUNCTION__);
if (instanceS)
instanceS->Activate();
return true;
}
void cSatipPoller::Destroy(void)
{
debug("cSatipPoller::%s()", __FUNCTION__);
if (instanceS)
instanceS->Deactivate();
}
cSatipPoller::cSatipPoller()
: cThread("SAT>IP poller"),
mutexM(),
fdM(epoll_create(eMaxFileDescriptors))
{
debug("cSatipPoller::%s()", __FUNCTION__);
}
cSatipPoller::~cSatipPoller()
{
debug("cSatipPoller::%s()", __FUNCTION__);
Deactivate();
cMutexLock MutexLock(&mutexM);
close(fdM);
// Free allocated memory
}
void cSatipPoller::Activate(void)
{
// Start the thread
Start();
}
void cSatipPoller::Deactivate(void)
{
debug("cSatipPoller::%s()", __FUNCTION__);
cMutexLock MutexLock(&mutexM);
if (Running())
Cancel(3);
}
void cSatipPoller::Action(void)
{
debug("cSatipPoller::%s(): entering", __FUNCTION__);
struct epoll_event events[eMaxFileDescriptors];
// Increase priority
SetPriority(-1);
// Do the thread loop
while (Running()) {
int nfds = epoll_wait(fdM, events, eMaxFileDescriptors, -1);
ERROR_IF_FUNC((nfds == -1), "epoll_wait() failed", break, ;);
for (int i = 0; i < nfds; ++i) {
cSatipPollerIf* poll = reinterpret_cast<cSatipPollerIf *>(events[i].data.ptr);
if (poll)
poll->Process(events[i].events);
}
}
debug("cSatipPoller::%s(): exiting", __FUNCTION__);
}
bool cSatipPoller::Register(cSatipPollerIf &pollerP)
{
debug("cSatipPoller::%s(%d)", __FUNCTION__, pollerP.GetFd());
cMutexLock MutexLock(&mutexM);
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.ptr = &pollerP;
ERROR_IF_RET(epoll_ctl(fdM, EPOLL_CTL_ADD, pollerP.GetFd(), &ev) == -1, "epoll_ctl(EPOLL_CTL_ADD) failed", return false);
debug("cSatipPoller::%s(%d): Added interface", __FUNCTION__, pollerP.GetFd());
return true;
}
bool cSatipPoller::Unregister(cSatipPollerIf &pollerP)
{
debug("cSatipPoller::%s(%d)", __FUNCTION__, pollerP.GetFd());
cMutexLock MutexLock(&mutexM);
ERROR_IF_RET((epoll_ctl(fdM, EPOLL_CTL_DEL, pollerP.GetFd(), NULL) == -1), "epoll_ctl(EPOLL_CTL_DEL) failed", return false);
debug("cSatipPoller::%s(%d): Removed interface", __FUNCTION__, pollerP.GetFd());
return true;
}

44
poller.h Normal file
View File

@ -0,0 +1,44 @@
/*
* poller.h: SAT>IP plugin for the Video Disk Recorder
*
* See the README file for copyright information and how to reach the author.
*
*/
#ifndef __SATIP_POLLER_H
#define __SATIP_POLLER_H
#include <vdr/thread.h>
#include <vdr/tools.h>
#include "pollerif.h"
class cSatipPoller : public cThread {
private:
enum {
eMaxFileDescriptors = SATIP_MAX_DEVICES * 2, // Data + Application
};
static cSatipPoller *instanceS;
cMutex mutexM;
int fdM;
void Activate(void);
void Deactivate(void);
// constructor
cSatipPoller();
// to prevent copy constructor and assignment
cSatipPoller(const cSatipPoller&);
cSatipPoller& operator=(const cSatipPoller&);
protected:
virtual void Action(void);
public:
static cSatipPoller *GetInstance(void);
static bool Initialize(void);
static void Destroy(void);
virtual ~cSatipPoller();
bool Register(cSatipPollerIf &pollerP);
bool Unregister(cSatipPollerIf &pollerP);
};
#endif // __SATIP_POLLER_H

23
pollerif.h Normal file
View File

@ -0,0 +1,23 @@
/*
* pollerif.h: SAT>IP plugin for the Video Disk Recorder
*
* See the README file for copyright information and how to reach the author.
*
*/
#ifndef __SATIP_POLLERIF_H
#define __SATIP_POLLERIF_H
class cSatipPollerIf {
public:
cSatipPollerIf() {}
virtual ~cSatipPollerIf() {}
virtual int GetFd(void) = 0;
virtual void Process(int fdP) = 0;
private:
cSatipPollerIf(const cSatipPollerIf&);
cSatipPollerIf& operator=(const cSatipPollerIf&);
};
#endif // __SATIP_POLLERIF_H

88
rtcp.c Normal file
View File

@ -0,0 +1,88 @@
/*
* rtcp.c: SAT>IP plugin for the Video Disk Recorder
*
* See the README file for copyright information and how to reach the author.
*
*/
#include "common.h"
#include "rtcp.h"
cSatipRtcp::cSatipRtcp(cSatipTunerIf &tunerP, unsigned int bufferLenP)
: tunerM(tunerP),
bufferLenM(bufferLenP),
bufferM(MALLOC(unsigned char, bufferLenM))
{
if (bufferM)
memset(bufferM, 0, bufferLenM);
else
error("Cannot create RTCP buffer!");
}
cSatipRtcp::~cSatipRtcp()
{
DELETE_POINTER(bufferM);
}
int cSatipRtcp::GetFd(void)
{
return Fd();
}
int cSatipRtcp::GetApplicationOffset(int *lengthP)
{
//debug("cSatipRtcp::%s()", __FUNCTION__);
if (!lengthP)
return -1;
int offset = 0;
int total = *lengthP;
while (total > 0) {
// Version
unsigned int v = (bufferM[offset] >> 6) & 0x03;
// Padding
//unsigned int p = (bufferM[offset] >> 5) & 0x01;
// Subtype
//unsigned int st = bufferM[offset] & 0x1F;
// Payload type
unsigned int pt = bufferM[offset + 1] & 0xFF;
// Lenght
unsigned int length = ((bufferM[offset + 2] & 0xFF) << 8) | (bufferM[offset + 3] & 0xFF);
// Convert it to bytes
length = (length + 1) * 4;
// V=2, APP = 204
if ((v == 2) && (pt == 204)) {
// SSCR/CSCR
//unsigned int ssrc = ((bufferM[offset + 4] & 0xFF) << 24) | ((bufferM[offset + 5] & 0xFF) << 16) |
// ((bufferM[offset + 6] & 0xFF) << 8) | (bufferM[offset + 7] & 0xFF);
// Name
if ((bufferM[offset + 8] == 'S') && (bufferM[offset + 9] == 'E') &&
(bufferM[offset + 10] == 'S') && (bufferM[offset + 11] == '1')) {
// Identifier
//unsigned int id = ((bufferM[offset + 12] & 0xFF) << 8) | (bufferM[offset + 13] & 0xFF);
// String length
int string_length = ((bufferM[offset + 14] & 0xFF) << 8) | (bufferM[offset + 15] & 0xFF);
if (string_length > 0) {
*lengthP = string_length;
return (offset + 16);
}
}
}
offset += length;
total -= length;
}
*lengthP = 0;
return -1;
}
void cSatipRtcp::Process(int fdP)
{
//debug("cSatipRtcp::%s(%d)", __FUNCTION__, fdP);
if (bufferM) {
int length = Read(bufferM, bufferLenM);
if (length > 0) {
int offset = GetApplicationOffset(&length);
if (offset >= 0)
tunerM.ProcessApplicationData(bufferM + offset, length);
}
}
}

32
rtcp.h Normal file
View File

@ -0,0 +1,32 @@
/*
* rtcp.h: SAT>IP plugin for the Video Disk Recorder
*
* See the README file for copyright information and how to reach the author.
*
*/
#ifndef __SATIP_RTCP_H_
#define __SATIP_RTCP_H_
#include "socket.h"
#include "tunerif.h"
#include "pollerif.h"
class cSatipRtcp : public cSatipSocket, public cSatipPollerIf {
private:
cSatipTunerIf &tunerM;
unsigned int bufferLenM;
unsigned char *bufferM;
int GetApplicationOffset(int *lenghtP);
public:
cSatipRtcp(cSatipTunerIf &tunerP, unsigned int bufferLenP);
virtual ~cSatipRtcp();
// for internal poller interface
public:
virtual int GetFd(void);
virtual void Process(int fdP);
};
#endif /* __SATIP_RTCP_H_ */

114
rtp.c Normal file
View File

@ -0,0 +1,114 @@
/*
* rtp.c: SAT>IP plugin for the Video Disk Recorder
*
* See the README file for copyright information and how to reach the author.
*
*/
#include "common.h"
#include "rtp.h"
cSatipRtp::cSatipRtp(cSatipTunerIf &tunerP, unsigned int bufferLenP)
: tunerM(tunerP),
bufferLenM(bufferLenP),
bufferM(MALLOC(unsigned char, bufferLenM)),
lastErrorReportM(0),
packetErrorsM(0),
sequenceNumberM(-1)
{
if (bufferM)
memset(bufferM, 0, bufferLenM);
else
error("Cannot create RTP buffer!");
}
cSatipRtp::~cSatipRtp()
{
DELETE_POINTER(bufferM);
}
int cSatipRtp::GetFd(void)
{
return Fd();
}
void cSatipRtp::Close(void)
{
debug("cSatipRtp::%s(%d)", __FUNCTION__, GetFd());
cSatipSocket::Close();
sequenceNumberM = -1;
if (packetErrorsM) {
info("Detected %d RTP packet errors", packetErrorsM);
packetErrorsM = 0;
lastErrorReportM = time(NULL);
}
}
int cSatipRtp::GetHeaderLenght(int lengthP)
{
//debug("cSatipRtp::%s()", __FUNCTION__);
unsigned int headerlen = 0;
if (lengthP > 0) {
if (bufferM[0] == TS_SYNC_BYTE)
return headerlen;
else if (lengthP > 3) {
// http://tools.ietf.org/html/rfc3550
// http://tools.ietf.org/html/rfc2250
// Version
unsigned int v = (bufferM[0] >> 6) & 0x03;
// Extension bit
unsigned int x = (bufferM[0] >> 4) & 0x01;
// CSCR count
unsigned int cc = bufferM[0] & 0x0F;
// Payload type: MPEG2 TS = 33
//unsigned int pt = bufferAddrP[1] & 0x7F;
// Sequence number
int seq = ((bufferM[2] & 0xFF) << 8) | (bufferM[3] & 0xFF);
if ((((sequenceNumberM + 1) % 0xFFFF) == 0) && (seq == 0xFFFF))
sequenceNumberM = -1;
else if ((sequenceNumberM >= 0) && (((sequenceNumberM + 1) % 0xFFFF) != seq)) {
packetErrorsM++;
if (time(NULL) - lastErrorReportM > eReportIntervalS) {
info("Detected %d RTP packet errors", packetErrorsM);
packetErrorsM = 0;
lastErrorReportM = time(NULL);
}
sequenceNumberM = seq;
}
else
sequenceNumberM = seq;
// Header lenght
headerlen = (3 + cc) * (unsigned int)sizeof(uint32_t);
// Check if extension
if (x) {
// Extension header length
unsigned int ehl = (((bufferM[headerlen + 2] & 0xFF) << 8) | (bufferM[headerlen + 3] & 0xFF));
// Update header length
headerlen += (ehl + 1) * (unsigned int)sizeof(uint32_t);
}
// Check that rtp is version 2 and payload contains multiple of TS packet data
if ((v != 2) || (((lengthP - headerlen) % TS_SIZE) != 0) || (bufferM[headerlen] != TS_SYNC_BYTE)) {
debug("cSatipRtp::%s(%d): Received incorrect RTP packet", __FUNCTION__, lengthP);
headerlen = -1;
}
}
}
return headerlen;
}
void cSatipRtp::Process(int fdP)
{
//debug("cSatipRtp::%s(%d)", __FUNCTION__, fdP);
if (bufferM) {
int length = Read(bufferM, min(tunerM.GetVideoDataSize(), bufferLenM));
if (length > 0) {
int headerlen = GetHeaderLenght(length);
if ((headerlen >= 0) && (headerlen < length))
tunerM.ProcessVideoData(bufferM + headerlen, length - headerlen);
}
}
}

39
rtp.h Normal file
View File

@ -0,0 +1,39 @@
/*
* rtp.h: SAT>IP plugin for the Video Disk Recorder
*
* See the README file for copyright information and how to reach the author.
*
*/
#ifndef __SATIP_RTP_H_
#define __SATIP_RTP_H_
#include "socket.h"
#include "tunerif.h"
#include "pollerif.h"
class cSatipRtp : public cSatipSocket, public cSatipPollerIf {
private:
enum {
eReportIntervalS = 300 // in seconds
};
cSatipTunerIf &tunerM;
unsigned int bufferLenM;
unsigned char *bufferM;
time_t lastErrorReportM;
int packetErrorsM;
int sequenceNumberM;
int GetHeaderLenght(int lengthP);
public:
cSatipRtp(cSatipTunerIf &tunerP, unsigned int bufferLenP);
virtual ~cSatipRtp();
virtual void Close(void);
// for internal poller interface
public:
virtual int GetFd(void);
virtual void Process(int fdP);
};
#endif /* __SATIP_RTP_H_ */

29
rtsp.c
View File

@ -9,7 +9,7 @@
#include "rtsp.h"
cSatipRtsp::cSatipRtsp(cSatipTunerIf &tunerP)
: tunerM(&tunerP),
: tunerM(tunerP),
tunerIdM(tunerP.GetId()),
handleM(curl_easy_init()),
headerListM(NULL)
@ -59,21 +59,21 @@ size_t cSatipRtsp::HeaderCallback(void *ptrP, size_t sizeP, size_t nmembP, void
char *s, *p = (char *)ptrP;
char *r = strtok_r(p, "\r\n", &s);
while (obj && obj->tunerM && r) {
while (obj && r) {
//debug("cSatipRtsp::%s(%zu): %s", __FUNCTION__, len, r);
r = skipspace(r);
if (strstr(r, "com.ses.streamID")) {
int streamid = -1;
if (sscanf(r, "com.ses.streamID:%11d", &streamid) == 1)
obj->tunerM->SetStreamId(streamid);
obj->tunerM.SetStreamId(streamid);
}
else if (strstr(r, "Session:")) {
int timeout = -1;
char *session = NULL;
if (sscanf(r, "Session:%m[^;];timeout=%11d", &session, &timeout) == 2)
obj->tunerM->SetSessionTimeout(skipspace(session), timeout * 1000);
obj->tunerM.SetSessionTimeout(skipspace(session), timeout * 1000);
else if (sscanf(r, "Session:%m[^;]", &session) == 1)
obj->tunerM->SetSessionTimeout(skipspace(session), -1);
obj->tunerM.SetSessionTimeout(skipspace(session), -1);
FREE_POINTER(session);
}
r = strtok_r(NULL, "\r\n", &s);
@ -88,11 +88,8 @@ size_t cSatipRtsp::WriteCallback(void *ptrP, size_t sizeP, size_t nmembP, void *
size_t len = sizeP * nmembP;
//debug("cSatipRtsp::%s(%zu)", __FUNCTION__, len);
if (obj && obj->tunerM && (len > 0)) {
char *data = strndup((char*)ptrP, len);
obj->tunerM->ParseReceptionParameters(data);
FREE_POINTER(data);
}
if (obj && (len > 0))
obj->tunerM.ProcessApplicationData((u_char*)ptrP, len);
return len;
}
@ -101,22 +98,22 @@ int cSatipRtsp::DebugCallback(CURL *handleP, curl_infotype typeP, char *dataP, s
{
cSatipRtsp *obj = reinterpret_cast<cSatipRtsp *>(userPtrP);
if (obj && obj->tunerM) {
if (obj) {
switch (typeP) {
case CURLINFO_TEXT:
debug("cSatipTuner::%s(%d): RTSP INFO %.*s", __FUNCTION__, obj->tunerM->GetId(), (int)sizeP, dataP);
debug("cSatipTuner::%s(%d): RTSP INFO %.*s", __FUNCTION__, obj->tunerM.GetId(), (int)sizeP, dataP);
break;
case CURLINFO_HEADER_IN:
debug("cSatipTuner::%s(%d): RTSP HEAD <<< %.*s", __FUNCTION__, obj->tunerM->GetId(), (int)sizeP, dataP);
debug("cSatipTuner::%s(%d): RTSP HEAD <<< %.*s", __FUNCTION__, obj->tunerM.GetId(), (int)sizeP, dataP);
break;
case CURLINFO_HEADER_OUT:
debug("cSatipTuner::%s(%d): RTSP HEAD >>>\n%.*s", __FUNCTION__, obj->tunerM->GetId(), (int)sizeP, dataP);
debug("cSatipTuner::%s(%d): RTSP HEAD >>>\n%.*s", __FUNCTION__, obj->tunerM.GetId(), (int)sizeP, dataP);
break;
case CURLINFO_DATA_IN:
debug("cSatipTuner::%s(%d): RTSP DATA <<< %.*s", __FUNCTION__, obj->tunerM->GetId(), (int)sizeP, dataP);
debug("cSatipTuner::%s(%d): RTSP DATA <<< %.*s", __FUNCTION__, obj->tunerM.GetId(), (int)sizeP, dataP);
break;
case CURLINFO_DATA_OUT:
debug("cSatipTuner::%s(%d): RTSP DATA >>>\n%.*s", __FUNCTION__, obj->tunerM->GetId(), (int)sizeP, dataP);
debug("cSatipTuner::%s(%d): RTSP DATA >>>\n%.*s", __FUNCTION__, obj->tunerM.GetId(), (int)sizeP, dataP);
break;
default:
break;

2
rtsp.h
View File

@ -27,7 +27,7 @@ private:
eConnectTimeoutMs = 1500, // in milliseconds
};
cSatipTunerIf* tunerM;
cSatipTunerIf &tunerM;
int tunerIdM;
CURL *handleM;
struct curl_slist *headerListM;

View File

@ -11,6 +11,7 @@
#include "config.h"
#include "device.h"
#include "discover.h"
#include "poller.h"
#include "setup.h"
#if defined(LIBCURL_VERSION_NUM) && LIBCURL_VERSION_NUM < 0x072400
@ -25,7 +26,7 @@
#define GITVERSION ""
#endif
const char VERSION[] = "0.3.4" GITVERSION;
const char VERSION[] = "1.0.0" GITVERSION;
static const char DESCRIPTION[] = trNOOP("SAT>IP Devices");
class cPluginSatip : public cPlugin {
@ -116,6 +117,7 @@ bool cPluginSatip::Initialize(void)
if (curl_global_init(CURL_GLOBAL_ALL) != CURLE_OK)
error("Unable to initialize CURL");
SatipConfig.SetConfigDirectory(cPlugin::ResourceDirectory(PLUGIN_NAME_I18N));
cSatipPoller::GetInstance()->Initialize();
cSatipDiscover::GetInstance()->Initialize(serversM);
return cSatipDevice::Initialize(deviceCountM);
}
@ -141,6 +143,7 @@ void cPluginSatip::Stop(void)
// Stop any background activities the plugin is performing.
cSatipDevice::Shutdown();
cSatipDiscover::GetInstance()->Destroy();
cSatipPoller::GetInstance()->Destroy();
curl_global_cleanup();
}

109
socket.c
View File

@ -20,10 +20,7 @@
cSatipSocket::cSatipSocket()
: socketPortM(0),
socketDescM(-1),
lastErrorReportM(0),
packetErrorsM(0),
sequenceNumberM(-1)
socketDescM(-1)
{
debug("cSatipSocket::%s()", __FUNCTION__);
memset(&sockAddrM, 0, sizeof(sockAddrM));
@ -75,14 +72,8 @@ void cSatipSocket::Close(void)
close(socketDescM);
socketDescM = -1;
socketPortM = 0;
sequenceNumberM = -1;
memset(&sockAddrM, 0, sizeof(sockAddrM));
}
if (packetErrorsM) {
info("detected %d RTP packet errors", packetErrorsM);
packetErrorsM = 0;
lastErrorReportM = time(NULL);
}
}
bool cSatipSocket::Flush(void)
@ -141,104 +132,6 @@ int cSatipSocket::Read(unsigned char *bufferAddrP, unsigned int bufferLenP)
return 0;
}
int cSatipSocket::ReadVideo(unsigned char *bufferAddrP, unsigned int bufferLenP)
{
//debug("cSatipSocket::%s()", __FUNCTION__);
int len = Read(bufferAddrP, bufferLenP);
if (len > 0) {
if (bufferAddrP[0] == TS_SYNC_BYTE)
return len;
else if (len > 3) {
// http://tools.ietf.org/html/rfc3550
// http://tools.ietf.org/html/rfc2250
// Version
unsigned int v = (bufferAddrP[0] >> 6) & 0x03;
// Extension bit
unsigned int x = (bufferAddrP[0] >> 4) & 0x01;
// CSCR count
unsigned int cc = bufferAddrP[0] & 0x0F;
// Payload type: MPEG2 TS = 33
//unsigned int pt = bufferAddrP[1] & 0x7F;
// Sequence number
int seq = ((bufferAddrP[2] & 0xFF) << 8) | (bufferAddrP[3] & 0xFF);
if ((((sequenceNumberM + 1) % 0xFFFF) == 0) && (seq == 0xFFFF))
sequenceNumberM = -1;
else if ((sequenceNumberM >= 0) && (((sequenceNumberM + 1) % 0xFFFF) != seq)) {
packetErrorsM++;
if (time(NULL) - lastErrorReportM > eReportIntervalS) {
info("detected %d RTP packet errors", packetErrorsM);
packetErrorsM = 0;
lastErrorReportM = time(NULL);
}
sequenceNumberM = seq;
}
else
sequenceNumberM = seq;
// Header lenght
unsigned int headerlen = (3 + cc) * (unsigned int)sizeof(uint32_t);
// Check if extension
if (x) {
// Extension header length
unsigned int ehl = (((bufferAddrP[headerlen + 2] & 0xFF) << 8) |
(bufferAddrP[headerlen + 3] & 0xFF));
// Update header length
headerlen += (ehl + 1) * (unsigned int)sizeof(uint32_t);
}
// Check that rtp is version 2 and payload contains multiple of TS packet data
if ((v == 2) && (((len - headerlen) % TS_SIZE) == 0) &&
(bufferAddrP[headerlen] == TS_SYNC_BYTE)) {
// Set argument point to payload in read buffer
memmove(bufferAddrP, &bufferAddrP[headerlen], (len - headerlen));
return (len - headerlen);
}
}
}
return 0;
}
int cSatipSocket::ReadApplication(unsigned char *bufferAddrP, unsigned int bufferLenP)
{
//debug("cSatipSocket::%s()", __FUNCTION__);
int len = Read(bufferAddrP, bufferLenP);
int offset = 0;
while (len > 0) {
// Version
unsigned int v = (bufferAddrP[offset] >> 6) & 0x03;
// Padding
//unsigned int p = (bufferAddrP[offset] >> 5) & 0x01;
// Subtype
//unsigned int st = bufferAddrP[offset] & 0x1F;
// Payload type
unsigned int pt = bufferAddrP[offset + 1] & 0xFF;
// Lenght
unsigned int length = ((bufferAddrP[offset + 2] & 0xFF) << 8) | (bufferAddrP[offset + 3] & 0xFF);
// Convert it to bytes
length = (length + 1) * 4;
// V=2, APP = 204
if ((v == 2) && (pt == 204)) {
// SSCR/CSCR
//unsigned int ssrc = ((bufferAddrP[offset + 4] & 0xFF) << 24) | ((bufferAddrP[offset + 5] & 0xFF) << 16) |
// ((bufferAddrP[offset + 6] & 0xFF) << 8) | (bufferAddrP[offset + 7] & 0xFF);
// Name
if ((bufferAddrP[offset + 8] == 'S') && (bufferAddrP[offset + 9] == 'E') &&
(bufferAddrP[offset + 10] == 'S') && (bufferAddrP[offset + 11] == '1')) {
// Identifier
//unsigned int id = ((bufferAddrP[offset + 12] & 0xFF) << 8) | (bufferAddrP[offset + 13] & 0xFF);
// String length
int string_length = ((bufferAddrP[offset + 14] & 0xFF) << 8) | (bufferAddrP[offset + 15] & 0xFF);
if (string_length > 0) {
// Set argument point to payload in read buffer
memmove(bufferAddrP, &bufferAddrP[offset + 16], string_length);
bufferAddrP[string_length] = 0;
return string_length;
}
}
}
offset += length;
len -= length;
}
return 0;
}
bool cSatipSocket::Write(const char *addrP, const unsigned char *bufferAddrP, unsigned int bufferLenP)
{

View File

@ -12,28 +12,20 @@
class cSatipSocket {
private:
enum {
eReportIntervalS = 300 // in seconds
};
int socketPortM;
int socketDescM;
struct sockaddr_in sockAddrM;
time_t lastErrorReportM;
int packetErrorsM;
int sequenceNumberM;
public:
cSatipSocket();
~cSatipSocket();
virtual ~cSatipSocket();
bool Open(const int portP = 0);
void Close(void);
virtual void Close(void);
int Fd(void) { return socketDescM; }
int Port(void) { return socketPortM; }
bool IsOpen(void) { return (socketDescM >= 0); }
bool Flush(void);
int Read(unsigned char *bufferAddrP, unsigned int bufferLenP);
int ReadVideo(unsigned char *bufferAddrP, unsigned int bufferLenP);
int ReadApplication(unsigned char *bufferAddrP, unsigned int bufferLenP);
bool Write(const char *addrP, const unsigned char *bufferAddrP, unsigned int bufferLenP);
};

255
tuner.c
View File

@ -5,11 +5,10 @@
*
*/
#include <sys/epoll.h>
#include "common.h"
#include "config.h"
#include "discover.h"
#include "poller.h"
#include "tuner.h"
cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP)
@ -17,21 +16,21 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP)
sleepM(),
deviceM(&deviceP),
deviceIdM(deviceP.GetId()),
packetBufferLenM(packetLenP),
rtspM(new cSatipRtsp(*this)),
rtpSocketM(new cSatipSocket()),
rtcpSocketM(new cSatipSocket()),
rtspM(*this),
rtpM(*this, packetLenP),
rtcpM(*this, eApplicationMaxSizeB),
streamAddrM(""),
streamParamM(""),
currentServerM(NULL),
nextServerM(NULL),
mutexM(),
reConnectM(),
keepAliveM(),
statusUpdateM(),
pidUpdateCacheM(),
sessionM(""),
tunerStatusM(tsIdle),
fdM(epoll_create(eMaxFileDescriptors)),
currentStateM(tsIdle),
nextStateM(tsIdle),
timeoutM(eMinKeepAliveIntervalMs),
hasLockM(false),
signalStrengthM(-1),
@ -41,45 +40,22 @@ cSatipTuner::cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP)
delPidsM(),
pidsM()
{
debug("cSatipTuner::%s(%d) [device %d]", __FUNCTION__, packetBufferLenM, deviceIdM);
// Allocate packet buffer
packetBufferM = MALLOC(unsigned char, packetBufferLenM);
if (packetBufferM)
memset(packetBufferM, 0, packetBufferLenM);
else
error("MALLOC() failed for packet buffer [device %d]", deviceIdM);
debug("cSatipTuner::%s(%d) [device %d]", __FUNCTION__, packetLenP, deviceIdM);
// Open sockets
int i = 100;
while (i-- > 0) {
if (rtpSocketM->Open(0) && rtcpSocketM->Open(rtpSocketM->Port() + 1))
if (rtpM.Open(0) && rtcpM.Open(rtpM.Port() + 1))
break;
rtpSocketM->Close();
rtcpSocketM->Close();
rtpM.Close();
rtcpM.Close();
}
if ((rtpSocketM->Port() <= 0) || (rtcpSocketM->Port() <= 0)) {
if ((rtpM.Port() <= 0) || (rtcpM.Port() <= 0)) {
error("Cannot open required RTP/RTCP ports [device %d]", deviceIdM);
}
// Setup epoll
if (fdM >= 0) {
if (rtpSocketM->Fd() >= 0) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = rtpSocketM->Fd();
if (epoll_ctl(fdM, EPOLL_CTL_ADD, rtpSocketM->Fd(), &ev) == -1) {
error("Cannot add RTP socket into epoll [device %d]", deviceIdM);
}
}
if (rtcpSocketM->Fd() >= 0) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = rtcpSocketM->Fd();
if (epoll_ctl(fdM, EPOLL_CTL_ADD, rtcpSocketM->Fd(), &ev) == -1) {
error("Cannot add RTP socket into epoll [device %d]", deviceIdM);
}
}
}
// Must be done after socket initialization!
cSatipPoller::GetInstance()->Register(rtpM);
cSatipPoller::GetInstance()->Register(rtcpM);
// Start thread
Start();
@ -89,7 +65,8 @@ cSatipTuner::~cSatipTuner()
{
debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM);
tunerStatusM = tsIdle;
currentStateM = tsIdle;
nextStateM = tsIdle;
// Stop thread
sleepM.Signal();
@ -97,84 +74,48 @@ cSatipTuner::~cSatipTuner()
Cancel(3);
Close();
// Cleanup epoll
if (fdM >= 0) {
if ((rtpSocketM->Fd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, rtpSocketM->Fd(), NULL) == -1)) {
error("Cannot remove RTP socket from epoll [device %d]", deviceIdM);
}
if ((rtcpSocketM->Fd() >= 0) && (epoll_ctl(fdM, EPOLL_CTL_DEL, rtcpSocketM->Fd(), NULL) == -1)) {
error("Cannot remove RTP socket from epoll [device %d]", deviceIdM);
}
close(fdM);
}
// Close the listening sockets
rtpSocketM->Close();
rtcpSocketM->Close();
// Free allocated memory
free(packetBufferM);
DELETENULL(rtcpSocketM);
DELETENULL(rtpSocketM);
DELETENULL(rtspM);
cSatipPoller::GetInstance()->Unregister(rtcpM);
cSatipPoller::GetInstance()->Unregister(rtpM);
rtcpM.Close();
rtpM.Close();
}
void cSatipTuner::Action(void)
{
debug("cSatipTuner::%s(): entering [device %d]", __FUNCTION__, deviceIdM);
cTimeMs reconnection(eConnectTimeoutMs);
// Increase priority
SetPriority(-1);
reConnectM.Set(eConnectTimeoutMs);
// Do the thread loop
while (packetBufferM && rtpSocketM && rtcpSocketM && Running()) {
struct epoll_event events[eMaxFileDescriptors];
int nfds = epoll_wait(fdM, events, eMaxFileDescriptors, eReadTimeoutMs);
switch (nfds) {
default:
reconnection.Set(eConnectTimeoutMs);
for (int i = 0; i < nfds; ++i) {
if (events[i].data.fd == rtpSocketM->Fd()) {
// Read data
int length = rtpSocketM->ReadVideo(packetBufferM, min(deviceM->CheckData(), packetBufferLenM));
if (length > 0) {
AddTunerStatistic(length);
deviceM->WriteData(packetBufferM, length);
}
}
else if (events[i].data.fd == rtcpSocketM->Fd()) {
unsigned char buf[1450];
memset(buf, 0, sizeof(buf));
if (rtcpSocketM->ReadApplication(buf, sizeof(buf)) > 0)
ParseReceptionParameters((const char *)buf);
}
}
// fall through into epoll timeout
case 0:
switch (tunerStatusM) {
while (Running()) {
mutexM.Lock();
if (StateRequested())
currentStateM = nextStateM;
mutexM.Unlock();
switch (currentStateM) {
case tsIdle:
//debug("cSatipTuner::%s(): tsIdle [device %d]", __FUNCTION__, deviceIdM);
break;
case tsRelease:
//debug("cSatipTuner::%s(): tsRelease [device %d]", __FUNCTION__, deviceIdM);
Disconnect();
tunerStatusM = tsIdle;
RequestState(tsIdle);
break;
case tsSet:
//debug("cSatipTuner::%s(): tsSet [device %d]", __FUNCTION__, deviceIdM);
reconnection.Set(eConnectTimeoutMs);
reConnectM.Set(eConnectTimeoutMs);
Disconnect();
if (Connect()) {
tunerStatusM = tsTuned;
RequestState(tsTuned);
UpdatePids(true);
}
else {
error("Tuning failed - re-tuning [device %d]", deviceIdM);
tunerStatusM = tsIdle;
RequestState(tsIdle);
}
break;
case tsTuned:
//debug("cSatipTuner::%s(): tsTuned [device %d]", __FUNCTION__, deviceIdM);
reconnection.Set(eConnectTimeoutMs);
reConnectM.Set(eConnectTimeoutMs);
// Read reception statistics via DESCRIBE and RTCP
if (hasLockM || ReadReceptionStatus()) {
// Quirk for devices without valid reception data
@ -184,37 +125,33 @@ void cSatipTuner::Action(void)
signalQualityM = eDefaultSignalQuality;
}
if (hasLockM)
tunerStatusM = tsLocked;
RequestState(tsLocked);
}
break;
case tsLocked:
//debug("cSatipTuner::%s(): tsLocked [device %d]", __FUNCTION__, deviceIdM);
tunerStatusM = tsLocked;
if (!UpdatePids()) {
error("Pid update failed - re-tuning [device %d]", deviceIdM);
tunerStatusM = tsSet;
RequestState(tsSet);
break;
}
if (!KeepAlive()) {
error("Keep-alive failed - re-tuning [device %d]", deviceIdM);
tunerStatusM = tsSet;
RequestState(tsSet);
break;
}
if (reconnection.TimedOut()) {
if (reConnectM.TimedOut()) {
error("Connection timeout - re-tuning [device %d]", deviceIdM);
tunerStatusM = tsSet;
RequestState(tsSet);
break;
}
break;
default:
error("Unknown tuner status %d [device %d]", tunerStatusM, deviceIdM);
break;
}
break;
case -1:
ERROR_IF((nfds == -1), "epoll_wait() failed");
error("Unknown tuner status %d [device %d]", currentStateM, deviceIdM);
break;
}
if (!StateRequested())
sleepM.Wait(eSleepTimeoutMs); // to avoid busy loop and reduce cpu load
}
debug("cSatipTuner::%s(): exiting [device %d]", __FUNCTION__, deviceIdM);
}
@ -223,7 +160,10 @@ bool cSatipTuner::Open(void)
{
cMutexLock MutexLock(&mutexM);
debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM);
tunerStatusM = tsSet;
RequestState(tsSet);
// return always true
return true;
}
@ -231,7 +171,10 @@ bool cSatipTuner::Close(void)
{
cMutexLock MutexLock(&mutexM);
debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM);
tunerStatusM = tsRelease;
RequestState(tsRelease);
// return always true
return true;
}
@ -244,18 +187,18 @@ bool cSatipTuner::Connect(void)
cString connectionUri = cString::sprintf("rtsp://%s", *streamAddrM);
cString uri = cString::sprintf("%s/?%s", *connectionUri, *streamParamM);
// Just retune
if ((tunerStatusM >= tsTuned) && (streamIdM >= 0) && rtpSocketM && rtcpSocketM) {
if (streamIdM >= 0) {
debug("cSatipTuner::%s(): retune [device %d]", __FUNCTION__, deviceIdM);
KeepAlive(true);
// Flush any old content
rtpSocketM->Flush();
return rtspM->Setup(*uri, rtpSocketM->Port(), rtcpSocketM->Port());
rtpM.Flush();
return rtspM.Setup(*uri, rtpM.Port(), rtcpM.Port());
}
keepAliveM.Set(timeoutM);
if (rtspM->Options(*connectionUri)) {
if (rtspM.Options(*connectionUri)) {
if (nextServerM && nextServerM->Quirk(cSatipServer::eSatipQuirkSessionId))
rtspM->SetSession(SkipZeroes(*sessionM));
if (rtspM->Setup(*uri, rtpSocketM->Port(), rtcpSocketM->Port())) {
rtspM.SetSession(SkipZeroes(*sessionM));
if (rtspM.Setup(*uri, rtpM.Port(), rtcpM.Port())) {
if (nextServerM) {
cSatipDiscover::GetInstance()->UseServer(nextServerM, true);
currentServerM = nextServerM;
@ -275,11 +218,12 @@ bool cSatipTuner::Disconnect(void)
cMutexLock MutexLock(&mutexM);
debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM);
if ((tunerStatusM >= tsTuned) && !isempty(*streamAddrM) && (streamIdM >= 0)) {
if (!isempty(*streamAddrM) && (streamIdM >= 0)) {
cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM);
rtspM->Teardown(*uri);
rtspM.Teardown(*uri);
}
tunerStatusM = tsIdle;
currentStateM = tsIdle;
nextStateM = tsIdle;
// Reset signal parameters
hasLockM = false;
@ -297,15 +241,33 @@ bool cSatipTuner::Disconnect(void)
return true;
}
void cSatipTuner::ParseReceptionParameters(const char *paramP)
unsigned int cSatipTuner::GetVideoDataSize(void)
{
//debug("cSatipTuner::%s(%s) [device %d]", __FUNCTION__, paramP, deviceIdM);
//debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM);
return deviceM->CheckData();
}
void cSatipTuner::ProcessVideoData(u_char *bufferP, int lengthP)
{
//debug("cSatipTuner::%s(%d) [device %d]", __FUNCTION__, lengthP, deviceIdM);
if (lengthP > 0) {
AddTunerStatistic(lengthP);
deviceM->WriteData(bufferP, lengthP);
}
cMutexLock MutexLock(&mutexM);
reConnectM.Set(eConnectTimeoutMs);
}
void cSatipTuner::ProcessApplicationData(u_char *bufferP, int lengthP)
{
//debug("cSatipTuner::%s(%d) [device %d]", __FUNCTION__, lengthP, deviceIdM);
// DVB-S2:
// ver=<major>.<minor>;src=<srcID>;tuner=<feID>,<level>,<lock>,<quality>,<frequency>,<polarisation>,<system>,<type>,<pilots>,<roll_off>,<symbol_rate>,<fec_inner>;pids=<pid0>,...,<pidn>
// DVB-T2:
// ver=1.1;tuner=<feID>,<level>,<lock>,<quality>,<freq>,<bw>,<msys>,<tmode>,<mtype>,<gi>,<fec>,<plp>,<t2id>,<sm>;pids=<pid0>,...,<pidn>
if (!isempty(paramP)) {
char *s = strdup(paramP);
if (lengthP > 0) {
char *s = strndup((char *)bufferP, lengthP);
//debug("cSatipTuner::%s(%s) [device %d]", __FUNCTION__, s, deviceIdM);
char *c = strstr(s, ";tuner=");
if (c) {
int value;
@ -370,15 +332,16 @@ bool cSatipTuner::SetSource(cSatipServer *serverP, const char *parameterP, const
cMutexLock MutexLock(&mutexM);
if (serverP) {
nextServerM = cSatipDiscover::GetInstance()->GetServer(serverP);
if (rtspM && nextServerM && !isempty(nextServerM->Address()) && !isempty(parameterP)) {
if (nextServerM && !isempty(nextServerM->Address()) && !isempty(parameterP)) {
// Update stream address and parameter
streamAddrM = rtspM->RtspUnescapeString(nextServerM->Address());
streamParamM = rtspM->RtspUnescapeString(parameterP);
tunerStatusM = tsSet;
streamAddrM = rtspM.RtspUnescapeString(nextServerM->Address());
streamParamM = rtspM.RtspUnescapeString(parameterP);
RequestState(tsSet);
}
}
else
tunerStatusM = tsRelease;
RequestState(tsRelease);
return true;
}
@ -397,14 +360,16 @@ bool cSatipTuner::SetPid(int pidP, int typeP, bool onP)
addPidsM.RemovePid(pidP);
}
pidUpdateCacheM.Set(ePidUpdateIntervalMs);
sleepM.Signal();
return true;
}
bool cSatipTuner::UpdatePids(bool forceP)
{
//debug("cSatipTuner::%s(%d) tunerStatus=%s [device %d]", __FUNCTION__, forceP, TunerStatusString(tunerStatusM), deviceIdM);
//debug("cSatipTuner::%s(%d) tunerState=%s [device %d]", __FUNCTION__, forceP, TunerStateString(currentStateM), deviceIdM);
if (((forceP && pidsM.Size()) || (pidUpdateCacheM.TimedOut() && (addPidsM.Size() || delPidsM.Size()))) &&
(tunerStatusM >= tsTuned) && !isempty(*streamAddrM) && (streamIdM > 0)) {
!isempty(*streamAddrM) && (streamIdM > 0)) {
cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM);
bool usedummy = !!(currentServerM && currentServerM->Quirk(cSatipServer::eSatipQuirkPlayPids));
if (forceP || usedummy) {
@ -428,7 +393,7 @@ bool cSatipTuner::UpdatePids(bool forceP)
uri = cString::sprintf("%s%d%s", *uri, delPidsM[i], (i == (delPidsM.Size() - 1)) ? "" : ",");
}
}
if (!rtspM->Play(*uri))
if (!rtspM.Play(*uri))
return false;
addPidsM.Clear();
delPidsM.Clear();
@ -439,7 +404,7 @@ bool cSatipTuner::UpdatePids(bool forceP)
bool cSatipTuner::KeepAlive(bool forceP)
{
//debug("cSatipTuner::%s(%d) tunerStatus=%s [device %d]", __FUNCTION__, forceP, TunerStatusString(tunerStatusM), deviceIdM);
//debug("cSatipTuner::%s(%d) tunerState=%s [device %d]", __FUNCTION__, forceP, TunerStateString(currentStateM), deviceIdM);
cMutexLock MutexLock(&mutexM);
if (keepAliveM.TimedOut()) {
keepAliveM.Set(timeoutM);
@ -447,7 +412,7 @@ bool cSatipTuner::KeepAlive(bool forceP)
}
if (forceP && !isempty(*streamAddrM) && (streamIdM > 0)) {
cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM);
if (!rtspM->Options(*uri))
if (!rtspM.Options(*uri))
return false;
}
@ -456,7 +421,7 @@ bool cSatipTuner::KeepAlive(bool forceP)
bool cSatipTuner::ReadReceptionStatus(bool forceP)
{
//debug("cSatipTuner::%s(%d) tunerStatus=%s [device %d]", __FUNCTION__, forceP, TunerStatusString(tunerStatusM), deviceIdM);
//debug("cSatipTuner::%s(%d) tunerState=%s [device %d]", __FUNCTION__, forceP, TunerStateString(currentStateM), deviceIdM);
cMutexLock MutexLock(&mutexM);
if (statusUpdateM.TimedOut()) {
statusUpdateM.Set(eStatusUpdateTimeoutMs);
@ -464,16 +429,35 @@ bool cSatipTuner::ReadReceptionStatus(bool forceP)
}
if (forceP && !isempty(*streamAddrM) && (streamIdM > 0)) {
cString uri = cString::sprintf("rtsp://%s/stream=%d", *streamAddrM, streamIdM);
if (rtspM->Describe(*uri))
if (rtspM.Describe(*uri))
return true;
}
return false;
}
const char *cSatipTuner::TunerStatusString(eTunerStatus statusP)
bool cSatipTuner::StateRequested(void)
{
switch (statusP) {
cMutexLock MutexLock(&mutexM);
//debug("cSatipTuner::%s(%s <> %s) [device %d]", __FUNCTION__, TunerStateString(currentStateM), TunerStateString(nextStateM), deviceIdM);
return (currentStateM != nextStateM);
}
bool cSatipTuner::RequestState(eTunerState stateP)
{
cMutexLock MutexLock(&mutexM);
debug("cSatipTuner::%s(%s) [device %d]", __FUNCTION__, TunerStateString(stateP), deviceIdM);
nextStateM = stateP;
// validate legal state changes
return true;
}
const char *cSatipTuner::TunerStateString(eTunerState stateP)
{
switch (stateP) {
case tsIdle:
return "tsIdle";
case tsRelease:
@ -487,6 +471,7 @@ const char *cSatipTuner::TunerStatusString(eTunerStatus statusP)
default:
break;
}
return "---";
}
@ -505,7 +490,7 @@ int cSatipTuner::SignalQuality(void)
bool cSatipTuner::HasLock(void)
{
//debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM);
return (tunerStatusM >= tsTuned) && hasLockM;
return (currentStateM >= tsTuned) && hasLockM;
}
cString cSatipTuner::GetSignalStatus(void)
@ -517,5 +502,5 @@ cString cSatipTuner::GetSignalStatus(void)
cString cSatipTuner::GetInformation(void)
{
//debug("cSatipTuner::%s() [device %d]", __FUNCTION__, deviceIdM);
return (tunerStatusM >= tsTuned) ? cString::sprintf("rtsp://%s/?%s [stream=%d]", *streamAddrM, *streamParamM, streamIdM) : "connection failed";
return (currentStateM >= tsTuned) ? cString::sprintf("rtsp://%s/?%s [stream=%d]", *streamAddrM, *streamParamM, streamIdM) : "connection failed";
}

35
tuner.h
View File

@ -12,10 +12,11 @@
#include <vdr/tools.h>
#include "deviceif.h"
#include "rtp.h"
#include "rtcp.h"
#include "rtsp.h"
#include "server.h"
#include "statistics.h"
#include "socket.h"
class cSatipPid : public cVector<int> {
private:
@ -43,40 +44,40 @@ public:
}
};
class cSatipTuner : public cThread, public cSatipTunerStatistics, public cSatipTunerIf {
class cSatipTuner : public cThread, public cSatipTunerStatistics, public cSatipTunerIf
{
private:
enum {
eMaxFileDescriptors = 2, // RTP + RTCP
eDummyPid = 100,
eDefaultSignalStrength = 15,
eDefaultSignalQuality = 224,
eReadTimeoutMs = 500, // in milliseconds
eApplicationMaxSizeB = 1500,
eSleepTimeoutMs = 500, // in milliseconds
eStatusUpdateTimeoutMs = 1000, // in milliseconds
ePidUpdateIntervalMs = 250, // in milliseconds
eConnectTimeoutMs = 5000, // in milliseconds
eMinKeepAliveIntervalMs = 30000 // in milliseconds
};
enum eTunerStatus { tsIdle, tsRelease, tsSet, tsTuned, tsLocked };
enum eTunerState { tsIdle, tsRelease, tsSet, tsTuned, tsLocked };
cCondWait sleepM;
cSatipDeviceIf* deviceM;
int deviceIdM;
unsigned char* packetBufferM;
unsigned int packetBufferLenM;
cSatipRtsp *rtspM;
cSatipSocket *rtpSocketM;
cSatipSocket *rtcpSocketM;
cSatipRtsp rtspM;
cSatipRtp rtpM;
cSatipRtcp rtcpM;
cString streamAddrM;
cString streamParamM;
cSatipServer *currentServerM;
cSatipServer *nextServerM;
cMutex mutexM;
cTimeMs reConnectM;
cTimeMs keepAliveM;
cTimeMs statusUpdateM;
cTimeMs pidUpdateCacheM;
cString sessionM;
eTunerStatus tunerStatusM;
int fdM;
eTunerState currentStateM;
eTunerState nextStateM;
int timeoutM;
bool hasLockM;
int signalStrengthM;
@ -91,7 +92,9 @@ private:
bool KeepAlive(bool forceP = false);
bool ReadReceptionStatus(bool forceP = false);
bool UpdatePids(bool forceP = false);
const char *TunerStatusString(eTunerStatus statusP);
bool StateRequested(void);
bool RequestState(eTunerState stateP);
const char *TunerStateString(eTunerState stateP);
protected:
virtual void Action(void);
@ -99,7 +102,7 @@ protected:
public:
cSatipTuner(cSatipDeviceIf &deviceP, unsigned int packetLenP);
virtual ~cSatipTuner();
bool IsTuned(void) const { return tunerStatusM > tsIdle; }
bool IsTuned(void) const { return currentStateM > tsIdle; }
bool SetSource(cSatipServer *serverP, const char *parameterP, const int indexP);
bool SetPid(int pidP, int typeP, bool onP);
bool Open(void);
@ -112,7 +115,9 @@ public:
// for internal tuner interface
public:
virtual void ParseReceptionParameters(const char *paramP);
virtual unsigned int GetVideoDataSize(void);
virtual void ProcessVideoData(u_char *bufferP, int lengthP);
virtual void ProcessApplicationData(u_char *bufferP, int lengthP);
virtual void SetStreamId(int streamIdP);
virtual void SetSessionTimeout(const char *sessionP, int timeoutP);
virtual int GetId(void);

View File

@ -12,7 +12,9 @@ class cSatipTunerIf {
public:
cSatipTunerIf() {}
virtual ~cSatipTunerIf() {}
virtual void ParseReceptionParameters(const char *paramP) = 0;
virtual unsigned int GetVideoDataSize(void) = 0;
virtual void ProcessVideoData(u_char *bufferP, int lenghtP) = 0;
virtual void ProcessApplicationData(u_char *bufferP, int lenghtP) = 0;
virtual void SetStreamId(int streamIdP) = 0;
virtual void SetSessionTimeout(const char *sessionP, int timeoutP) = 0;
virtual int GetId(void) = 0;