From 520adaf3da652f8b29de9882ac518470a5592842 Mon Sep 17 00:00:00 2001 From: Frank Schmirler Date: Sat, 30 Aug 2014 23:58:11 +0200 Subject: [PATCH] Implemented remuxing when replaying recordings --- remux/extern.c | 79 +++++++++++++++++++++++++++++++---------- remux/extern.h | 2 ++ server/connectionHTTP.c | 61 +++++++++++++++++++++++++------ server/recplayer.c | 7 ++++ server/recplayer.h | 3 ++ server/recstreamer.c | 32 ++++++++++++++++- server/recstreamer.h | 3 +- 7 files changed, 156 insertions(+), 31 deletions(-) diff --git a/remux/extern.c b/remux/extern.c index 3ac68bb..c3e6ad6 100644 --- a/remux/extern.c +++ b/remux/extern.c @@ -3,6 +3,7 @@ #include "server/connection.h" #include "server/streamer.h" #include +#include #include #include #include @@ -25,7 +26,7 @@ protected: virtual void Action(void); public: - cTSExt(cRingBufferLinear *ResultBuffer, const cServerConnection *Connection, const cChannel *Channel, const int *Apids, const int *Dpids); + cTSExt(cRingBufferLinear *ResultBuffer, const cServerConnection *Connection, const cChannel *Channel, const cPatPmtParser *PatPmt, const int *Apids, const int *Dpids); virtual ~cTSExt(); void Put(const uchar *Data, int Count); @@ -34,7 +35,7 @@ public: } // namespace Streamdev using namespace Streamdev; -cTSExt::cTSExt(cRingBufferLinear *ResultBuffer, const cServerConnection *Connection, const cChannel *Channel, const int *Apids, const int *Dpids): +cTSExt::cTSExt(cRingBufferLinear *ResultBuffer, const cServerConnection *Connection, const cChannel *Channel, const cPatPmtParser *PatPmt, const int *Apids, const int *Dpids): m_ResultBuffer(ResultBuffer), m_Active(false), m_Process(-1), @@ -73,15 +74,24 @@ cTSExt::cTSExt(cRingBufferLinear *ResultBuffer, const cServerConnection *Connect #define ADDENV(x...) if (asprintf(&env[i++], x) < 0) i-- // add channel ID, name and pids to environment - ADDENV("REMUX_CHANNEL_ID=%s", *Channel->GetChannelID().ToString()); - ADDENV("REMUX_CHANNEL_NAME=%s", Channel->Name()); - ADDENV("REMUX_VTYPE=%d", Channel->Vtype()); - if (Channel->Vpid()) - ADDENV("REMUX_VPID=%d", Channel->Vpid()); - if (Channel->Ppid() != Channel->Vpid()) - ADDENV("REMUX_PPID=%d", Channel->Ppid()); - if (Channel->Tpid()) - ADDENV("REMUX_TPID=%d", Channel->Tpid()); + if (Channel) { + ADDENV("REMUX_CHANNEL_ID=%s", *Channel->GetChannelID().ToString()); + ADDENV("REMUX_CHANNEL_NAME=%s", Channel->Name()); + ADDENV("REMUX_VTYPE=%d", Channel->Vtype()); + if (Channel->Vpid()) + ADDENV("REMUX_VPID=%d", Channel->Vpid()); + if (Channel->Ppid() != Channel->Vpid()) + ADDENV("REMUX_PPID=%d", Channel->Ppid()); + if (Channel->Tpid()) + ADDENV("REMUX_TPID=%d", Channel->Tpid()); + } + else if (PatPmt) { + ADDENV("REMUX_VTYPE=%d", PatPmt->Vtype()); + if (PatPmt->Vpid()) + ADDENV("REMUX_VPID=%d", PatPmt->Vpid()); + if (PatPmt->Ppid() != PatPmt->Vpid()) + ADDENV("REMUX_PPID=%d", PatPmt->Ppid()); + } std::string buffer; if (Apids && *Apids) { @@ -92,9 +102,16 @@ cTSExt::cTSExt(cRingBufferLinear *ResultBuffer, const cServerConnection *Connect buffer.clear(); for (const int *pid = Apids; *pid; pid++) { int j; - for (j = 0; Channel->Apid(j) && Channel->Apid(j) != *pid; j++) - ; - (buffer += Channel->Alang(j)) += (*(pid + 1) ? " " : ""); + if (Channel) { + for (j = 0; Channel->Apid(j) && Channel->Apid(j) != *pid; j++) + ; + (buffer += Channel->Alang(j)) += (*(pid + 1) ? " " : ""); + } + else if (PatPmt) { + for (j = 0; PatPmt->Apid(j) && PatPmt->Apid(j) != *pid; j++) + ; + (buffer += PatPmt->Alang(j)) += (*(pid + 1) ? " " : ""); + } } ADDENV("REMUX_ALANG=%s", buffer.c_str()); } @@ -108,14 +125,21 @@ cTSExt::cTSExt(cRingBufferLinear *ResultBuffer, const cServerConnection *Connect buffer.clear(); for (const int *pid = Dpids; *pid; pid++) { int j; - for (j = 0; Channel->Dpid(j) && Channel->Dpid(j) != *pid; j++) - ; - (buffer += Channel->Dlang(j)) += (*(pid + 1) ? " " : ""); + if (Channel) { + for (j = 0; Channel->Dpid(j) && Channel->Dpid(j) != *pid; j++) + ; + (buffer += Channel->Dlang(j)) += (*(pid + 1) ? " " : ""); + } + else if (PatPmt) { + for (j = 0; PatPmt->Dpid(j) && PatPmt->Dpid(j) != *pid; j++) + ; + (buffer += PatPmt->Dlang(j)) += (*(pid + 1) ? " " : ""); + } } ADDENV("REMUX_DLANG=%s", buffer.c_str()); } - if (Channel->Spid(0)) { + if (Channel && Channel->Spid(0)) { buffer.clear(); for (const int *pid = Channel->Spids(); *pid; pid++) (buffer += (const char *) itoa(*pid)) += (*(pid + 1) ? " " : ""); @@ -126,6 +150,17 @@ cTSExt::cTSExt(cRingBufferLinear *ResultBuffer, const cServerConnection *Connect (buffer += Channel->Slang(j)) += (Channel->Spid(j + 1) ? " " : ""); ADDENV("REMUX_SLANG=%s", buffer.c_str()); } + else if (PatPmt && PatPmt->Spid(0)) { + buffer.clear(); + for (const int *pid = PatPmt->Spids(); *pid; pid++) + (buffer += (const char *) itoa(*pid)) += (*(pid + 1) ? " " : ""); + ADDENV("REMUX_SPID=%s", buffer.c_str()); + + buffer.clear(); + for (int j = 0; PatPmt->Spid(j); j++) + (buffer += PatPmt->Slang(j)) += (PatPmt->Spid(j + 1) ? " " : ""); + ADDENV("REMUX_SLANG=%s", buffer.c_str()); + } if (Connection) { // add vars for a CGI like interface @@ -296,7 +331,13 @@ void cTSExt::Put(const uchar *Data, int Count) cExternRemux::cExternRemux(const cServerConnection *Connection, const cChannel *Channel, const int *Apids, const int *Dpids): m_ResultBuffer(new cRingBufferLinear(WRITERBUFSIZE)), - m_Remux(new cTSExt(m_ResultBuffer, Connection, Channel, Apids, Dpids)) + m_Remux(new cTSExt(m_ResultBuffer, Connection, Channel, NULL, Apids, Dpids)) +{ + m_ResultBuffer->SetTimeouts(500, 100); +} +cExternRemux::cExternRemux(const cServerConnection *Connection, const cPatPmtParser *PatPmt, const int *Apids, const int *Dpids): + m_ResultBuffer(new cRingBufferLinear(WRITERBUFSIZE)), + m_Remux(new cTSExt(m_ResultBuffer, Connection, NULL, PatPmt, Apids, Dpids)) { m_ResultBuffer->SetTimeouts(500, 100); } diff --git a/remux/extern.h b/remux/extern.h index 070e4f6..62f193b 100644 --- a/remux/extern.h +++ b/remux/extern.h @@ -6,6 +6,7 @@ #include class cChannel; +class cPatPmtParser; class cServerConnection; namespace Streamdev { @@ -19,6 +20,7 @@ private: public: cExternRemux(const cServerConnection *Connection, const cChannel *Channel, const int *APids, const int *Dpids); + cExternRemux(const cServerConnection *Connection, const cPatPmtParser *PatPmt, const int *APids, const int *Dpids); virtual ~cExternRemux(); int Put(const uchar *Data, int Count); diff --git a/server/connectionHTTP.c b/server/connectionHTTP.c index bcb4b58..517fb2e 100644 --- a/server/connectionHTTP.c +++ b/server/connectionHTTP.c @@ -35,6 +35,7 @@ cConnectionHTTP::cConnectionHTTP(void): cConnectionHTTP::~cConnectionHTTP() { + SetStreamer(NULL); delete m_RecPlayer; delete m_MenuList; } @@ -164,6 +165,14 @@ bool cConnectionHTTP::ProcessRequest(void) } } + tStrStrMap::const_iterator it; + it = m_Params.find("apid"); + if (it != m_Params.end()) + m_Apid[0] = atoi(it->second.c_str()); + it = m_Params.find("dpid"); + if (it != m_Params.end()) + m_Dpid[0] = atoi(it->second.c_str()); + tStrStrMap::const_iterator it_method = Headers().find(REQUEST_METHOD); tStrStrMap::const_iterator it_pathinfo = Headers().find(PATH_INFO); if (it_method == Headers().end() || it_pathinfo == Headers().end()) { @@ -196,30 +205,46 @@ bool cConnectionHTTP::ProcessRequest(void) } else if (m_RecPlayer != NULL) { Dprintf("GET recording\n"); + bool isPes = m_RecPlayer->getCurrentRecording()->IsPesRecording(); + // no remuxing for old PES recordings + if (isPes && m_StreamType != stPES) + return HttpResponse(503, true); + int64_t from, to; bool hasRange = ParseRange(from, to); cStreamdevRecStreamer* recStreamer; if (from == 0 && hasRange && m_ReplayFakeRange) { - recStreamer = new cStreamdevRecStreamer(m_RecPlayer, this); + recStreamer = new cStreamdevRecStreamer(this, m_RecPlayer, m_StreamType, (int64_t) 0L, m_Apid[0] ? m_Apid : NULL, m_Dpid[0] ? m_Dpid : NULL); from += m_ReplayPos; if (to >= 0) to += m_ReplayPos; } else - recStreamer = new cStreamdevRecStreamer(m_RecPlayer, this, m_ReplayPos); + recStreamer = new cStreamdevRecStreamer(this, m_RecPlayer, m_StreamType, m_ReplayPos, m_Apid[0] ? m_Apid : NULL, m_Dpid[0] ? m_Dpid : NULL); SetStreamer(recStreamer); + + if (m_StreamType == stEXT) + return Respond("HTTP/1.0 200 OK"); + else if (m_StreamType == stES && (m_Apid[0] || m_Dpid[0])) + return HttpResponse(200, false, "audio/mpeg"); + + const char* contentType = (isPes || m_RecPlayer->getPatPmtData()->Vpid()) ? "video/mpeg" : "audio/mpeg"; + // range not supported when remuxing + if (m_StreamType != stTS && !isPes) + return HttpResponse(200, false, contentType); + uint64_t total = recStreamer->GetLength(); if (hasRange) { int64_t length = recStreamer->SetRange(from, to); Dprintf("range response: %lld-%lld/%lld, len %lld\n", (long long)from, (long long)to, (long long)total, (long long)length); if (length < 0L) - return HttpResponse(416, true, "video/mpeg", "Accept-Ranges: bytes\r\nContent-Range: bytes */%llu", (unsigned long long) total); + return HttpResponse(416, true, contentType, "Accept-Ranges: bytes\r\nContent-Range: bytes */%llu", (unsigned long long) total); else - return HttpResponse(206, false, "video/mpeg", "Accept-Ranges: bytes\r\nContent-Range: bytes %lld-%lld/%llu\r\nContent-Length: %lld", (long long) from, (long long) to, (unsigned long long) total, (long long) length); + return HttpResponse(206, false, contentType, "Accept-Ranges: bytes\r\nContent-Range: bytes %lld-%lld/%llu\r\nContent-Length: %lld", (long long) from, (long long) to, (unsigned long long) total, (long long) length); } else - return HttpResponse(200, false, "video/mpeg", "Accept-Ranges: bytes"); + return HttpResponse(200, false, contentType, "Accept-Ranges: bytes"); } else { return HttpResponse(404, true); @@ -247,29 +272,45 @@ bool cConnectionHTTP::ProcessRequest(void) } else if (m_RecPlayer != NULL) { Dprintf("HEAD recording\n"); + bool isPes = m_RecPlayer->getCurrentRecording()->IsPesRecording(); + // no remuxing for old PES recordings + if (isPes && m_StreamType != stPES) + return HttpResponse(503, true); + int64_t from, to; bool hasRange = ParseRange(from, to); cStreamdevRecStreamer* recStreamer; if (from == 0 && hasRange && m_ReplayFakeRange) { - recStreamer = new cStreamdevRecStreamer(m_RecPlayer, this, m_ReplayPos); + recStreamer = new cStreamdevRecStreamer(this, m_RecPlayer, m_StreamType, m_ReplayPos, m_Apid[0] ? m_Apid : NULL, m_Dpid[0] ? m_Dpid : NULL); from += m_ReplayPos; if (to >= 0) to += m_ReplayPos; } else - recStreamer = new cStreamdevRecStreamer(m_RecPlayer, this, m_ReplayPos); + recStreamer = new cStreamdevRecStreamer(this, m_RecPlayer, m_StreamType, m_ReplayPos, m_Apid[0] ? m_Apid : NULL, m_Dpid[0] ? m_Dpid : NULL); SetStreamer(recStreamer); + + if (m_StreamType == stEXT) + return Respond("HTTP/1.0 200 OK"); + else if (m_StreamType == stES && (m_Apid[0] || m_Dpid[0])) + return HttpResponse(200, true, "audio/mpeg"); + + const char* contentType = (isPes || m_RecPlayer->getPatPmtData()->Vpid()) ? "video/mpeg" : "audio/mpeg"; + // range not supported when remuxing + if (m_StreamType != stTS && !isPes) + return HttpResponse(200, false, contentType); + uint64_t total = recStreamer->GetLength(); if (hasRange) { int64_t length = recStreamer->SetRange(from, to); if (length < 0L) - return HttpResponse(416, true, "video/mpeg", "Accept-Ranges: bytes\r\nContent-Range: bytes */%llu", (unsigned long long) total); + return HttpResponse(416, true, contentType, "Accept-Ranges: bytes\r\nContent-Range: bytes */%llu", (unsigned long long) total); else - return HttpResponse(206, true, "video/mpeg", "Accept-Ranges: bytes\r\nContent-Range: bytes %lld-%lld/%llu\r\nContent-Length: %lld", (long long) from, (long long) to, (unsigned long long) total, (long long) length); + return HttpResponse(206, true, contentType, "Accept-Ranges: bytes\r\nContent-Range: bytes %lld-%lld/%llu\r\nContent-Length: %lld", (long long) from, (long long) to, (unsigned long long) total, (long long) length); } else - return HttpResponse(200, true, "video/mpeg", "Accept-Ranges: bytes"); + return HttpResponse(200, true, contentType, "Accept-Ranges: bytes"); } else { return HttpResponse(404, true); diff --git a/server/recplayer.c b/server/recplayer.c index 3885b4c..1b60ef5 100644 --- a/server/recplayer.c +++ b/server/recplayer.c @@ -42,6 +42,12 @@ RecPlayer::RecPlayer(const char* FileName) if (!indexFile) esyslog("ERROR: Streamdev: Failed to create indexfile!"); scan(); + + parser = new cPatPmtParser(); + unsigned char buffer[2 * TS_SIZE]; + unsigned long l = getBlock(buffer, 0UL, sizeof(buffer)); + if (!l || !parser->ParsePatPmt(buffer, (int) l)) + esyslog("ERROR: Streamdev: Failed to parse PAT/PMT"); } void RecPlayer::scan() @@ -87,6 +93,7 @@ RecPlayer::~RecPlayer() if (file) fclose(file); delete indexFile; delete recording; + delete parser; } int RecPlayer::openFile(int index) diff --git a/server/recplayer.h b/server/recplayer.h index e56c2cd..60100e8 100644 --- a/server/recplayer.h +++ b/server/recplayer.h @@ -23,6 +23,7 @@ #include #include +#include #include "server/streamer.h" @@ -44,6 +45,7 @@ class RecPlayer int openFile(int index); uint64_t getLastPosition(); cRecording* getCurrentRecording(); + const cPatPmtParser* getPatPmtData() { return parser; } void scan(); uint64_t positionFromResume(int ResumeID); uint64_t positionFromMark(int MarkIndex); @@ -56,6 +58,7 @@ class RecPlayer private: cRecording* recording; cIndexFile* indexFile; + cPatPmtParser* parser; FILE* file; int fileOpen; Segment* segments[1000]; diff --git a/server/recstreamer.c b/server/recstreamer.c index bd01b96..1ee0b25 100644 --- a/server/recstreamer.c +++ b/server/recstreamer.c @@ -12,7 +12,7 @@ using namespace Streamdev; // --- cStreamdevRecStreamer ------------------------------------------------- -cStreamdevRecStreamer::cStreamdevRecStreamer(RecPlayer *RecPlayer, const cServerConnection *Connection, int64_t StartOffset): +cStreamdevRecStreamer::cStreamdevRecStreamer(const cServerConnection *Connection, RecPlayer *RecPlayer, eStreamType StreamType, int64_t StartOffset, const int *Apid, const int *Dpid): cStreamdevStreamer("streamdev-recstreaming", Connection), m_RecPlayer(RecPlayer), m_StartOffset(StartOffset), @@ -20,6 +20,36 @@ cStreamdevRecStreamer::cStreamdevRecStreamer(RecPlayer *RecPlayer, const cServer { Dprintf("New rec streamer\n"); m_To = (int64_t) m_RecPlayer->getLengthBytes() - StartOffset - 1; + + const cPatPmtParser *parser = RecPlayer->getPatPmtData(); + const int *Apids = Apid ? Apid : parser->Apids(); + const int *Dpids = Dpid ? Dpid : parser->Dpids(); + switch (StreamType) { + case stES: + { + int pid = parser->Vpid(); + if (Apid && Apid[0]) + pid = Apid[0]; + else if (Dpid && Dpid[0]) + pid = Dpid[0]; + SetRemux(new cTS2ESRemux(pid)); + } + break; + case stPES: + if (!m_RecPlayer->getCurrentRecording()->IsPesRecording()) + SetRemux(new cTS2PESRemux(parser->Vpid(), Apids, Dpids, parser->Spids())); + break; +#ifdef STREAMDEV_PS + case stPS: + SetRemux(new cTS2PSRemux(parser->Vpid(), Apids, Dpids, parser->Spids())); + break; +#endif + case stEXT: + SetRemux(new cExternRemux(Connection, parser, Apids, Dpids)); + break; + default: + break; + } } cStreamdevRecStreamer::~cStreamdevRecStreamer() diff --git a/server/recstreamer.h b/server/recstreamer.h index ee4b120..68f0c17 100644 --- a/server/recstreamer.h +++ b/server/recstreamer.h @@ -1,6 +1,7 @@ #ifndef VDR_STREAMDEV_RECSTREAMER_H #define VDR_STREAMDEV_RECSTREAMER_H +#include "common.h" #include "server/streamer.h" #include "server/recplayer.h" @@ -26,7 +27,7 @@ public: uint64_t GetLength() { return m_RecPlayer->getLengthBytes() - m_StartOffset; } int64_t SetRange(int64_t &From, int64_t &To); virtual cString ToText() const; - cStreamdevRecStreamer(RecPlayer *RecPlayer, const cServerConnection *Connection, int64_t StartOffset = 0L); + cStreamdevRecStreamer(const cServerConnection *Connection, RecPlayer *RecPlayer, eStreamType StreamType, int64_t StartOffset = 0L, const int *Apids = NULL, const int *Dpids = NULL); virtual ~cStreamdevRecStreamer(); };