Snapshot 2007-05-09

This commit is contained in:
Frank Schmirler 2010-12-02 08:59:14 +01:00
parent e6249bf957
commit 5a270cc3ab
22 changed files with 1040 additions and 260 deletions

View File

@ -1,7 +1,7 @@
#
# Makefile for a Video Disk Recorder plugin
#
# $Id: Makefile,v 1.7 2006/09/14 10:30:16 schmirl Exp $
# $Id: Makefile,v 1.8 2007/04/16 11:01:02 schmirl Exp $
# The official name of this plugin.
# This name will be used in the '-P...' option of VDR to load the plugin.
@ -16,7 +16,7 @@ VERSION = $(shell grep 'const char \*VERSION *=' common.c | awk '{ print $$5 }'
### The C++ compiler and options:
CXX ?= g++
CXXFLAGS ?= -fPIC -W -Woverloaded-virtual
CXXFLAGS ?= -fPIC -Wall -Woverloaded-virtual
### The directory environment:

View File

@ -1,5 +1,5 @@
/*
* $Id: device.c,v 1.8 2007/01/15 12:15:12 schmirl Exp $
* $Id: device.c,v 1.13 2007/05/07 12:18:18 schmirl Exp $
*/
#include "client/device.h"
@ -42,6 +42,8 @@ cStreamdevDevice::cStreamdevDevice(void) {
#endif
m_Device = this;
m_Pids = 0;
m_DvrClosed = true;
if (StreamdevClientSetup.SyncEPG)
ClientSocket.SynchronizeEPG();
@ -59,13 +61,22 @@ cStreamdevDevice::~cStreamdevDevice() {
bool cStreamdevDevice::ProvidesSource(int Source) const {
Dprintf("ProvidesSource, Source=%d\n", Source);
return false;
return true;
}
bool cStreamdevDevice::ProvidesTransponder(const cChannel *Channel) const
{
Dprintf("ProvidesTransponder\n");
return false;
return true;
}
bool cStreamdevDevice::IsTunedToTransponder(const cChannel *Channel)
{
bool res = false;
if (ClientSocket.DataSocket(siLive) != NULL
&& TRANSPONDER(Channel, m_Channel))
res = true;
return res;
}
bool cStreamdevDevice::ProvidesChannel(const cChannel *Channel, int Priority,
@ -123,37 +134,118 @@ bool cStreamdevDevice::SetChannelDevice(const cChannel *Channel,
bool cStreamdevDevice::SetPid(cPidHandle *Handle, int Type, bool On) {
Dprintf("SetPid, Pid=%d, Type=%d, On=%d, used=%d\n", Handle->pid, Type, On,
Handle->used);
if (Handle->pid && (On || !Handle->used))
return ClientSocket.SetPid(Handle->pid, On);
return true;
LOCK_THREAD;
if (On && !m_TSBuffer) {
Dprintf("SetPid: no data connection -> OpenDvr()");
OpenDvrInt();
}
bool res = true;
if (Handle->pid && (On || !Handle->used)) {
res = ClientSocket.SetPid(Handle->pid, On);
m_Pids += (!res) ? 0 : On ? 1 : -1;
if (m_Pids < 0)
m_Pids = 0;
if(m_Pids < 1 && m_DvrClosed) {
Dprintf("SetPid: 0 pids left -> CloseDvr()");
CloseDvrInt();
}
}
return res;
}
bool cStreamdevDevice::OpenDvrInt(void) {
Dprintf("OpenDvrInt\n");
LOCK_THREAD;
CloseDvrInt();
if (m_TSBuffer) {
Dprintf("cStreamdevDevice::OpenDvrInt(): DVR connection already open\n");
return true;
}
Dprintf("cStreamdevDevice::OpenDvrInt(): Connecting ...\n");
if (ClientSocket.CreateDataConnection(siLive)) {
m_TSBuffer = new cTSBuffer(*ClientSocket.DataSocket(siLive), MEGABYTE(2), CardIndex() + 1);
return true;
}
esyslog("cStreamdevDevice::OpenDvrInt(): DVR connection FAILED");
return false;
}
bool cStreamdevDevice::OpenDvr(void) {
Dprintf("OpenDvr\n");
CloseDvr();
if (ClientSocket.CreateDataConnection(siLive)) {
//m_Assembler = new cStreamdevAssembler(ClientSocket.DataSocket(siLive));
//m_TSBuffer = new cTSBuffer(m_Assembler->ReadPipe(), MEGABYTE(2), CardIndex() + 1);
m_TSBuffer = new cTSBuffer(*ClientSocket.DataSocket(siLive), MEGABYTE(2), CardIndex() + 1);
Dprintf("waiting\n");
//m_Assembler->WaitForFill();
Dprintf("resuming\n");
return true;
LOCK_THREAD;
m_DvrClosed = false;
return OpenDvrInt();
}
void cStreamdevDevice::CloseDvrInt(void) {
Dprintf("CloseDvrInt\n");
LOCK_THREAD;
if (ClientSocket.CheckConnection()) {
if (!m_DvrClosed) {
Dprintf("cStreamdevDevice::CloseDvrInt(): m_DvrClosed=false -> not closing yet\n");
return;
}
if (m_Pids > 0) {
Dprintf("cStreamdevDevice::CloseDvrInt(): %d active pids -> not closing yet\n", m_Pids);
return;
}
} else {
Dprintf("cStreamdevDevice::CloseDvrInt(): Control connection gone !\n");
}
return false;
Dprintf("cStreamdevDevice::CloseDvrInt(): Closing DVR connection\n");
DELETENULL(m_TSBuffer);
ClientSocket.CloseDvr();
}
void cStreamdevDevice::CloseDvr(void) {
Dprintf("CloseDvr\n");
LOCK_THREAD;
//DELETENULL(m_Assembler);
DELETENULL(m_TSBuffer);
ClientSocket.CloseDvr();
m_DvrClosed = true;
CloseDvrInt();
}
bool cStreamdevDevice::GetTSPacket(uchar *&Data) {
if (m_TSBuffer) {
Data = m_TSBuffer->Get();
#if 1 // TODO: this should be fixed in vdr cTSBuffer
// simple disconnect detection
static int m_TSFails = 0;
if (!Data) {
LOCK_THREAD;
if(!ClientSocket.DataSocket(siLive)) {
return false; // triggers CloseDvr() + OpenDvr() in cDevice
}
cPoller Poller(*ClientSocket.DataSocket(siLive));
errno = 0;
if (Poller.Poll() && !errno) {
char tmp[1];
if (recv(*ClientSocket.DataSocket(siLive), tmp, 1, MSG_PEEK) == 0 && !errno) {
esyslog("cStreamDevice::GetTSPacket: GetChecked: NOTHING (%d)", m_TSFails);
m_TSFails++;
if (m_TSFails > 10) {
isyslog("cStreamdevDevice::GetTSPacket(): disconnected");
m_Pids = 0;
CloseDvrInt();
m_TSFails = 0;
return false;
}
return true;
}
}
m_TSFails = 0;
}
#endif
return true;
}
return false;
@ -162,11 +254,24 @@ bool cStreamdevDevice::GetTSPacket(uchar *&Data) {
#if VDRVERSNUM >= 10300
int cStreamdevDevice::OpenFilter(u_short Pid, u_char Tid, u_char Mask) {
Dprintf("OpenFilter\n");
if (StreamdevClientSetup.StreamFilters
&& ClientSocket.SetFilter(Pid, Tid, Mask, true)) {
return m_Filters->OpenFilter(Pid, Tid, Mask);
} else
if (!StreamdevClientSetup.StreamFilters)
return -1;
if (!ClientSocket.DataSocket(siLiveFilter)) {
if (ClientSocket.CreateDataConnection(siLiveFilter)) {
m_Filters->SetConnection(*ClientSocket.DataSocket(siLiveFilter));
} else {
isyslog("cStreamdevDevice::OpenFilter: connect failed: %m");
return -1;
}
}
if (ClientSocket.SetFilter(Pid, Tid, Mask, true))
return m_Filters->OpenFilter(Pid, Tid, Mask);
return -1;
}
#endif
@ -177,11 +282,17 @@ bool cStreamdevDevice::Init(void) {
}
bool cStreamdevDevice::ReInit(void) {
if(m_Device) {
m_Device->Lock();
m_Device->m_Filters->SetConnection(-1);
m_Device->m_Pids = 0;
}
ClientSocket.Quit();
ClientSocket.Reset();
if (m_Device != NULL) {
DELETENULL(m_Device->m_TSBuffer);
//DELETENULL(m_Device->m_TSBuffer);
DELETENULL(m_Device->m_Assembler);
m_Device->Unlock();
}
return StreamdevClientSetup.StartClient ? Init() : true;
}

View File

@ -1,5 +1,5 @@
/*
* $Id: device.h,v 1.3 2005/02/08 15:21:19 lordjaxom Exp $
* $Id: device.h,v 1.5 2007/04/24 10:43:40 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_DEVICE_H
@ -25,9 +25,14 @@ private:
#if VDRVERSNUM >= 10307
cStreamdevFilters *m_Filters;
#endif
int m_Pids;
bool m_DvrClosed;
static cStreamdevDevice *m_Device;
bool OpenDvrInt(void);
void CloseDvrInt(void);
protected:
virtual bool SetChannelDevice(const cChannel *Channel, bool LiveView);
virtual bool HasLock(int TimeoutMs)
@ -51,9 +56,10 @@ public:
virtual ~cStreamdevDevice();
virtual bool ProvidesSource(int Source) const;
virtual bool ProvidesTransponder(const cChannel *Channel) const;
virtual bool ProvidesTransponder(const cChannel *Channel) const;
virtual bool ProvidesChannel(const cChannel *Channel, int Priority = -1,
bool *NeedsDetachReceivers = NULL) const;
virtual bool IsTunedToTransponder(const cChannel *Channel);
static bool Init(void);
static bool ReInit(void);

View File

@ -1,5 +1,5 @@
/*
* $Id: filter.c,v 1.3 2005/11/06 16:43:58 lordjaxom Exp $
* $Id: filter.c,v 1.11 2007/04/24 11:23:16 schmirl Exp $
*/
#include "client/filter.h"
@ -7,113 +7,288 @@
#include "tools/select.h"
#include "common.h"
#include <vdr/ringbuffer.h>
#include <vdr/device.h>
#if VDRVERSNUM >= 10300
// --- cStreamdevFilter ------------------------------------------------------
class cStreamdevFilter: public cListObject {
private:
uchar m_Buffer[4096];
int m_Used;
int m_Pipe[2];
u_short m_Pid;
u_char m_Tid;
u_char m_Mask;
public:
cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask);
virtual ~cStreamdevFilter();
bool Matches(u_short Pid, u_char Tid);
bool PutSection(const uchar *Data, int Length, bool Pusi);
int ReadPipe(void) const { return m_Pipe[0]; }
bool IsClosed(void);
void Reset(void);
u_short Pid(void) const { return m_Pid; }
u_char Tid(void) const { return m_Tid; }
u_char Mask(void) const { return m_Mask; }
};
inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) {
return m_Pid == Pid && m_Tid == (Tid & m_Mask);
}
cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) {
m_Used = 0;
m_Pid = Pid;
m_Tid = Tid;
m_Mask = Mask;
m_Pipe[0] = m_Pipe[1] = -1;
if (pipe(m_Pipe) != 0 || fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0) {
esyslog("streamev-client: coudln't open section filter pipe: %m");
m_Pipe[0] = m_Pipe[1] = -1;
#ifdef SOCK_SEQPACKET
// SOCK_SEQPACKET (since kernel 2.6.4)
if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, m_Pipe) != 0) {
esyslog("streamdev-client: socketpair(SOCK_SEQPACKET) failed: %m, trying SOCK_DGRAM");
}
#endif
if (m_Pipe[0] < 0 && socketpair(AF_UNIX, SOCK_DGRAM, 0, m_Pipe) != 0) {
esyslog("streamdev-client: couldn't open section filter socket: %m");
}
else if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 ||
fcntl(m_Pipe[1], F_SETFL, O_NONBLOCK) != 0) {
esyslog("streamdev-client: couldn't set section filter socket to non-blocking mode: %m");
}
}
cStreamdevFilter::~cStreamdevFilter() {
Dprintf("~cStreamdevFilter %p\n", this);
if (m_Pipe[0] >= 0)
close(m_Pipe[0]);
// ownership of handle m_Pipe[0] has been transferred to VDR section handler
//if (m_Pipe[0] >= 0)
// close(m_Pipe[0]);
if (m_Pipe[1] >= 0)
close(m_Pipe[1]);
}
bool cStreamdevFilter::PutSection(const uchar *Data, int Length) {
bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) {
if (!m_Used && !Pusi) /* wait for payload unit start indicator */
return true;
if (m_Used && Pusi) /* reset at payload unit start */
Reset();
if (m_Used + Length >= (int)sizeof(m_Buffer)) {
esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)",
Length);
m_Used = 0;
Reset();
return true;
}
memcpy(m_Buffer + m_Used, Data, Length);
m_Used += Length;
if (m_Used > 3) {
int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3;
if (m_Used == length) {
if (write(m_Pipe[1], m_Buffer, length) < 0)
return false;
m_Used = 0;
if (write(m_Pipe[1], m_Buffer, length) < 0) {
if(errno == EAGAIN || errno == EWOULDBLOCK)
dsyslog("cStreamdevFilter::PutSection socket overflow, "
"Pid %4d Tid %3d", m_Pid, m_Tid);
else
return false;
}
}
if (m_Used > length) {
dsyslog("cStreamdevFilter::PutSection: m_Used > length ! Pid %2d, Tid%2d "
"(len %3d, got %d/%d)", m_Pid, m_Tid, Length, m_Used, length);
if(Length < TS_SIZE-5) {
// TS packet not full -> this must be last TS packet of section data -> safe to reset now
Reset();
}
}
}
return true;
}
void cStreamdevFilter::Reset(void) {
if(m_Used)
dsyslog("cStreamdevFilter::Reset skipping %d bytes", m_Used);
m_Used = 0;
}
bool cStreamdevFilter::IsClosed(void) {
char m_Buffer[3] = {0,0,0}; /* tid 0, 0 bytes */
// Test if pipe/socket has been closed by writing empty section
if (write(m_Pipe[1], m_Buffer, 3) < 0 &&
errno != EAGAIN &&
errno != EWOULDBLOCK) {
if (errno != ECONNREFUSED &&
errno != ECONNRESET &&
errno != EPIPE)
esyslog("cStreamdevFilter::IsClosed failed: %m");
return true;
}
return false;
}
// --- cStreamdevFilters -----------------------------------------------------
cStreamdevFilters::cStreamdevFilters(void):
cThread("streamdev-client: sections assembler") {
m_Active = false;
m_RingBuffer = new cRingBufferLinear(MEGABYTE(1), TS_SIZE * 2, true);
Start();
m_TSBuffer = NULL;
}
cStreamdevFilters::~cStreamdevFilters() {
if (m_Active) {
m_Active = false;
Cancel(3);
}
delete m_RingBuffer;
SetConnection(-1);
}
int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) {
CarbageCollect();
cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask);
int fh = f->ReadPipe();
Lock();
Add(f);
return f->ReadPipe();
Unlock();
return fh;
}
cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) {
for (cStreamdevFilter *f = First(); f; f = Next(f)) {
if (f->Matches(Pid, Tid))
return f;
void cStreamdevFilters::CarbageCollect(void) {
LOCK_THREAD;
for (cStreamdevFilter *fi = First(); fi;) {
if (fi->IsClosed()) {
if (errno == ECONNREFUSED ||
errno == ECONNRESET ||
errno == EPIPE) {
ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), false);
Dprintf("cStreamdevFilters::CarbageCollector: filter closed: Pid %4d, Tid %3d, Mask %2x (%d filters left)",
(int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1);
cStreamdevFilter *next = Prev(fi);
Del(fi);
fi = next ? Next(next) : First();
} else {
esyslog("cStreamdevFilters::CarbageCollector() error: "
"Pid %4d, Tid %3d, Mask %2x (%d filters left) failed",
(int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1);
LOG_ERROR;
fi = Next(fi);
}
} else {
fi = Next(fi);
}
}
return NULL;
}
void cStreamdevFilters::Put(const uchar *Data) {
int p = m_RingBuffer->Put(Data, TS_SIZE);
if (p != TS_SIZE)
m_RingBuffer->ReportOverflow(TS_SIZE - p);
bool cStreamdevFilters::ReActivateFilters(void)
{
LOCK_THREAD;
bool res = true;
CarbageCollect();
for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) {
res = ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), true) && res;
Dprintf("ReActivateFilters(%d, %d, %d) -> %s", fi->Pid(), fi->Tid(), fi->Mask(), res ? "Ok" :"FAIL");
}
return res;
}
void cStreamdevFilters::SetConnection(int Handle) {
Cancel(2);
DELETENULL(m_TSBuffer);
if (Handle >= 0) {
m_TSBuffer = new cTSBuffer(Handle, MEGABYTE(1), 1);
ReActivateFilters();
Start();
}
}
void cStreamdevFilters::Action(void) {
m_Active = true;
while (m_Active) {
int recvd;
const uchar *block = m_RingBuffer->Get(recvd);
int fails = 0;
if (block && recvd > 0) {
cStreamdevFilter *f;
while (Running()) {
const uchar *block = m_TSBuffer->Get();
if (block) {
u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2];
u_char tid = block[3];
if ((f = Matches(pid, tid)) != NULL) {
int len = block[4];
if (!f->PutSection(block + 5, len)) {
if (errno != EPIPE) {
esyslog("streamdev-client: couldn't send section packet: %m");
bool Pusi = block[1] & 0x40;
int len = block[4];
#if 0
if (block[1] == 0xff &&
block[2] == 0xff &&
block[3] == 0xff &&
block[4] == 0x7f)
isyslog("*********** TRANSPONDER -> %s **********", block+5);
#endif
LOCK_THREAD;
cStreamdevFilter *f = First();
while (f) {
cStreamdevFilter *next = Next(f);
if (f->Matches(pid, tid)) {
if (f->PutSection(block + 5, len, Pusi))
break;
if (errno != ECONNREFUSED &&
errno != ECONNRESET &&
errno != EPIPE) {
Dprintf("FATAL ERROR: %m\n");
esyslog("streamdev-client: couldn't send section packet: %m");
}
ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false);
Del(f);
// Filter was closed.
// - need to check remaining filters for another match
}
f = next;
}
m_RingBuffer->Del(TS_SIZE);
} else
usleep(1);
} else {
#if 1 // TODO: this should be fixed in vdr cTSBuffer
// Check disconnection
int fd = *ClientSocket.DataSocket(siLiveFilter);
if(fd < 0)
break;
cPoller Poller(fd);
if (Poller.Poll()) {
char tmp[1];
errno = 0;
Dprintf("cStreamdevFilters::Action(): checking connection");
if (recv(fd, tmp, 1, MSG_PEEK) == 0 && errno != EAGAIN) {
++fails;
if (fails >= 10) {
esyslog("cStreamdevFilters::Action(): stream disconnected ?");
ClientSocket.CloseDataConnection(siLiveFilter);
break;
}
} else {
fails = 0;
}
} else {
fails = 0;
}
cCondWait::SleepMs(10);
#endif
}
}
DELETENULL(m_TSBuffer);
dsyslog("StreamdevFilters::Action() ended");
}
#endif // VDRVERSNUM >= 10300

View File

@ -1,5 +1,5 @@
/*
* $Id: filter.h,v 1.1.1.1 2004/12/30 22:44:04 lordjaxom Exp $
* $Id: filter.h,v 1.4 2007/04/24 11:23:16 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_FILTER_H
@ -12,52 +12,25 @@
#include <vdr/tools.h>
#include <vdr/thread.h>
class cRingBufferFrame;
class cRingBufferLinear;
class cStreamdevFilter: public cListObject {
private:
uchar m_Buffer[4096];
int m_Used;
int m_Pipe[2];
u_short m_Pid;
u_char m_Tid;
u_char m_Mask;
cRingBufferFrame *m_RingBuffer;
public:
cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask);
virtual ~cStreamdevFilter();
bool Matches(u_short Pid, u_char Tid);
bool PutSection(const uchar *Data, int Length);
int ReadPipe(void) const { return m_Pipe[0]; }
u_short Pid(void) const { return m_Pid; }
u_char Tid(void) const { return m_Tid; }
u_char Mask(void) const { return m_Mask; }
};
inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) {
return m_Pid == Pid && m_Tid == (Tid & m_Mask);
}
class cTSBuffer;
class cStreamdevFilter;
class cStreamdevFilters: public cList<cStreamdevFilter>, public cThread {
private:
bool m_Active;
cRingBufferLinear *m_RingBuffer;
cTSBuffer *m_TSBuffer;
protected:
virtual void Action(void);
void CarbageCollect(void);
bool ReActivateFilters(void);
public:
cStreamdevFilters(void);
virtual ~cStreamdevFilters();
void SetConnection(int Handle);
int OpenFilter(u_short Pid, u_char Tid, u_char Mask);
cStreamdevFilter *Matches(u_short Pid, u_char Tid);
void Put(const uchar *Data);
};
# endif // VDRVERSNUM >= 10300

View File

@ -1,5 +1,5 @@
/*
* $Id: socket.c,v 1.7 2007/01/15 11:45:48 schmirl Exp $
* $Id: socket.c,v 1.8 2007/04/24 10:57:34 schmirl Exp $
*/
#include <tools/select.h>
@ -215,6 +215,24 @@ bool cClientSocket::CreateDataConnection(eSocketId Id) {
return true;
}
bool cClientSocket::CloseDataConnection(eSocketId Id) {
//if (!CheckConnection()) return false;
CMD_LOCK;
if(Id == siLive || Id == siLiveFilter)
if (m_DataSockets[Id] != NULL) {
std::string command = (std::string)"ABRT " + (const char*)itoa(Id);
if (!Command(command, 220)) {
if (errno == 0)
esyslog("ERROR: Streamdev: Couldn't cleanly close data connection");
//return false;
}
DELETENULL(m_DataSockets[Id]);
}
return true;
}
bool cClientSocket::SetChannelDevice(const cChannel *Channel) {
if (!CheckConnection()) return false;

View File

@ -1,5 +1,5 @@
/*
* $Id: socket.h,v 1.3 2005/02/08 17:22:35 lordjaxom Exp $
* $Id: socket.h,v 1.4 2007/04/24 10:57:34 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_CLIENT_CONNECTION_H
@ -47,6 +47,7 @@ public:
bool CheckConnection(void);
bool ProvidesChannel(const cChannel *Channel, int Priority);
bool CreateDataConnection(eSocketId Id);
bool CloseDataConnection(eSocketId Id);
bool SetChannelDevice(const cChannel *Channel);
bool SetPid(int Pid, bool On);
#if VDRVERSNUM >= 10300

View File

@ -11,7 +11,7 @@
using namespace std;
const char *VERSION = "0.3.3-20070403";
const char *VERSION = "0.3.3-20070509";
const char *StreamTypes[st_Count] = {
"TS",

View File

@ -1,5 +1,5 @@
/*
* $Id: common.h,v 1.7 2005/11/06 16:43:58 lordjaxom Exp $
* $Id: common.h,v 1.8 2007/04/24 10:50:13 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_COMMON_H
@ -83,6 +83,7 @@ enum eSuspendMode {
enum eSocketId {
siLive,
siReplay,
siLiveFilter,
si_Count
};

View File

@ -1,5 +1,5 @@
/*
* $Id: connection.c,v 1.8 2007/01/15 12:00:19 schmirl Exp $
* $Id: connection.c,v 1.10 2007/05/07 12:25:11 schmirl Exp $
*/
#include "server/connection.h"
@ -101,9 +101,16 @@ bool cServerConnection::Respond(const char *Message, bool Last, ...)
length = vasprintf(&buffer, Message, ap);
va_end(ap);
if (length < 0) {
esyslog("ERROR: streamdev: buffer allocation failed (%s) for %s:%d",
m_Protocol, RemoteIp().c_str(), RemotePort());
return false;
}
if (m_WriteBytes + length + 2 > sizeof(m_WriteBuffer)) {
esyslog("ERROR: streamdev: output buffer overflow (%s) for %s:%d",
m_Protocol, RemoteIp().c_str(), RemotePort());
free(buffer);
return false;
}
Dprintf("OUT: |%s|\n", buffer);

View File

@ -1,5 +1,5 @@
/*
* $Id: connection.h,v 1.4 2007/04/02 10:32:34 schmirl Exp $
* $Id: connection.h,v 1.5 2007/04/16 11:01:02 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_SERVER_CONNECTION_H
@ -38,8 +38,8 @@ protected:
Only one line at a time may be sent. If there are lines to follow, set
Last to false. Command(NULL) will be called in the next cycle, so you can
post the next line. */
virtual bool Respond(const char *Message, bool Last = true, ...)
__attribute__ ((format (printf, 2, 4)));
virtual bool Respond(const char *Message, bool Last = true, ...);
//__attribute__ ((format (printf, 2, 4)));
public:
/* If you derive, specify a short string such as HTTP for Protocol, which

View File

@ -1,5 +1,5 @@
/*
* $Id: connectionHTTP.c,v 1.10 2006/01/26 19:40:18 lordjaxom Exp $
* $Id: connectionHTTP.c,v 1.12 2007/05/09 09:12:42 schmirl Exp $
*/
#include <ctype.h>
@ -41,6 +41,8 @@ bool cConnectionHTTP::Command(char *Cmd)
}
Dprintf("header\n");
return true;
default:
break;
}
return false; // ??? shouldn't happen
}
@ -69,6 +71,8 @@ bool cConnectionHTTP::ProcessRequest(void)
device->SwitchChannel(m_Channel, false);
if (m_LiveStreamer->SetChannel(m_Channel, m_StreamType, m_Apid)) {
m_LiveStreamer->SetDevice(device);
if (!SetDSCP())
LOG_ERROR_STR("unable to set DSCP sockopt");
if (m_StreamType == stES && (m_Apid != 0 || ISRADIO(m_Channel))) {
return Respond("HTTP/1.0 200 OK")
&& Respond("Content-Type: audio/mpeg")
@ -153,7 +157,7 @@ bool cConnectionHTTP::CmdGET(const std::string &Opts)
{
const char *sp = Opts.c_str(), *ptr = sp, *ep;
const cChannel *chan;
int apid = 0, pos;
int apid = 0;
ptr = skipspace(ptr);
while (*ptr == '/')

View File

@ -1,5 +1,5 @@
/*
* $Id: connectionVTP.c,v 1.8 2007/03/02 15:27:07 schmirl Exp $
* $Id: connectionVTP.c,v 1.14 2007/05/09 09:12:42 schmirl Exp $
*/
#include "server/connectionVTP.h"
@ -153,8 +153,6 @@ cLSTEHandler::~cLSTEHandler()
bool cLSTEHandler::Next(bool &Last)
{
char *buffer;
if (m_Error != NULL) {
Last = true;
cString str(m_Error, true);
@ -468,6 +466,8 @@ cConnectionVTP::cConnectionVTP(void):
cServerConnection("VTP"),
m_LiveSocket(NULL),
m_LiveStreamer(NULL),
m_FilterSocket(NULL),
m_FilterStreamer(NULL),
m_LastCommand(NULL),
m_NoTSPIDS(false),
m_LSTEHandler(NULL),
@ -482,11 +482,18 @@ cConnectionVTP::~cConnectionVTP()
free(m_LastCommand);
delete m_LiveStreamer;
delete m_LiveSocket;
delete m_FilterStreamer;
delete m_FilterSocket;
delete m_LSTTHandler;
delete m_LSTCHandler;
delete m_LSTEHandler;
}
inline bool cConnectionVTP::Abort(void) const
{
return m_LiveStreamer && m_LiveStreamer->Abort();
}
void cConnectionVTP::Welcome(void)
{
Respond(220, "Welcome to Video Disk Recorder (VTP)");
@ -500,12 +507,14 @@ void cConnectionVTP::Reject(void)
void cConnectionVTP::Detach(void)
{
if (m_LiveStreamer != NULL) m_LiveStreamer->Detach();
if (m_LiveStreamer) m_LiveStreamer->Detach();
if (m_FilterStreamer) m_FilterStreamer->Detach();
}
void cConnectionVTP::Attach(void)
{
if (m_LiveStreamer != NULL) m_LiveStreamer->Attach();
if (m_LiveStreamer) m_LiveStreamer->Attach();
if (m_FilterStreamer) m_FilterStreamer->Attach();
}
bool cConnectionVTP::Command(char *Cmd)
@ -549,8 +558,8 @@ bool cConnectionVTP::Command(char *Cmd)
else if (strcasecmp(Cmd, "ADDF") == 0) return CmdADDF(param);
else if (strcasecmp(Cmd, "DELF") == 0) return CmdDELF(param);
else if (strcasecmp(Cmd, "ABRT") == 0) return CmdABRT(param);
else if (strcasecmp(Cmd, "QUIT") == 0) return CmdQUIT(param);
else if (strcasecmp(Cmd, "SUSP") == 0) return CmdSUSP(param);
else if (strcasecmp(Cmd, "QUIT") == 0) return CmdQUIT();
else if (strcasecmp(Cmd, "SUSP") == 0) return CmdSUSP();
// Commands adopted from SVDRP
//else if (strcasecmp(Cmd, "DELR") == 0) return CmdDELR(param);
else if (strcasecmp(Cmd, "MODT") == 0) return CmdMODT(param);
@ -562,8 +571,6 @@ bool cConnectionVTP::Command(char *Cmd)
bool cConnectionVTP::CmdCAPS(char *Opts)
{
char *buffer;
if (strcasecmp(Opts, "TS") == 0) {
m_NoTSPIDS = true;
return Respond(220, "Ignored, capability \"%s\" accepted for "
@ -575,6 +582,14 @@ bool cConnectionVTP::CmdCAPS(char *Opts)
return Respond(220, "Capability \"%s\" accepted", Opts);
}
#if VDRVERSNUM >= 10300
//
// Deliver section filters data in separate, channel-independent data stream
//
if (strcasecmp(Opts, "FILTERS") == 0)
return Respond(220, "Capability \"%s\" accepted", Opts);
#endif
return Respond(561, "Capability \"%s\" not known", Opts);
}
@ -607,9 +622,14 @@ bool cConnectionVTP::CmdPORT(char *Opts)
id = strtoul(Opts, &ep, 10);
if (ep == Opts || !isspace(*ep))
return Respond(500, "Use: PORT Id Destination");
if (id != 0)
#if VDRVERSNUM >= 10300
if (id != siLive && id != siLiveFilter)
return Respond(501, "Wrong connection id %d", id);
#else
if (id != siLive)
return Respond(501, "Wrong connection id %d", id);
#endif
Opts = skipspace(ep);
n = 0;
@ -636,6 +656,33 @@ bool cConnectionVTP::CmdPORT(char *Opts)
isyslog("Streamdev: Setting data connection to %s:%d", dataip, dataport);
#if VDRVERSNUM >= 10300
if (id == siLiveFilter) {
if(m_FilterStreamer)
m_FilterStreamer->Stop();
delete m_FilterSocket;
m_FilterSocket = new cTBSocket(SOCK_STREAM);
if (!m_FilterSocket->Connect(dataip, dataport)) {
esyslog("ERROR: Streamdev: Couldn't open data connection to %s:%d: %s",
dataip, dataport, strerror(errno));
DELETENULL(m_FilterSocket);
return Respond(551, "Couldn't open data connection");
}
if(!m_FilterStreamer)
m_FilterStreamer = new cStreamdevFilterStreamer;
m_FilterStreamer->Start(m_FilterSocket);
m_FilterStreamer->Activate(true);
return Respond(220, "Port command ok, data connection opened");
}
#endif
if(m_LiveSocket && m_LiveStreamer)
m_LiveStreamer->Stop();
delete m_LiveSocket;
m_LiveSocket = new cTBSocket(SOCK_STREAM);
if (!m_LiveSocket->Connect(dataip, dataport)) {
esyslog("ERROR: Streamdev: Couldn't open data connection to %s:%d: %s",
@ -644,10 +691,12 @@ bool cConnectionVTP::CmdPORT(char *Opts)
return Respond(551, "Couldn't open data connection");
}
if (id == siLive)
if (!m_LiveSocket->SetDSCP())
LOG_ERROR_STR("unable to set DSCP sockopt");
if (m_LiveStreamer)
m_LiveStreamer->Start(m_LiveSocket);
return Respond(220, "Port command ok, data connection opened");
return Respond(220, "Port command ok, data connection opened");
}
bool cConnectionVTP::CmdTUNE(char *Opts)
@ -668,7 +717,16 @@ bool cConnectionVTP::CmdTUNE(char *Opts)
m_LiveStreamer = new cStreamdevLiveStreamer(1);
m_LiveStreamer->SetChannel(chan, m_NoTSPIDS ? stTS : stTSPIDS);
m_LiveStreamer->SetDevice(dev);
if(m_LiveSocket)
m_LiveStreamer->Start(m_LiveSocket);
#if VDRVERSNUM >= 10300
if(!m_FilterStreamer)
m_FilterStreamer = new cStreamdevFilterStreamer;
m_FilterStreamer->SetDevice(dev);
//m_FilterStreamer->SetChannel(chan);
#endif
return Respond(220, "Channel tuned");
}
@ -706,8 +764,8 @@ bool cConnectionVTP::CmdADDF(char *Opts)
int pid, tid, mask;
char *ep;
if (m_LiveStreamer == NULL)
return Respond(560, "Can't set filters without a stream");
if (m_FilterStreamer == NULL)
return Respond(560, "Can't set filters without a filter stream");
pid = strtol(Opts, &ep, 10);
if (ep == Opts || (*ep != ' '))
@ -721,7 +779,7 @@ bool cConnectionVTP::CmdADDF(char *Opts)
if (ep == Opts || (*ep != '\0' && *ep != ' '))
return Respond(500, "Use: ADDF Pid Tid Mask");
return m_LiveStreamer->SetFilter(pid, tid, mask, true)
return m_FilterStreamer->SetFilter(pid, tid, mask, true)
? Respond(220, "Filter %d transferring", pid)
: Respond(560, "Filter %d not available", pid);
#else
@ -735,7 +793,7 @@ bool cConnectionVTP::CmdDELF(char *Opts)
int pid, tid, mask;
char *ep;
if (m_LiveStreamer == NULL)
if (m_FilterStreamer == NULL)
return Respond(560, "Can't delete filters without a stream");
pid = strtol(Opts, &ep, 10);
@ -750,9 +808,8 @@ bool cConnectionVTP::CmdDELF(char *Opts)
if (ep == Opts || (*ep != '\0' && *ep != ' '))
return Respond(500, "Use: DELF Pid Tid Mask");
return m_LiveStreamer->SetFilter(pid, tid, mask, false)
? Respond(220, "Filter %d stopped", pid)
: Respond(560, "Filter %d not transferring", pid);
m_FilterStreamer->SetFilter(pid, tid, mask, false);
return Respond(220, "Filter %d stopped", pid);
#else
return Respond(500, "DELF known but unimplemented with VDR < 1.3.0");
#endif
@ -768,20 +825,32 @@ bool cConnectionVTP::CmdABRT(char *Opts)
return Respond(500, "Use: ABRT Id");
switch (id) {
case 0: DELETENULL(m_LiveStreamer); break;
case siLive:
DELETENULL(m_LiveStreamer);
DELETENULL(m_LiveSocket);
break;
#if VDRVERSNUM >= 10300
case siLiveFilter:
DELETENULL(m_FilterStreamer);
DELETENULL(m_FilterSocket);
break;
#endif
default:
return Respond(501, "Wrong connection id %d", id);
break;
}
DELETENULL(m_LiveSocket);
return Respond(220, "Data connection closed");
}
bool cConnectionVTP::CmdQUIT(char *Opts)
bool cConnectionVTP::CmdQUIT(void)
{
DeferClose();
return Respond(221, "Video Disk Recorder closing connection");
}
bool cConnectionVTP::CmdSUSP(char *Opts)
bool cConnectionVTP::CmdSUSP(void)
{
if (StreamdevServerSetup.SuspendMode == smAlways || cSuspendCtl::IsActive())
return Respond(220, "Server is suspended");

View File

@ -2,9 +2,10 @@
#define VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H
#include "server/connection.h"
#include "server/livestreamer.h"
class cTBSocket;
class cStreamdevLiveStreamer;
class cStreamdevFilterStreamer;
class cLSTEHandler;
class cLSTCHandler;
class cLSTTHandler;
@ -16,8 +17,10 @@ class cConnectionVTP: public cServerConnection {
using cServerConnection::Respond;
private:
cTBSocket *m_LiveSocket;
cStreamdevLiveStreamer *m_LiveStreamer;
cTBSocket *m_LiveSocket;
cStreamdevLiveStreamer *m_LiveStreamer;
cTBSocket *m_FilterSocket;
cStreamdevFilterStreamer *m_FilterStreamer;
char *m_LastCommand;
bool m_NoTSPIDS;
@ -53,8 +56,8 @@ public:
bool CmdADDF(char *Opts);
bool CmdDELF(char *Opts);
bool CmdABRT(char *Opts);
bool CmdQUIT(char *Opts);
bool CmdSUSP(char *Opts);
bool CmdQUIT(void);
bool CmdSUSP(void);
// Thread-safe implementations of SVDRP commands
bool CmdLSTE(char *Opts);
@ -73,9 +76,4 @@ public:
__attribute__ ((format (printf, 3, 4)));
};
inline bool cConnectionVTP::Abort(void) const
{
return m_LiveStreamer && m_LiveStreamer->Abort();
}
#endif // VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H

View File

@ -1,20 +1,24 @@
/*
* $Id: livefilter.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $
* $Id: livefilter.c,v 1.4 2007/04/24 11:06:12 schmirl Exp $
*/
#include "server/livefilter.h"
#include "server/livestreamer.h"
#include "server/streamer.h"
#include "common.h"
#ifndef TS_SIZE
# define TS_SIZE 188
#endif
#ifndef TS_SYNC_BYTE
# define TS_SYNC_BYTE 0x47
#endif
#if VDRVERSNUM >= 10300
cStreamdevLiveFilter::cStreamdevLiveFilter(cStreamdevLiveStreamer *Streamer) {
cStreamdevLiveFilter::cStreamdevLiveFilter(cStreamdevStreamer *Streamer) {
m_Streamer = Streamer;
}
cStreamdevLiveFilter::~cStreamdevLiveFilter() {
}
void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data, int Length)
{
uchar buffer[TS_SIZE];
@ -24,7 +28,7 @@ void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data,
while (length > 0) {
int chunk = min(length, TS_SIZE - 5);
buffer[0] = TS_SYNC_BYTE;
buffer[1] = (Pid >> 8) & 0xff;
buffer[1] = ((Pid >> 8) & 0x3f) | (pos==0 ? 0x40 : 0); /* bit 6: payload unit start indicator (PUSI) */
buffer[2] = Pid & 0xff;
buffer[3] = Tid;
buffer[4] = (uchar)chunk;

View File

@ -1,5 +1,5 @@
/*
* $Id: livefilter.h,v 1.2 2005/11/07 19:28:41 lordjaxom Exp $
* $Id: livefilter.h,v 1.4 2007/04/24 11:29:29 schmirl Exp $
*/
#ifndef VDR_STREAMEV_LIVEFILTER_H
@ -11,20 +11,24 @@
#include <vdr/filter.h>
class cStreamdevLiveStreamer;
class cStreamdevStreamer;
class cStreamdevLiveFilter: public cFilter {
friend class cStreamdevLiveStreamer;
private:
cStreamdevLiveStreamer *m_Streamer;
cStreamdevStreamer *m_Streamer;
protected:
virtual void Process(u_short Pid, u_char Tid, const u_char *Data, int Length);
public:
cStreamdevLiveFilter(cStreamdevLiveStreamer *Streamer);
virtual ~cStreamdevLiveFilter();
cStreamdevLiveFilter(cStreamdevStreamer *Streamer);
void Set(u_short Pid, u_char Tid, u_char Mask) {
cFilter::Set(Pid, Tid, Mask);
}
void Del(u_short Pid, u_char Tid, u_char Mask) {
cFilter::Del(Pid, Tid, Mask);
}
};
# endif // VDRVERSNUM >= 10300

View File

@ -1,6 +1,12 @@
#include <assert.h>
#include <libsi/section.h>
#include <libsi/descriptor.h>
#include <vdr/ringbuffer.h>
#include "server/livestreamer.h"
#include "server/livefilter.h"
#include "remux/ts2ps.h"
#include "remux/ts2es.h"
#include "remux/extern.h"
@ -8,12 +14,31 @@
// --- cStreamdevLiveReceiver -------------------------------------------------
class cStreamdevLiveReceiver: public cReceiver {
friend class cStreamdevStreamer;
private:
cStreamdevStreamer *m_Streamer;
protected:
virtual void Activate(bool On);
virtual void Receive(uchar *Data, int Length);
public:
#if VDRVERSNUM < 10500
cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Ca,
cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, int Ca, int Priority, const int *Pids);
#else
cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, tChannelID ChannelID, int Priority, const int *Pids);
#endif
virtual ~cStreamdevLiveReceiver();
};
#if VDRVERSNUM < 10500
cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, int Ca,
int Priority, const int *Pids):
cReceiver(Ca, Priority, 0, Pids),
#else
cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, tChannelID ChannelID,
cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, tChannelID ChannelID,
int Priority, const int *Pids):
cReceiver(ChannelID, Priority, 0, Pids),
#endif
@ -33,6 +58,228 @@ void cStreamdevLiveReceiver::Receive(uchar *Data, int Length) {
m_Streamer->ReportOverflow(Length - p);
}
inline void cStreamdevLiveReceiver::Activate(bool On)
{
Dprintf("LiveReceiver->Activate(%d)\n", On);
m_Streamer->Activate(On);
}
// --- cStreamdevPatFilter ----------------------------------------------------
class cStreamdevPatFilter : public cFilter {
private:
int pmtPid;
int pmtSid;
int pmtVersion;
const cChannel *m_Channel;
cStreamdevLiveStreamer *m_Streamer;
virtual void Process(u_short Pid, u_char Tid, const u_char *Data, int Length);
int GetPid(SI::PMT::Stream& stream);
public:
cStreamdevPatFilter(cStreamdevLiveStreamer *Streamer, const cChannel *Channel);
};
cStreamdevPatFilter::cStreamdevPatFilter(cStreamdevLiveStreamer *Streamer, const cChannel *Channel)
{
Dprintf("cStreamdevPatFilter(\"%s\")", Channel->Name());
assert(Streamer);
m_Channel = Channel;
m_Streamer = Streamer;
pmtPid = 0;
pmtSid = 0;
pmtVersion = -1;
Set(0x00, 0x00); // PAT
}
static const char * const psStreamTypes[] = {
"UNKNOWN",
"ISO/IEC 11172 Video",
"ISO/IEC 13818-2 Video",
"ISO/IEC 11172 Audio",
"ISO/IEC 13818-3 Audio",
"ISO/IEC 13818-1 Privete sections",
"ISO/IEC 13818-1 Private PES data",
"ISO/IEC 13512 MHEG",
"ISO/IEC 13818-1 Annex A DSM CC",
"0x09",
"ISO/IEC 13818-6 Multiprotocol encapsulation",
"ISO/IEC 13818-6 DSM-CC U-N Messages",
"ISO/IEC 13818-6 Stream Descriptors",
"ISO/IEC 13818-6 Sections (any type, including private data)",
"ISO/IEC 13818-1 auxiliary",
"ISO/IEC 13818-7 Audio with ADTS transport sytax",
"ISO/IEC 14496-2 Visual (MPEG-4)",
"ISO/IEC 14496-3 Audio with LATM transport syntax",
"0x12", "0x13", "0x14", "0x15", "0x16", "0x17", "0x18", "0x19", "0x1a",
"ISO/IEC 14496-10 Video (MPEG-4 part 10/AVC, aka H.264)",
"",
};
int cStreamdevPatFilter::GetPid(SI::PMT::Stream& stream)
{
SI::Descriptor *d;
if (!stream.getPid())
return 0;
switch (stream.getStreamType()) {
case 0x01: // ISO/IEC 11172 Video
case 0x02: // ISO/IEC 13818-2 Video
case 0x03: // ISO/IEC 11172 Audio
case 0x04: // ISO/IEC 13818-3 Audio
#if 0
case 0x07: // ISO/IEC 13512 MHEG
case 0x08: // ISO/IEC 13818-1 Annex A DSM CC
case 0x0a: // ISO/IEC 13818-6 Multiprotocol encapsulation
case 0x0b: // ISO/IEC 13818-6 DSM-CC U-N Messages
case 0x0c: // ISO/IEC 13818-6 Stream Descriptors
case 0x0d: // ISO/IEC 13818-6 Sections (any type, including private data)
case 0x0e: // ISO/IEC 13818-1 auxiliary
#endif
case 0x0f: // ISO/IEC 13818-7 Audio with ADTS transport syntax
case 0x10: // ISO/IEC 14496-2 Visual (MPEG-4)
case 0x11: // ISO/IEC 14496-3 Audio with LATM transport syntax
case 0x1b: // ISO/IEC 14496-10 Video (MPEG-4 part 10/AVC, aka H.264)
Dprintf("cStreamdevPatFilter PMT scanner adding PID %d (%s)",
stream.getPid(), psStreamTypes[stream.getStreamType()]);
return stream.getPid();
case 0x05: // ISO/IEC 13818-1 private sections
case 0x06: // ISO/IEC 13818-1 PES packets containing private data
for (SI::Loop::Iterator it; (d = stream.streamDescriptors.getNext(it)); ) {
switch (d->getDescriptorTag()) {
case SI::AC3DescriptorTag:
Dprintf("cStreamdevPatFilter PMT scanner: adding PID %d (%s) %s",
stream.getPid(), psStreamTypes[stream.getStreamType()], "AC3");
return stream.getPid();
case SI::TeletextDescriptorTag:
Dprintf("cStreamdevPatFilter PMT scanner: adding PID %d (%s) %s",
stream.getPid(), psStreamTypes[stream.getStreamType()], "Teletext");
return stream.getPid();
case SI::SubtitlingDescriptorTag:
Dprintf("cStreamdevPatFilter PMT scanner: adding PID %d (%s) %s",
stream.getPid(), psStreamTypes[stream.getStreamType()], "DVBSUB");
return stream.getPid();
default:
Dprintf("cStreamdevPatFilter PMT scanner: NOT adding PID %d (%s) %s",
stream.getPid(), psStreamTypes[stream.getStreamType()], "UNKNOWN");
break;
}
delete d;
}
break;
default:
/* This following section handles all the cases where the audio track
* info is stored in PMT user info with stream id >= 0x80
* we check the registration format identifier to see if it
* holds "AC-3"
*/
if (stream.getStreamType() >= 0x80) {
bool found = false;
for (SI::Loop::Iterator it; (d = stream.streamDescriptors.getNext(it)); ) {
switch (d->getDescriptorTag()) {
case SI::RegistrationDescriptorTag:
/* unfortunately libsi does not implement RegistrationDescriptor */
if (d->getLength() >= 4) {
found = true;
SI::CharArray rawdata = d->getData();
if (/*rawdata[0] == 5 && rawdata[1] >= 4 && */
rawdata[2] == 'A' && rawdata[3] == 'C' &&
rawdata[4] == '-' && rawdata[5] == '3') {
isyslog("cStreamdevPatFilter PMT scanner:"
"Adding pid %d (type 0x%x) RegDesc len %d (%c%c%c%c)",
stream.getPid(), stream.getStreamType(),
d->getLength(), rawdata[2], rawdata[3],
rawdata[4], rawdata[5]);
return stream.getPid();
}
}
break;
default:
break;
}
delete d;
}
if(!found) {
isyslog("Adding pid %d (type 0x%x) RegDesc not found -> assume AC-3",
stream.getPid(), stream.getStreamType());
return stream.getPid();
}
}
Dprintf("cStreamdevPatFilter PMT scanner: NOT adding PID %d (%s) %s",
stream.getPid(), psStreamTypes[stream.getStreamType()<0x1c?stream.getStreamType():0], "UNKNOWN");
break;
}
return 0;
}
void cStreamdevPatFilter::Process(u_short Pid, u_char Tid, const u_char *Data, int Length)
{
if (Pid == 0x00) {
if (Tid == 0x00 && !pmtPid) {
SI::PAT pat(Data, false);
if (!pat.CheckCRCAndParse())
return;
SI::PAT::Association assoc;
for (SI::Loop::Iterator it; pat.associationLoop.getNext(assoc, it); ) {
if (!assoc.isNITPid()) {
const cChannel *Channel = Channels.GetByServiceID(Source(), Transponder(), assoc.getServiceId());
if (Channel && (Channel == m_Channel)) {
if (0 != (pmtPid = assoc.getPid())) {
Dprintf("cStreamdevPatFilter: PMT pid for channel %s: %d", Channel->Name(), pmtPid);
pmtSid = assoc.getServiceId();
if (Length < TS_SIZE-5) {
// repack PAT to TS frame and send to client
uint8_t pat_ts[TS_SIZE] = {TS_SYNC_BYTE, 0x40 /* pusi=1 */, 0 /* pid=0 */, 0x10 /* adaption=1 */, 0 /* pointer */};
memcpy(pat_ts + 5, Data, Length);
m_Streamer->Put(pat_ts, TS_SIZE);
} else
isyslog("cStreamdevPatFilter: PAT size %d too large to fit in one TS", Length);
m_Streamer->SetPids(pmtPid);
Add(pmtPid, 0x02);
pmtVersion = -1;
return;
}
}
}
}
}
} else if (Pid == pmtPid && Tid == SI::TableIdPMT && Source() && Transponder()) {
SI::PMT pmt(Data, false);
if (!pmt.CheckCRCAndParse())
return;
if (pmt.getServiceId() != pmtSid)
return; // skip broken PMT records
if (pmtVersion != -1) {
if (pmtVersion != pmt.getVersionNumber()) {
Dprintf("cStreamdevPatFilter: PMT version changed, detaching all pids");
Del(pmtPid, 0x02);
pmtPid = 0; // this triggers PAT scan
}
return;
}
pmtVersion = pmt.getVersionNumber();
SI::PMT::Stream stream;
int pids[MAXRECEIVEPIDS + 1], npids = 0;
pids[npids++] = pmtPid;
#if 0
pids[npids++] = 0x10; // pid 0x10, tid 0x40: NIT
pids[npids++] = 0x11; // pid 0x11, tid 0x42: SDT
pids[npids++] = 0x12; // pid 0x12, tid 0x4E...0x6F: EIT
pids[npids++] = 0x14; // pid 0x14, tid 0x70: TDT
#endif
for (SI::Loop::Iterator it; pmt.streamLoop.getNext(stream, it); )
if (0 != (pids[npids] = GetPid(stream)) && npids < MAXRECEIVEPIDS)
npids++;
pids[npids] = 0;
m_Streamer->SetPids(pmt.getPCRPid(), pids);
}
}
// --- cStreamdevLiveStreamer -------------------------------------------------
cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority):
@ -43,6 +290,7 @@ cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority):
m_Channel(NULL),
m_Device(NULL),
m_Receiver(NULL),
m_PatFilter(NULL),
m_PESRemux(NULL),
m_ESRemux(NULL),
m_PSRemux(NULL),
@ -54,14 +302,24 @@ cStreamdevLiveStreamer::~cStreamdevLiveStreamer()
{
Dprintf("Desctructing Live streamer\n");
Stop();
delete m_Receiver;
if(m_PatFilter) {
Detach();
DELETENULL(m_PatFilter);
}
DELETENULL(m_Receiver);
delete m_PESRemux;
delete m_ESRemux;
delete m_PSRemux;
delete m_ExtRemux;
#if VDRVERSNUM >= 10300
//delete m_Filter; TODO
#endif
}
bool cStreamdevLiveStreamer::HasPid(int Pid)
{
int idx;
for (idx = 0; idx < m_NumPids; ++idx)
if (m_Pids[idx] == Pid)
return true;
return false;
}
bool cStreamdevLiveStreamer::SetPid(int Pid, bool On)
@ -93,6 +351,44 @@ bool cStreamdevLiveStreamer::SetPid(int Pid, bool On)
}
}
StartReceiver();
return true;
}
bool cStreamdevLiveStreamer::SetPids(int Pid, const int *Pids1, const int *Pids2, const int *Pids3)
{
m_NumPids = 0;
if (Pid)
m_Pids[m_NumPids++] = Pid;
if (Pids1)
for ( ; *Pids1 && m_NumPids < MAXRECEIVEPIDS; Pids1++)
if (!HasPid(*Pids1))
m_Pids[m_NumPids++] = *Pids1;
if (Pids2)
for ( ; *Pids2 && m_NumPids < MAXRECEIVEPIDS; Pids2++)
if (!HasPid(*Pids2))
m_Pids[m_NumPids++] = *Pids2;
if (Pids3)
for ( ; *Pids3 && m_NumPids < MAXRECEIVEPIDS; Pids3++)
if (!HasPid(*Pids3))
m_Pids[m_NumPids++] = *Pids3;
if (m_NumPids >= MAXRECEIVEPIDS) {
esyslog("ERROR: Streamdev: No free slot to receive pid %d\n", Pid);
return false;
}
m_Pids[m_NumPids] = 0;
StartReceiver();
return true;
}
void cStreamdevLiveStreamer::StartReceiver(void)
{
DELETENULL(m_Receiver);
if (m_NumPids > 0) {
Dprintf("Creating Receiver to respect changed pids\n");
@ -106,15 +402,19 @@ bool cStreamdevLiveStreamer::SetPid(int Pid, bool On)
Attach();
}
}
return true;
}
bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, eStreamType StreamType, int Apid)
{
Dprintf("Initializing Remuxer for full channel transfer\n");
printf("ca pid: %d\n", Channel->Ca());
//printf("ca pid: %d\n", Channel->Ca());
m_Channel = Channel;
m_StreamType = StreamType;
int apid[2] = { Apid, 0 };
const int *Apids = Apid ? apid : m_Channel->Apids();
const int *Dpids = Apid ? NULL : m_Channel->Dpids();
switch (m_StreamType) {
case stES:
{
@ -122,51 +422,33 @@ bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, eStreamType Str
if (Apid != 0)
pid = Apid;
m_ESRemux = new cTS2ESRemux(pid);
return SetPid(pid, true);
return SetPids(pid);
}
case stPES:
Dprintf("PES\n");
m_PESRemux = new cRemux(m_Channel->Vpid(), m_Channel->Apids(), m_Channel->Dpids(),
m_Channel->Spids(), false);
if (Apid != 0)
return SetPid(m_Channel->Vpid(), true)
&& SetPid(Apid, true);
else
return SetPid(m_Channel->Vpid(), true)
&& SetPid(m_Channel->Apid(0), true)
&& SetPid(m_Channel->Dpid(0), true);
return SetPids(m_Channel->Vpid(), Apids, Dpids, m_Channel->Spids());
case stPS:
m_PSRemux = new cTS2PSRemux(m_Channel->Vpid(), m_Channel->Apids(), m_Channel->Dpids(),
m_Channel->Spids());
if (Apid != 0)
return SetPid(m_Channel->Vpid(), true)
&& SetPid(Apid, true);
else
return SetPid(m_Channel->Vpid(), true)
&& SetPid(m_Channel->Apid(0), true)
&& SetPid(m_Channel->Dpid(0), true);
return SetPids(m_Channel->Vpid(), Apids, Dpids, m_Channel->Spids());
case stTS:
if (Apid != 0)
return SetPid(m_Channel->Vpid(), true)
&& SetPid(Apid, true);
else
return SetPid(m_Channel->Vpid(), true)
&& SetPid(m_Channel->Apid(0), true)
&& SetPid(m_Channel->Dpid(0), true);
// This should never happen, but ...
if (m_PatFilter) {
Detach();
DELETENULL(m_PatFilter);
}
// Set pids from PMT
m_PatFilter = new cStreamdevPatFilter(this, m_Channel);
return true;
case stExtern:
m_ExtRemux = new cExternRemux(m_Channel->Vpid(), m_Channel->Apids(), m_Channel->Dpids(),
m_Channel->Spids());
if (Apid != 0)
return SetPid(m_Channel->Vpid(), true)
&& SetPid(Apid, true);
else
return SetPid(m_Channel->Vpid(), true)
&& SetPid(m_Channel->Apid(0), true)
&& SetPid(m_Channel->Dpid(0), true);
return SetPids(m_Channel->Vpid(), Apids, Dpids, m_Channel->Spids());
case stTSPIDS:
Dprintf("pid streaming mode\n");
@ -175,25 +457,6 @@ bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, eStreamType Str
return false;
}
bool cStreamdevLiveStreamer::SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On)
{
#if 0
Dprintf("setting filter\n");
if (On) {
if (m_Filter == NULL) {
m_Filter = new cStreamdevLiveFilter(this);
Dprintf("attaching filter to device\n");
m_Device->AttachFilter(m_Filter);
}
m_Filter->Set(Pid, Tid, Mask);
} else if (m_Filter != NULL)
m_Filter->Del(Pid, Tid, Mask);
return true;
#else
return false;
#endif
}
int cStreamdevLiveStreamer::Put(const uchar *Data, int Count)
{
switch (m_StreamType) {
@ -270,14 +533,28 @@ void cStreamdevLiveStreamer::Del(int Count)
void cStreamdevLiveStreamer::Attach(void)
{
printf("RIGHT ATTACH\n");
m_Device->AttachReceiver(m_Receiver);
Dprintf("cStreamdevLiveStreamer::Attach()\n");
if (m_Device) {
if (m_Receiver) {
m_Device->Detach(m_Receiver);
m_Device->AttachReceiver(m_Receiver);
}
if (m_PatFilter) {
m_Device->Detach(m_PatFilter);
m_Device->AttachFilter(m_PatFilter);
}
}
}
void cStreamdevLiveStreamer::Detach(void)
{
printf("RIGHT DETACH\n");
m_Device->Detach(m_Receiver);
Dprintf("cStreamdevLiveStreamer::Detach()\n");
if (m_Device) {
if (m_Receiver)
m_Device->Detach(m_Receiver);
if (m_PatFilter)
m_Device->Detach(m_PatFilter);
}
}
std::string cStreamdevLiveStreamer::Report(void)
@ -296,3 +573,110 @@ std::string cStreamdevLiveStreamer::Report(void)
result += "\n";
return result;
}
// --- cStreamdevFilterStreamer -------------------------------------------------
#if VDRVERSNUM >= 10300
cStreamdevFilterStreamer::cStreamdevFilterStreamer():
cStreamdevStreamer("streamdev-filterstreaming"),
m_Device(NULL),
m_Filter(NULL)/*,
m_Channel(NULL)*/
{
}
cStreamdevFilterStreamer::~cStreamdevFilterStreamer()
{
Dprintf("Desctructing Filter streamer\n");
Detach();
m_Device = NULL;
DELETENULL(m_Filter);
Stop();
}
void cStreamdevFilterStreamer::Attach(void)
{
Dprintf("cStreamdevFilterStreamer::Attach()\n");
LOCK_THREAD;
if(m_Device && m_Filter)
m_Device->AttachFilter(m_Filter);
}
void cStreamdevFilterStreamer::Detach(void)
{
Dprintf("cStreamdevFilterStreamer::Detach()\n");
LOCK_THREAD;
if(m_Device && m_Filter)
m_Device->Detach(m_Filter);
}
#if 0
void cStreamdevFilterStreamer::SetChannel(const cChannel *Channel)
{
LOCK_THREAD;
Dprintf("cStreamdevFilterStreamer::SetChannel(%s : %s)", Channel?Channel->Name():"<null>",
Channel ? *Channel->GetChannelID().ToString() : "");
m_Channel = Channel;
}
#endif
void cStreamdevFilterStreamer::SetDevice(cDevice *Device)
{
Dprintf("cStreamdevFilterStreamer::SetDevice()\n");
LOCK_THREAD;
if(Device != m_Device) {
Detach();
m_Device = Device;
//m_Channel = NULL;
Attach();
}
}
bool cStreamdevFilterStreamer::SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On)
{
Dprintf("cStreamdevFilterStreamer::SetFilter(%u,0x%x,0x%x,%s)\n", Pid, Tid, Mask, On?"On":"Off");
if(!m_Device)
return false;
if (On) {
if (m_Filter == NULL) {
m_Filter = new cStreamdevLiveFilter(this);
Dprintf("attaching filter to device\n");
Attach();
}
m_Filter->Set(Pid, Tid, Mask);
} else if (m_Filter != NULL)
m_Filter->Del(Pid, Tid, Mask);
return true;
}
#if 0
void cStreamdevFilterStreamer::ChannelSwitch(const cDevice *Device, int ChannelNumber) {
LOCK_THREAD;
if(Device == m_Device) {
if(ChannelNumber > 0) {
cChannel *ch = Channels.GetByNumber(ChannelNumber);
if(ch != NULL) {
if(m_Filter != NULL &&
m_Channel != NULL &&
(! TRANSPONDER(ch, m_Channel))) {
isyslog("***** LiveFilterStreamer: transponder changed ! %s",
*ch->GetChannelID().ToString());
uchar buffer[TS_SIZE] = {TS_SYNC_BYTE, 0xff, 0xff, 0xff, 0x7f, 0};
strcpy((char*)(buffer + 5), ch->GetChannelID().ToString());
int p = Put(buffer, TS_SIZE);
if (p != TS_SIZE)
ReportOverflow(TS_SIZE - p);
}
m_Channel = ch;
}
}
}
}
#endif
#endif // if VDRVERSNUM >= 10300

View File

@ -5,34 +5,14 @@
#include <vdr/receiver.h>
#include "server/streamer.h"
#include "server/livefilter.h"
#include "common.h"
class cTS2PSRemux;
class cTS2ESRemux;
class cExternRemux;
class cRemux;
// --- cStreamdevLiveReceiver -------------------------------------------------
class cStreamdevLiveReceiver: public cReceiver {
friend class cStreamdevLiveStreamer;
private:
cStreamdevLiveStreamer *m_Streamer;
protected:
virtual void Activate(bool On);
virtual void Receive(uchar *Data, int Length);
public:
#if VDRVERSNUM < 10500
cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Ca, int Priority, const int *Pids);
#else
cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, tChannelID ChannelID, int Priority, const int *Pids);
#endif
virtual ~cStreamdevLiveReceiver();
};
class cStreamdevPatFilter;
class cStreamdevLiveReceiver;
// --- cStreamdevLiveStreamer -------------------------------------------------
@ -45,19 +25,23 @@ private:
const cChannel *m_Channel;
cDevice *m_Device;
cStreamdevLiveReceiver *m_Receiver;
cStreamdevPatFilter *m_PatFilter;
cRemux *m_PESRemux;
cTS2ESRemux *m_ESRemux;
cTS2PSRemux *m_PSRemux;
cExternRemux *m_ExtRemux;
void StartReceiver(void);
bool HasPid(int Pid);
public:
cStreamdevLiveStreamer(int Priority);
virtual ~cStreamdevLiveStreamer();
void SetDevice(cDevice *Device) { m_Device = Device; }
bool SetPid(int Pid, bool On);
bool SetPids(int Pid, const int *Pids1 = NULL, const int *Pids2 = NULL, const int *Pids3 = NULL);
bool SetChannel(const cChannel *Channel, eStreamType StreamType, int Apid = 0);
bool SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On);
virtual int Put(const uchar *Data, int Count);
virtual uchar *Get(int &Count);
@ -70,12 +54,36 @@ public:
virtual std::string Report(void);
};
// --- cStreamdevLiveReceiver reverse inlines ---------------------------------
inline void cStreamdevLiveReceiver::Activate(bool On)
{
Dprintf("LiveReceiver->Activate(%d)\n", On);
m_Streamer->Activate(On);
}
// --- cStreamdevFilterStreamer -------------------------------------------------
# if VDRVERSNUM >= 10300
//#include <vdr/status.h>
class cStreamdevLiveFilter;
class cStreamdevFilterStreamer: public cStreamdevStreamer /*, public cStatus*/ {
private:
cDevice *m_Device;
cStreamdevLiveFilter *m_Filter;
//const cChannel *m_Channel;
public:
cStreamdevFilterStreamer();
virtual ~cStreamdevFilterStreamer();
void SetDevice(cDevice *Device);
//void SetChannel(const cChannel *Channel);
bool SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On);
virtual void Attach(void);
virtual void Detach(void);
// cStatus message handlers
//virtual void ChannelSwitch(const cDevice *Device, int ChannelNumber);
};
# endif // if VDRVERSNUM >= 10300
#endif // VDR_STREAMDEV_LIVESTREAMER_H

View File

@ -3,7 +3,7 @@
*
* See the README file for copyright information and how to reach the author.
*
* $Id: streamdev-server.c,v 1.5 2007/02/19 12:08:16 schmirl Exp $
* $Id: streamdev-server.c,v 1.6 2007/04/16 11:01:02 schmirl Exp $
*/
#include <getopt.h>
@ -63,7 +63,7 @@ bool cPluginStreamdevServer::Start(void)
if (!StreamdevHosts.Load(STREAMDEVHOSTSPATH, true, true)) {
esyslog("streamdev-server: error while loading %s", STREAMDEVHOSTSPATH);
fprintf(stderr, "streamdev-server: error while loading %s\n");
fprintf(stderr, "streamdev-server: error while loading %s\n", STREAMDEVHOSTSPATH);
if (access(STREAMDEVHOSTSPATH, F_OK) != 0) {
fprintf(stderr, " Please install streamdevhosts.conf into the path "
"printed above. Without it\n"

View File

@ -6,6 +6,15 @@
#include <errno.h>
#include <fcntl.h>
// default class: best effort
#define DSCP_BE 0
// gold class (video): assured forwarding 4 with lowest drop precedence
#define DSCP_AF41 34 << 2
// premium class (voip): expedited forwarding
#define DSCP_EF 46 << 2
// actual DSCP value used
#define STREAMDEV_DSCP DSCP_AF41
cTBSocket::cTBSocket(int Type) {
memset(&m_LocalAddr, 0, sizeof(m_LocalAddr));
memset(&m_RemoteAddr, 0, sizeof(m_RemoteAddr));
@ -141,3 +150,8 @@ bool cTBSocket::Shutdown(int how) {
return ::shutdown(*this, how) != -1;
}
bool cTBSocket::SetDSCP(void) {
int dscp = STREAMDEV_DSCP;
return ::setsockopt(*this, SOL_IP, IP_TOS, &dscp, sizeof(dscp)) != -1;
}

View File

@ -68,6 +68,9 @@ public:
an appropriate value. */
virtual bool Accept(const cTBSocket &Listener);
/* Sets DSCP sockopt */
bool SetDSCP(void);
/* LocalPort() returns the port number this socket is connected to locally.
The result is undefined for a non-open socket. */
int LocalPort(void) const { return ntohs(m_LocalAddr.sin_port); }

View File

@ -110,7 +110,7 @@ bool cTBSource::SafeWrite(const void *Buffer, size_t Length) {
ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq,
uint TimeoutMs) {
int seqlen, ms;
int ms;
size_t len;
cTBSelect sel;