- first adoptions (transfer-commit)

This commit is contained in:
lordjaxom 2005-02-08 13:59:16 +00:00
parent 04a5985dcc
commit 8e4556b0a9
14 changed files with 76 additions and 163 deletions

View File

@ -1,5 +1,5 @@
/*
* $Id: filter.c,v 1.1 2004/12/30 22:44:04 lordjaxom Exp $
* $Id: filter.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $
*/
#include "client/filter.h"
@ -82,32 +82,9 @@ cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) {
}
void cStreamdevFilters::Put(const uchar *Data) {
static time_t firsterr = 0;
static int errcnt = 0;
static bool showerr = true;
int p = m_RingBuffer->Put(Data, TS_SIZE);
if (p != TS_SIZE) {
++errcnt;
if (showerr) {
if (firsterr == 0)
firsterr = time_ms();
else if (firsterr + BUFOVERTIME > time_ms() && errcnt > BUFOVERCOUNT) {
esyslog("ERROR: too many buffer overflows, logging stopped");
showerr = false;
firsterr = time_ms();
}
} else if (firsterr + BUFOVERTIME < time_ms()) {
showerr = true;
firsterr = 0;
errcnt = 0;
}
if (showerr)
esyslog("ERROR: ring buffer overflow (%d bytes dropped)", TS_SIZE - p);
else
firsterr = time_ms();
}
if (p != TS_SIZE)
m_RingBuffer->ReportOverflow(TS_SIZE - p);
}
void cStreamdevFilters::Action(void) {

View File

@ -1,5 +1,5 @@
/*
* $Id: remote.c,v 1.1 2004/12/30 22:44:02 lordjaxom Exp $
* $Id: remote.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $
*/
#include "client/remote.h"
@ -457,7 +457,7 @@ const char *cRemoteTimer::ToText(void) {
summary = strreplace(strdup(m_Summary), ':', '|');
asprintf(&m_Buffer, "%d:%s:%s:%04d:%04d:%d:%d:%s:%s", m_Active,
Channel()->GetChannelID().ToString(), PrintDay(m_Day, m_FirstDay),
(const char*)Channel()->GetChannelID().ToString(), PrintDay(m_Day, m_FirstDay),
m_Start, m_Stop, m_Priority, m_Lifetime, m_File, summary ? summary : "");
if (summary != NULL)

View File

@ -1,5 +1,5 @@
/*
* $Id: socket.c,v 1.1 2004/12/30 22:44:04 lordjaxom Exp $
* $Id: socket.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $
*/
#include <tools/select.h>
@ -39,15 +39,12 @@ cTBSocket *cClientSocket::DataSocket(eSocketId Id) const {
bool cClientSocket::Command(const cTBString &Command, uint Expected,
uint TimeoutMs) {
cTBString pkt;
time_t st;
errno = 0;
pkt = Command + "\015\012";
cTBString pkt = Command + "\015\012";
Dprintf("OUT: |%s|\n", (const char*)Command);
st = time_ms();
cTimeMs starttime;
if (!TimedWrite((const char*)pkt, pkt.Length(), TimeoutMs)) {
esyslog("Streamdev: Lost connection to %s:%d: %s",
(const char*)RemoteIp(), RemotePort(), strerror(errno));
@ -55,8 +52,9 @@ bool cClientSocket::Command(const cTBString &Command, uint Expected,
return false;
}
if (Expected != 0) {
TimeoutMs -= time_ms() - st;
uint64 elapsed = starttime.Elapsed();
if (Expected != 0) { // XXX+ What if elapsed > TimeoutMs?
TimeoutMs -= elapsed;
return Expect(Expected, NULL, TimeoutMs);
}

View File

@ -1,5 +1,5 @@
/*
* $Id: common.h,v 1.1 2004/12/30 22:43:55 lordjaxom Exp $
* $Id: common.h,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $
*/
#ifndef VDR_STREAMDEV_COMMON_H
@ -50,7 +50,7 @@ cChannel *ChannelFromString(char *String);
#define BUFOVERTIME 5000
#define BUFOVERCOUNT 100
#define STREAMDEVHOSTS AddDirectory(cPlugin::ConfigDirectory(), \
#define STREAMDEVHOSTS (const char*)AddDirectory(cPlugin::ConfigDirectory(), \
"streamdevhosts.conf")
#define POLLFAIL esyslog("Streamdev: Polling failed: %s", strerror(errno))

View File

@ -80,7 +80,7 @@ uchar *cTSRemux::Process(const uchar *Data, int &Count, int &Result) {
// Check for frame borders:
if (m_ResultCount >= MINVIDEODATA) {
if (m_ResultCount > 0) {
for (int i = 0; i < m_ResultCount; i++) {
if (m_ResultBuffer[i] == 0 && m_ResultBuffer[i + 1] == 0 && m_ResultBuffer[i + 2] == 1) {
switch (m_ResultBuffer[i + 3]) {

View File

@ -1,5 +1,5 @@
/*
* $Id: connectionHTTP.c,v 1.1 2004/12/30 22:44:19 lordjaxom Exp $
* $Id: connectionHTTP.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $
*/
#include "server/connectionHTTP.h"
@ -99,7 +99,7 @@ void cConnectionHTTP::Flushed(void) {
else
line.Format("<li><a href=\"http://%s:%d/%s\">%s</a></li>",
(const char*)LocalIp(), StreamdevServerSetup.HTTPServerPort,
m_ListChannel->GetChannelID().ToString(), m_ListChannel->Name());
(const char*)m_ListChannel->GetChannelID().ToString(), m_ListChannel->Name());
if (!Respond(line))
DeferClose();
m_ListChannel = Channels.Next(m_ListChannel);

View File

@ -1,5 +1,5 @@
/*
* $Id: connectionVTP.c,v 1.1 2004/12/30 22:44:21 lordjaxom Exp $
* $Id: connectionVTP.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $
*/
#include "server/connectionVTP.h"
@ -284,11 +284,11 @@ bool cConnectionVTP::CmdABRT(char *Opts) {
if (ep == Opts || (*ep != '\0' && *ep != ' '))
return Respond(500, "Use: ABRT Id");
time_t st = time_ms();
cTimeMs starttime;
if (id == siLive)
DELETENULL(m_LiveStreamer);
Dprintf("ABRT took %ld ms\n", time_ms() - st);
Dprintf("ABRT took %ld ms\n", starttime.Elapsed());
DELETENULL(m_DataSockets[id]);
return Respond(220, "Data connection closed");
}
@ -422,7 +422,7 @@ bool cConnectionVTP::CmdLSTT(char *Option) {
if (isnumber(Option)) {
cTimer *timer = Timers.Get(strtol(Option, NULL, 10) - 1);
if (timer)
Reply(250, "%d %s", timer->Index() + 1, timer->ToText(true));
Reply(250, "%d %s", timer->Index() + 1, (const char*)timer->ToText(true));
else
Reply(501, "Timer \"%s\" not defined", Option);
}
@ -433,7 +433,7 @@ bool cConnectionVTP::CmdLSTT(char *Option) {
for (int i = 0; i < Timers.Count(); i++) {
cTimer *timer = Timers.Get(i);
if (timer)
Reply(i < Timers.Count() - 1 ? -250 : 250, "%d %s", timer->Index() + 1, timer->ToText(true));
Reply(i < Timers.Count() - 1 ? -250 : 250, "%d %s", timer->Index() + 1, (const char*)timer->ToText(true));
else
Reply(501, "Timer \"%d\" not found", i + 1);
}
@ -478,7 +478,7 @@ bool cConnectionVTP::CmdMODT(char *Option) {
isyslog("timer %d modified (%s)", timer->Index() + 1,
timer->HasFlags(tfActive) ? "active" : "inactive");
#endif
Reply(250, "%d %s", timer->Index() + 1, timer->ToText(true));
Reply(250, "%d %s", timer->Index() + 1, (const char*)timer->ToText(true));
}
else
Reply(501, "Timer \"%d\" not defined", n);
@ -501,11 +501,11 @@ bool cConnectionVTP::CmdNEWT(char *Option) {
Timers.Add(timer);
Timers.Save();
isyslog("timer %d added", timer->Index() + 1);
Reply(250, "%d %s", timer->Index() + 1, timer->ToText(true));
Reply(250, "%d %s", timer->Index() + 1, (const char*)timer->ToText(true));
EXIT_WRAPPER();
}
else
Reply(550, "Timer already defined: %d %s", t->Index() + 1, t->ToText(true));
Reply(550, "Timer already defined: %d %s", t->Index() + 1, (const char*)t->ToText(true));
}
else
Reply(501, "Error in timer settings");

View File

@ -1,5 +1,5 @@
/*
* $Id: livefilter.c,v 1.1 2004/12/30 22:44:27 lordjaxom Exp $
* $Id: livefilter.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $
*/
#include "server/livefilter.h"
@ -15,12 +15,8 @@ cStreamdevLiveFilter::cStreamdevLiveFilter(cStreamdevLiveStreamer *Streamer) {
cStreamdevLiveFilter::~cStreamdevLiveFilter() {
}
void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data,
int Length) {
static time_t firsterr = 0;
static int errcnt = 0;
static bool showerr = true;
void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data, int Length)
{
uchar buffer[TS_SIZE];
int length = Length;
int pos = 0;
@ -37,27 +33,8 @@ void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data,
pos += chunk;
int p = m_Streamer->Put(buffer, TS_SIZE);
if (p != TS_SIZE) {
++errcnt;
if (showerr) {
if (firsterr == 0)
firsterr = time_ms();
else if (firsterr + BUFOVERTIME > time_ms() && errcnt > BUFOVERCOUNT) {
esyslog("ERROR: too many buffer overflows, logging stopped");
showerr = false;
firsterr = time_ms();
}
} else if (firsterr + BUFOVERTIME < time_ms()) {
showerr = true;
firsterr = 0;
errcnt = 0;
}
if (showerr)
esyslog("ERROR: ring buffer overflow (%d bytes dropped)", TS_SIZE - p);
else
firsterr = time_ms();
}
if (p != TS_SIZE)
m_Streamer->ReportOverflow(TS_SIZE - p);
}
}

View File

@ -5,55 +5,29 @@
#include "remux/ts2es.h"
#include "common.h"
cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer,
int Ca, int Priority,
int Pid1, int Pid2, int Pid3, int Pid4,
int Pid5, int Pid6, int Pid7, int Pid8,
int Pid9, int Pid10, int Pid11, int Pid12,
int Pid13, int Pid14, int Pid15, int Pid16):
cReceiver(Ca, Priority, 16,
Pid1, Pid2, Pid3, Pid4, Pid5, Pid6, Pid7, Pid8,
Pid9, Pid10, Pid11, Pid12, Pid13, Pid14, Pid15, Pid16) {
m_Streamer = Streamer;
cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Ca,
int Priority, const int *Pids):
cReceiver(Ca, Priority, 0, Pids),
m_Streamer(Streamer)
{
}
cStreamdevLiveReceiver::~cStreamdevLiveReceiver() {
cStreamdevLiveReceiver::~cStreamdevLiveReceiver()
{
Dprintf("Killing live receiver\n");
Detach();
}
void cStreamdevLiveReceiver::Receive(uchar *Data, int Length) {
static time_t firsterr = 0;
static int errcnt = 0;
static bool showerr = true;
int p = m_Streamer->Put(Data, Length);
if (p != Length) {
++errcnt;
if (showerr) {
if (firsterr == 0)
firsterr = time_ms();
else if (firsterr + BUFOVERTIME > time_ms() && errcnt > BUFOVERCOUNT) {
esyslog("ERROR: too many buffer overflows, logging stopped");
showerr = false;
firsterr = time_ms();
}
} else if (firsterr + BUFOVERTIME < time_ms()) {
showerr = true;
firsterr = 0;
errcnt = 0;
}
if (showerr)
esyslog("ERROR: ring buffer overflow (%d bytes dropped)", Length - p);
else
firsterr = time_ms();
}
if (p != Length)
m_Streamer->ReportOverflow(Length - p);
}
cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority):
cStreamdevStreamer("Live streamer") {
m_Priority = Priority;
m_NumPids = 0;
m_Channel = NULL;
m_Device = NULL;
m_Receiver = NULL;
@ -91,40 +65,33 @@ void cStreamdevLiveStreamer::Start(cTBSocket *Socket) {
bool cStreamdevLiveStreamer::SetPid(int Pid, bool On) {
int idx;
bool haspids = false;
if (On) {
for (idx = 0; idx < MAXRECEIVEPIDS; ++idx) {
for (idx = 0; idx < m_NumPids; ++idx) {
if (m_Pids[idx] == Pid)
return true; // No change needed
else if (m_Pids[idx] == 0) {
m_Pids[idx] = Pid;
haspids = true;
break;
}
}
if (idx == MAXRECEIVEPIDS) {
if (m_NumPids == MAXRECEIVEPIDS) {
esyslog("ERROR: Streamdev: No free slot to receive pid %d\n", Pid);
return false;
}
m_Pids[m_NumPids++] = Pid;
m_Pids[m_NumPids] = 0;
} else {
for (idx = 0; idx < MAXRECEIVEPIDS; ++idx) {
if (m_Pids[idx] == Pid)
m_Pids[idx] = 0;
else if (m_Pids[idx] != 0)
haspids = true;
for (idx = 0; idx < m_NumPids; ++idx) {
if (m_Pids[idx] == Pid) {
--m_NumPids;
memmove(&m_Pids[idx], &m_Pids[idx + 1], sizeof(int) * (m_NumPids - idx));
}
}
}
DELETENULL(m_Receiver);
if (haspids) {
if (m_NumPids > 0) {
Dprintf("Creating Receiver to respect changed pids\n");
m_Receiver = new cStreamdevLiveReceiver(this, m_Channel->Ca(), m_Priority,
m_Pids[0], m_Pids[1], m_Pids[2], m_Pids[3],
m_Pids[4], m_Pids[5], m_Pids[6], m_Pids[7],
m_Pids[8], m_Pids[9], m_Pids[10], m_Pids[11],
m_Pids[12], m_Pids[13], m_Pids[14], m_Pids[15]);
m_Receiver = new cStreamdevLiveReceiver(this, m_Channel->Ca(), m_Priority, m_Pids);
if (m_Device != NULL) {
Dprintf("Attaching new receiver\n");
m_Device->AttachReceiver(m_Receiver);
@ -141,32 +108,32 @@ bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, int StreamType,
switch (StreamType) {
case stES:
{
int pid = ISRADIO(Channel) ? Channel->Apid1() : Channel->Vpid();
int pid = ISRADIO(Channel) ? Channel->Apid(0) : Channel->Vpid();
m_Remux = new cTS2ESRemux(pid);
return SetPid(pid, true);
}
case stPES:
m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid1(),
Channel->Apid2(), Channel->Dpid1(), 0, false);
m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid(0), Channel->Apid(1),
Channel->Dpid(0), 0, false);
return SetPid(Channel->Vpid(), true)
&& SetPid(Channel->Apid1(), true)
&& SetPid(Channel->Apid2(), true)
&& SetPid(Channel->Dpid1(), true);
&& SetPid(Channel->Apid(0), true)
&& SetPid(Channel->Apid(1), true)
&& SetPid(Channel->Dpid(0), true);
break;
case stPS:
m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid1(), 0, 0, 0, true);
m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid(0), 0, 0, 0, true);
return SetPid(Channel->Vpid(), true)
&& SetPid(Channel->Apid1(), true);
&& SetPid(Channel->Apid(0), true);
break;
case stTS:
if (!StreamPIDS) {
return SetPid(Channel->Vpid(), true)
&& SetPid(Channel->Apid1(), true)
&& SetPid(Channel->Apid2(), true)
&& SetPid(Channel->Dpid1(), true);
&& SetPid(Channel->Apid(0), true)
&& SetPid(Channel->Apid(1), true)
&& SetPid(Channel->Dpid(0), true);
}
Dprintf("pid streaming mode\n");
return true;

View File

@ -24,18 +24,16 @@ protected:
virtual void Receive(uchar *Data, int Length);
public:
cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Priority, int Ca,
int Pid1 = 0, int Pid2 = 0, int Pid3 = 0, int Pid4 = 0,
int Pid5 = 0, int Pid6 = 0, int Pid7 = 0, int Pid8 = 0,
int Pid9 = 0, int Pid10 = 0, int Pid11 = 0, int Pid12 = 0,
int Pid13 = 0, int Pid14 = 0, int Pid15 = 0, int Pid16 = 0);
cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Ca, int Priority,
const int *Pids);
virtual ~cStreamdevLiveReceiver();
};
class cStreamdevLiveStreamer: public cStreamdevStreamer {
private:
int m_Priority;
int m_Pids[MAXRECEIVEPIDS];
int m_Pids[MAXRECEIVEPIDS + 1];
int m_NumPids;
const cChannel *m_Channel;
cDevice *m_Device;
cStreamdevLiveReceiver *m_Receiver;

View File

@ -1,5 +1,5 @@
/*
* $Id: streamer.c,v 1.1 2004/12/30 22:44:21 lordjaxom Exp $
* $Id: streamer.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $
*/
#include <vdr/ringbuffer.h>
@ -13,7 +13,7 @@
#include "tools/socket.h"
#include "common.h"
#define VIDEOBUFSIZE MEGABYTE(3)
#define VIDEOBUFSIZE MEGABYTE(4)
#define MAXBLOCKSIZE TS_SIZE*10
cStreamdevStreamer::cStreamdevStreamer(const char *Name)

View File

@ -1,5 +1,5 @@
/*
* $Id: streamer.h,v 1.1 2004/12/30 22:44:21 lordjaxom Exp $
* $Id: streamer.h,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $
*/
#ifndef VDR_STREAMDEV_STREAMER_H
@ -34,6 +34,7 @@ public:
virtual void Stop(void);
int Put(uchar *Data, int Length) { return m_RingBuffer->Put(Data, Length); }
void ReportOverflow(int Bytes) { m_RingBuffer->ReportOverflow(Bytes); }
virtual void Detach(void) = 0;
virtual void Attach(void) = 0;

View File

@ -16,7 +16,6 @@ cTBSelect::~cTBSelect() {
int cTBSelect::Select(uint TimeoutMs) {
struct timeval tv;
time_t st, et;
ssize_t res;
int ms;
@ -26,15 +25,13 @@ int cTBSelect::Select(uint TimeoutMs) {
if (TimeoutMs == 0)
return ::select(m_MaxFiled + 1, &m_Rfds, &m_Wfds, NULL, &tv);
st = time_ms();
cTimeMs starttime;
ms = TimeoutMs;
while (ms > 0 && (res = ::select(m_MaxFiled + 1, &m_Rfds, &m_Wfds, NULL,
&tv)) == -1 && errno == EINTR) {
et = time_ms();
ms -= et - st;
ms = TimeoutMs - starttime.Elapsed();
tv.tv_usec = (ms % 1000) * 1000;
tv.tv_sec = ms / 1000;
st = et;
}
if (ms <= 0) {
errno = ETIMEDOUT;

View File

@ -56,10 +56,9 @@ ssize_t cTBSource::Write(const void *Buffer, size_t Length) {
bool cTBSource::TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs) {
cTBSelect sel;
time_t st;
int ms, offs;
st = time_ms();
cTimeMs starttime;
ms = TimeoutMs;
offs = 0;
while (Length > 0) {
@ -77,7 +76,7 @@ bool cTBSource::TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs) {
Length -= b;
}
ms -= time_ms() - st;
ms = TimeoutMs - starttime.Elapsed();
if (ms <= 0) {
errno = ETIMEDOUT;
return false;
@ -89,7 +88,6 @@ bool cTBSource::TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs) {
ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq,
uint TimeoutMs) {
char *offs;
time_t st;
int seqlen, ms;
size_t olen;
cTBSelect sel;
@ -108,7 +106,7 @@ ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq,
return olen;
}
st = time_ms();
cTimeMs starttime;
ms = TimeoutMs;
while (m_LineBuffer.Length() < BUFSIZ) {
int b;
@ -142,7 +140,7 @@ ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq,
}
}
ms -= time_ms() - st;
ms = TimeoutMs - starttime.Elapsed();
if (ms <= 0) {
errno = ETIMEDOUT;
return -1;