From 8e4556b0a982dab75d8ba3bb388965ecf47fa0b0 Mon Sep 17 00:00:00 2001 From: lordjaxom Date: Tue, 8 Feb 2005 13:59:16 +0000 Subject: [PATCH] - first adoptions (transfer-commit) --- client/filter.c | 29 ++---------- client/remote.c | 4 +- client/socket.c | 14 +++--- common.h | 4 +- remux/tsremux.c | 2 +- server/connectionHTTP.c | 4 +- server/connectionVTP.c | 16 +++---- server/livefilter.c | 33 +++----------- server/livestreamer.c | 99 ++++++++++++++--------------------------- server/livestreamer.h | 10 ++--- server/streamer.c | 4 +- server/streamer.h | 3 +- tools/select.c | 7 +-- tools/source.c | 10 ++--- 14 files changed, 76 insertions(+), 163 deletions(-) diff --git a/client/filter.c b/client/filter.c index dad86f3..265276f 100644 --- a/client/filter.c +++ b/client/filter.c @@ -1,5 +1,5 @@ /* - * $Id: filter.c,v 1.1 2004/12/30 22:44:04 lordjaxom Exp $ + * $Id: filter.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ */ #include "client/filter.h" @@ -82,32 +82,9 @@ cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) { } void cStreamdevFilters::Put(const uchar *Data) { - static time_t firsterr = 0; - static int errcnt = 0; - static bool showerr = true; - int p = m_RingBuffer->Put(Data, TS_SIZE); - if (p != TS_SIZE) { - ++errcnt; - if (showerr) { - if (firsterr == 0) - firsterr = time_ms(); - else if (firsterr + BUFOVERTIME > time_ms() && errcnt > BUFOVERCOUNT) { - esyslog("ERROR: too many buffer overflows, logging stopped"); - showerr = false; - firsterr = time_ms(); - } - } else if (firsterr + BUFOVERTIME < time_ms()) { - showerr = true; - firsterr = 0; - errcnt = 0; - } - - if (showerr) - esyslog("ERROR: ring buffer overflow (%d bytes dropped)", TS_SIZE - p); - else - firsterr = time_ms(); - } + if (p != TS_SIZE) + m_RingBuffer->ReportOverflow(TS_SIZE - p); } void cStreamdevFilters::Action(void) { diff --git a/client/remote.c b/client/remote.c index 4cc7cfe..8463709 100644 --- a/client/remote.c +++ b/client/remote.c @@ -1,5 +1,5 @@ /* - * $Id: remote.c,v 1.1 2004/12/30 22:44:02 lordjaxom Exp $ + * $Id: remote.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ */ #include "client/remote.h" @@ -457,7 +457,7 @@ const char *cRemoteTimer::ToText(void) { summary = strreplace(strdup(m_Summary), ':', '|'); asprintf(&m_Buffer, "%d:%s:%s:%04d:%04d:%d:%d:%s:%s", m_Active, - Channel()->GetChannelID().ToString(), PrintDay(m_Day, m_FirstDay), + (const char*)Channel()->GetChannelID().ToString(), PrintDay(m_Day, m_FirstDay), m_Start, m_Stop, m_Priority, m_Lifetime, m_File, summary ? summary : ""); if (summary != NULL) diff --git a/client/socket.c b/client/socket.c index 44af8db..9e399cb 100644 --- a/client/socket.c +++ b/client/socket.c @@ -1,5 +1,5 @@ /* - * $Id: socket.c,v 1.1 2004/12/30 22:44:04 lordjaxom Exp $ + * $Id: socket.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ */ #include @@ -39,15 +39,12 @@ cTBSocket *cClientSocket::DataSocket(eSocketId Id) const { bool cClientSocket::Command(const cTBString &Command, uint Expected, uint TimeoutMs) { - cTBString pkt; - time_t st; - errno = 0; - pkt = Command + "\015\012"; + cTBString pkt = Command + "\015\012"; Dprintf("OUT: |%s|\n", (const char*)Command); - st = time_ms(); + cTimeMs starttime; if (!TimedWrite((const char*)pkt, pkt.Length(), TimeoutMs)) { esyslog("Streamdev: Lost connection to %s:%d: %s", (const char*)RemoteIp(), RemotePort(), strerror(errno)); @@ -55,8 +52,9 @@ bool cClientSocket::Command(const cTBString &Command, uint Expected, return false; } - if (Expected != 0) { - TimeoutMs -= time_ms() - st; + uint64 elapsed = starttime.Elapsed(); + if (Expected != 0) { // XXX+ What if elapsed > TimeoutMs? + TimeoutMs -= elapsed; return Expect(Expected, NULL, TimeoutMs); } diff --git a/common.h b/common.h index 252a655..9f4afdd 100644 --- a/common.h +++ b/common.h @@ -1,5 +1,5 @@ /* - * $Id: common.h,v 1.1 2004/12/30 22:43:55 lordjaxom Exp $ + * $Id: common.h,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ */ #ifndef VDR_STREAMDEV_COMMON_H @@ -50,7 +50,7 @@ cChannel *ChannelFromString(char *String); #define BUFOVERTIME 5000 #define BUFOVERCOUNT 100 -#define STREAMDEVHOSTS AddDirectory(cPlugin::ConfigDirectory(), \ +#define STREAMDEVHOSTS (const char*)AddDirectory(cPlugin::ConfigDirectory(), \ "streamdevhosts.conf") #define POLLFAIL esyslog("Streamdev: Polling failed: %s", strerror(errno)) diff --git a/remux/tsremux.c b/remux/tsremux.c index 93f513b..665c8ff 100644 --- a/remux/tsremux.c +++ b/remux/tsremux.c @@ -80,7 +80,7 @@ uchar *cTSRemux::Process(const uchar *Data, int &Count, int &Result) { // Check for frame borders: - if (m_ResultCount >= MINVIDEODATA) { + if (m_ResultCount > 0) { for (int i = 0; i < m_ResultCount; i++) { if (m_ResultBuffer[i] == 0 && m_ResultBuffer[i + 1] == 0 && m_ResultBuffer[i + 2] == 1) { switch (m_ResultBuffer[i + 3]) { diff --git a/server/connectionHTTP.c b/server/connectionHTTP.c index 7d20f80..0d3a039 100644 --- a/server/connectionHTTP.c +++ b/server/connectionHTTP.c @@ -1,5 +1,5 @@ /* - * $Id: connectionHTTP.c,v 1.1 2004/12/30 22:44:19 lordjaxom Exp $ + * $Id: connectionHTTP.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ */ #include "server/connectionHTTP.h" @@ -99,7 +99,7 @@ void cConnectionHTTP::Flushed(void) { else line.Format("
  • %s
  • ", (const char*)LocalIp(), StreamdevServerSetup.HTTPServerPort, - m_ListChannel->GetChannelID().ToString(), m_ListChannel->Name()); + (const char*)m_ListChannel->GetChannelID().ToString(), m_ListChannel->Name()); if (!Respond(line)) DeferClose(); m_ListChannel = Channels.Next(m_ListChannel); diff --git a/server/connectionVTP.c b/server/connectionVTP.c index 2177905..092bd26 100644 --- a/server/connectionVTP.c +++ b/server/connectionVTP.c @@ -1,5 +1,5 @@ /* - * $Id: connectionVTP.c,v 1.1 2004/12/30 22:44:21 lordjaxom Exp $ + * $Id: connectionVTP.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ */ #include "server/connectionVTP.h" @@ -284,11 +284,11 @@ bool cConnectionVTP::CmdABRT(char *Opts) { if (ep == Opts || (*ep != '\0' && *ep != ' ')) return Respond(500, "Use: ABRT Id"); - time_t st = time_ms(); + cTimeMs starttime; if (id == siLive) DELETENULL(m_LiveStreamer); - Dprintf("ABRT took %ld ms\n", time_ms() - st); + Dprintf("ABRT took %ld ms\n", starttime.Elapsed()); DELETENULL(m_DataSockets[id]); return Respond(220, "Data connection closed"); } @@ -422,7 +422,7 @@ bool cConnectionVTP::CmdLSTT(char *Option) { if (isnumber(Option)) { cTimer *timer = Timers.Get(strtol(Option, NULL, 10) - 1); if (timer) - Reply(250, "%d %s", timer->Index() + 1, timer->ToText(true)); + Reply(250, "%d %s", timer->Index() + 1, (const char*)timer->ToText(true)); else Reply(501, "Timer \"%s\" not defined", Option); } @@ -433,7 +433,7 @@ bool cConnectionVTP::CmdLSTT(char *Option) { for (int i = 0; i < Timers.Count(); i++) { cTimer *timer = Timers.Get(i); if (timer) - Reply(i < Timers.Count() - 1 ? -250 : 250, "%d %s", timer->Index() + 1, timer->ToText(true)); + Reply(i < Timers.Count() - 1 ? -250 : 250, "%d %s", timer->Index() + 1, (const char*)timer->ToText(true)); else Reply(501, "Timer \"%d\" not found", i + 1); } @@ -478,7 +478,7 @@ bool cConnectionVTP::CmdMODT(char *Option) { isyslog("timer %d modified (%s)", timer->Index() + 1, timer->HasFlags(tfActive) ? "active" : "inactive"); #endif - Reply(250, "%d %s", timer->Index() + 1, timer->ToText(true)); + Reply(250, "%d %s", timer->Index() + 1, (const char*)timer->ToText(true)); } else Reply(501, "Timer \"%d\" not defined", n); @@ -501,11 +501,11 @@ bool cConnectionVTP::CmdNEWT(char *Option) { Timers.Add(timer); Timers.Save(); isyslog("timer %d added", timer->Index() + 1); - Reply(250, "%d %s", timer->Index() + 1, timer->ToText(true)); + Reply(250, "%d %s", timer->Index() + 1, (const char*)timer->ToText(true)); EXIT_WRAPPER(); } else - Reply(550, "Timer already defined: %d %s", t->Index() + 1, t->ToText(true)); + Reply(550, "Timer already defined: %d %s", t->Index() + 1, (const char*)t->ToText(true)); } else Reply(501, "Error in timer settings"); diff --git a/server/livefilter.c b/server/livefilter.c index 14f20fa..4524a88 100644 --- a/server/livefilter.c +++ b/server/livefilter.c @@ -1,5 +1,5 @@ /* - * $Id: livefilter.c,v 1.1 2004/12/30 22:44:27 lordjaxom Exp $ + * $Id: livefilter.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ */ #include "server/livefilter.h" @@ -15,12 +15,8 @@ cStreamdevLiveFilter::cStreamdevLiveFilter(cStreamdevLiveStreamer *Streamer) { cStreamdevLiveFilter::~cStreamdevLiveFilter() { } -void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data, - int Length) { - static time_t firsterr = 0; - static int errcnt = 0; - static bool showerr = true; - +void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data, int Length) +{ uchar buffer[TS_SIZE]; int length = Length; int pos = 0; @@ -37,27 +33,8 @@ void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data, pos += chunk; int p = m_Streamer->Put(buffer, TS_SIZE); - if (p != TS_SIZE) { - ++errcnt; - if (showerr) { - if (firsterr == 0) - firsterr = time_ms(); - else if (firsterr + BUFOVERTIME > time_ms() && errcnt > BUFOVERCOUNT) { - esyslog("ERROR: too many buffer overflows, logging stopped"); - showerr = false; - firsterr = time_ms(); - } - } else if (firsterr + BUFOVERTIME < time_ms()) { - showerr = true; - firsterr = 0; - errcnt = 0; - } - - if (showerr) - esyslog("ERROR: ring buffer overflow (%d bytes dropped)", TS_SIZE - p); - else - firsterr = time_ms(); - } + if (p != TS_SIZE) + m_Streamer->ReportOverflow(TS_SIZE - p); } } diff --git a/server/livestreamer.c b/server/livestreamer.c index 4204ea3..7848d3f 100644 --- a/server/livestreamer.c +++ b/server/livestreamer.c @@ -5,55 +5,29 @@ #include "remux/ts2es.h" #include "common.h" -cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, - int Ca, int Priority, - int Pid1, int Pid2, int Pid3, int Pid4, - int Pid5, int Pid6, int Pid7, int Pid8, - int Pid9, int Pid10, int Pid11, int Pid12, - int Pid13, int Pid14, int Pid15, int Pid16): - cReceiver(Ca, Priority, 16, - Pid1, Pid2, Pid3, Pid4, Pid5, Pid6, Pid7, Pid8, - Pid9, Pid10, Pid11, Pid12, Pid13, Pid14, Pid15, Pid16) { - m_Streamer = Streamer; +cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Ca, + int Priority, const int *Pids): + cReceiver(Ca, Priority, 0, Pids), + m_Streamer(Streamer) +{ } -cStreamdevLiveReceiver::~cStreamdevLiveReceiver() { +cStreamdevLiveReceiver::~cStreamdevLiveReceiver() +{ Dprintf("Killing live receiver\n"); Detach(); } void cStreamdevLiveReceiver::Receive(uchar *Data, int Length) { - static time_t firsterr = 0; - static int errcnt = 0; - static bool showerr = true; - int p = m_Streamer->Put(Data, Length); - if (p != Length) { - ++errcnt; - if (showerr) { - if (firsterr == 0) - firsterr = time_ms(); - else if (firsterr + BUFOVERTIME > time_ms() && errcnt > BUFOVERCOUNT) { - esyslog("ERROR: too many buffer overflows, logging stopped"); - showerr = false; - firsterr = time_ms(); - } - } else if (firsterr + BUFOVERTIME < time_ms()) { - showerr = true; - firsterr = 0; - errcnt = 0; - } - - if (showerr) - esyslog("ERROR: ring buffer overflow (%d bytes dropped)", Length - p); - else - firsterr = time_ms(); - } + if (p != Length) + m_Streamer->ReportOverflow(Length - p); } cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority): cStreamdevStreamer("Live streamer") { m_Priority = Priority; + m_NumPids = 0; m_Channel = NULL; m_Device = NULL; m_Receiver = NULL; @@ -91,40 +65,33 @@ void cStreamdevLiveStreamer::Start(cTBSocket *Socket) { bool cStreamdevLiveStreamer::SetPid(int Pid, bool On) { int idx; - bool haspids = false; if (On) { - for (idx = 0; idx < MAXRECEIVEPIDS; ++idx) { + for (idx = 0; idx < m_NumPids; ++idx) { if (m_Pids[idx] == Pid) return true; // No change needed - else if (m_Pids[idx] == 0) { - m_Pids[idx] = Pid; - haspids = true; - break; - } } - if (idx == MAXRECEIVEPIDS) { + if (m_NumPids == MAXRECEIVEPIDS) { esyslog("ERROR: Streamdev: No free slot to receive pid %d\n", Pid); return false; } + + m_Pids[m_NumPids++] = Pid; + m_Pids[m_NumPids] = 0; } else { - for (idx = 0; idx < MAXRECEIVEPIDS; ++idx) { - if (m_Pids[idx] == Pid) - m_Pids[idx] = 0; - else if (m_Pids[idx] != 0) - haspids = true; + for (idx = 0; idx < m_NumPids; ++idx) { + if (m_Pids[idx] == Pid) { + --m_NumPids; + memmove(&m_Pids[idx], &m_Pids[idx + 1], sizeof(int) * (m_NumPids - idx)); + } } } DELETENULL(m_Receiver); - if (haspids) { + if (m_NumPids > 0) { Dprintf("Creating Receiver to respect changed pids\n"); - m_Receiver = new cStreamdevLiveReceiver(this, m_Channel->Ca(), m_Priority, - m_Pids[0], m_Pids[1], m_Pids[2], m_Pids[3], - m_Pids[4], m_Pids[5], m_Pids[6], m_Pids[7], - m_Pids[8], m_Pids[9], m_Pids[10], m_Pids[11], - m_Pids[12], m_Pids[13], m_Pids[14], m_Pids[15]); + m_Receiver = new cStreamdevLiveReceiver(this, m_Channel->Ca(), m_Priority, m_Pids); if (m_Device != NULL) { Dprintf("Attaching new receiver\n"); m_Device->AttachReceiver(m_Receiver); @@ -141,32 +108,32 @@ bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, int StreamType, switch (StreamType) { case stES: { - int pid = ISRADIO(Channel) ? Channel->Apid1() : Channel->Vpid(); + int pid = ISRADIO(Channel) ? Channel->Apid(0) : Channel->Vpid(); m_Remux = new cTS2ESRemux(pid); return SetPid(pid, true); } case stPES: - m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid1(), - Channel->Apid2(), Channel->Dpid1(), 0, false); + m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid(0), Channel->Apid(1), + Channel->Dpid(0), 0, false); return SetPid(Channel->Vpid(), true) - && SetPid(Channel->Apid1(), true) - && SetPid(Channel->Apid2(), true) - && SetPid(Channel->Dpid1(), true); + && SetPid(Channel->Apid(0), true) + && SetPid(Channel->Apid(1), true) + && SetPid(Channel->Dpid(0), true); break; case stPS: - m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid1(), 0, 0, 0, true); + m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid(0), 0, 0, 0, true); return SetPid(Channel->Vpid(), true) - && SetPid(Channel->Apid1(), true); + && SetPid(Channel->Apid(0), true); break; case stTS: if (!StreamPIDS) { return SetPid(Channel->Vpid(), true) - && SetPid(Channel->Apid1(), true) - && SetPid(Channel->Apid2(), true) - && SetPid(Channel->Dpid1(), true); + && SetPid(Channel->Apid(0), true) + && SetPid(Channel->Apid(1), true) + && SetPid(Channel->Dpid(0), true); } Dprintf("pid streaming mode\n"); return true; diff --git a/server/livestreamer.h b/server/livestreamer.h index 7682206..62137d1 100644 --- a/server/livestreamer.h +++ b/server/livestreamer.h @@ -24,18 +24,16 @@ protected: virtual void Receive(uchar *Data, int Length); public: - cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Priority, int Ca, - int Pid1 = 0, int Pid2 = 0, int Pid3 = 0, int Pid4 = 0, - int Pid5 = 0, int Pid6 = 0, int Pid7 = 0, int Pid8 = 0, - int Pid9 = 0, int Pid10 = 0, int Pid11 = 0, int Pid12 = 0, - int Pid13 = 0, int Pid14 = 0, int Pid15 = 0, int Pid16 = 0); + cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Ca, int Priority, + const int *Pids); virtual ~cStreamdevLiveReceiver(); }; class cStreamdevLiveStreamer: public cStreamdevStreamer { private: int m_Priority; - int m_Pids[MAXRECEIVEPIDS]; + int m_Pids[MAXRECEIVEPIDS + 1]; + int m_NumPids; const cChannel *m_Channel; cDevice *m_Device; cStreamdevLiveReceiver *m_Receiver; diff --git a/server/streamer.c b/server/streamer.c index 2ffd1cd..d26f518 100644 --- a/server/streamer.c +++ b/server/streamer.c @@ -1,5 +1,5 @@ /* - * $Id: streamer.c,v 1.1 2004/12/30 22:44:21 lordjaxom Exp $ + * $Id: streamer.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ */ #include @@ -13,7 +13,7 @@ #include "tools/socket.h" #include "common.h" -#define VIDEOBUFSIZE MEGABYTE(3) +#define VIDEOBUFSIZE MEGABYTE(4) #define MAXBLOCKSIZE TS_SIZE*10 cStreamdevStreamer::cStreamdevStreamer(const char *Name) diff --git a/server/streamer.h b/server/streamer.h index 1bfcc47..0f374b5 100644 --- a/server/streamer.h +++ b/server/streamer.h @@ -1,5 +1,5 @@ /* - * $Id: streamer.h,v 1.1 2004/12/30 22:44:21 lordjaxom Exp $ + * $Id: streamer.h,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ */ #ifndef VDR_STREAMDEV_STREAMER_H @@ -34,6 +34,7 @@ public: virtual void Stop(void); int Put(uchar *Data, int Length) { return m_RingBuffer->Put(Data, Length); } + void ReportOverflow(int Bytes) { m_RingBuffer->ReportOverflow(Bytes); } virtual void Detach(void) = 0; virtual void Attach(void) = 0; diff --git a/tools/select.c b/tools/select.c index 77229c8..0ab5f9b 100644 --- a/tools/select.c +++ b/tools/select.c @@ -16,7 +16,6 @@ cTBSelect::~cTBSelect() { int cTBSelect::Select(uint TimeoutMs) { struct timeval tv; - time_t st, et; ssize_t res; int ms; @@ -26,15 +25,13 @@ int cTBSelect::Select(uint TimeoutMs) { if (TimeoutMs == 0) return ::select(m_MaxFiled + 1, &m_Rfds, &m_Wfds, NULL, &tv); - st = time_ms(); + cTimeMs starttime; ms = TimeoutMs; while (ms > 0 && (res = ::select(m_MaxFiled + 1, &m_Rfds, &m_Wfds, NULL, &tv)) == -1 && errno == EINTR) { - et = time_ms(); - ms -= et - st; + ms = TimeoutMs - starttime.Elapsed(); tv.tv_usec = (ms % 1000) * 1000; tv.tv_sec = ms / 1000; - st = et; } if (ms <= 0) { errno = ETIMEDOUT; diff --git a/tools/source.c b/tools/source.c index 91c36c8..e882583 100644 --- a/tools/source.c +++ b/tools/source.c @@ -56,10 +56,9 @@ ssize_t cTBSource::Write(const void *Buffer, size_t Length) { bool cTBSource::TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs) { cTBSelect sel; - time_t st; int ms, offs; - st = time_ms(); + cTimeMs starttime; ms = TimeoutMs; offs = 0; while (Length > 0) { @@ -77,7 +76,7 @@ bool cTBSource::TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs) { Length -= b; } - ms -= time_ms() - st; + ms = TimeoutMs - starttime.Elapsed(); if (ms <= 0) { errno = ETIMEDOUT; return false; @@ -89,7 +88,6 @@ bool cTBSource::TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs) { ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq, uint TimeoutMs) { char *offs; - time_t st; int seqlen, ms; size_t olen; cTBSelect sel; @@ -108,7 +106,7 @@ ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq, return olen; } - st = time_ms(); + cTimeMs starttime; ms = TimeoutMs; while (m_LineBuffer.Length() < BUFSIZ) { int b; @@ -142,7 +140,7 @@ ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq, } } - ms -= time_ms() - st; + ms = TimeoutMs - starttime.Elapsed(); if (ms <= 0) { errno = ETIMEDOUT; return -1;