- added namespace to remuxers

- increased WRITERBUFSIZE - buffer was too small for high bandwidth content
- removed cStreamdevStreamer::m_Running
- eliminated potential busy waits in remuxers
- updated cTSRemux static helpers to code of their VDR 1.6.0 counterparts
- use a copy of VDR 1.6.0's cRemux for TS to PES remuxing.
- make sure that only complete TS packets are written to ringbuffers
- use signaling instead of sleeps when writing to ringbuffers
- optimized cStreamdevPatFilter PAT packet initialization
- fixed cStreamdevPatFilter not processing PATs with length > TS_SIZE - 5
- use a small ringbuffer for cStreamdevPatFilter instead of writing to
  cStreamdevStreamers SendBuffer as two threads mustn't write to the same
  ringbuffer
Modified Files:
 Tag: v0_4
	CONTRIBUTORS HISTORY Makefile streamdev-server.c
	libdvbmpeg/transform.h remux/extern.c remux/extern.h
	remux/ts2es.c remux/ts2es.h remux/ts2ps.c remux/ts2ps.h
	remux/tsremux.c remux/tsremux.h server/livestreamer.c
	server/livestreamer.h server/streamer.c server/streamer.h
Added Files:
 Tag: v0_4
	remux/ts2pes.c remux/ts2pes.h
This commit is contained in:
schmirl 2009-06-29 06:25:27 +00:00
parent cacd4b73d5
commit 412c6982b6
19 changed files with 2313 additions and 128 deletions

View File

@ -1,6 +1,10 @@
Special thanks go to the following persons (if you think your name is missing Special thanks go to the following persons (if you think your name is missing
here, please send an email to vdrdev@schmirler.de): here, please send an email to vdrdev@schmirler.de):
Klaus Schmidinger
for VDR as a whole
for permission to use VDR 1.6.0 cRemux code for PES remuxing
Sascha Volkenandt, the original author, Sascha Volkenandt, the original author,
for this great plugin for this great plugin
@ -72,6 +76,7 @@ alexw
Olli Lammi Olli Lammi
for fixing a busy wait when client isn't accepting data fast enough for fixing a busy wait when client isn't accepting data fast enough
for suggesting signaling instead of sleeping when writing to buffers
Joerg Pulz Joerg Pulz
for his FreeBSD compatibility patch for his FreeBSD compatibility patch

13
HISTORY
View File

@ -1,6 +1,19 @@
VDR Plugin 'streamdev' Revision History VDR Plugin 'streamdev' Revision History
--------------------------------------- ---------------------------------------
- added namespace to remuxers
- increased WRITERBUFSIZE - buffer was too small for high bandwidth content
- removed cStreamdevStreamer::m_Running
- eliminated potential busy waits in remuxers
- updated cTSRemux static helpers to code of their VDR 1.6.0 counterparts
- use a copy of VDR 1.6.0's cRemux for TS to PES remuxing.
- make sure that only complete TS packets are written to ringbuffers
- use signaling instead of sleeps when writing to ringbuffers
- optimized cStreamdevPatFilter PAT packet initialization
- fixed cStreamdevPatFilter not processing PATs with length > TS_SIZE - 5
- use a small ringbuffer for cStreamdevPatFilter instead of writing to
cStreamdevStreamers SendBuffer as two threads mustn't write to the same
ringbuffer
- added IGMP based multicast streaming - added IGMP based multicast streaming
- ignore trailing blank lines in HTTP requests - ignore trailing blank lines in HTTP requests
- fixed parsing Min/MaxPriority from config (thanks to Joachim König-Baltes) - fixed parsing Min/MaxPriority from config (thanks to Joachim König-Baltes)

View File

@ -1,7 +1,7 @@
# #
# Makefile for a Video Disk Recorder plugin # Makefile for a Video Disk Recorder plugin
# #
# $Id: Makefile,v 1.15.2.1 2009/02/13 10:39:40 schmirl Exp $ # $Id: Makefile,v 1.15.2.2 2009/06/29 06:25:27 schmirl Exp $
# The official name of this plugin. # The official name of this plugin.
# This name will be used in the '-P...' option of VDR to load the plugin. # This name will be used in the '-P...' option of VDR to load the plugin.
@ -62,7 +62,7 @@ SERVEROBJS = $(PLUGIN)-server.o \
server/connectionVTP.o server/connectionHTTP.o server/connectionIGMP.o \ server/connectionVTP.o server/connectionHTTP.o server/connectionIGMP.o \
server/streamer.o server/livestreamer.o server/livefilter.o \ server/streamer.o server/livestreamer.o server/livefilter.o \
server/suspend.o server/setup.o server/menuHTTP.o \ server/suspend.o server/setup.o server/menuHTTP.o \
remux/tsremux.o remux/ts2ps.o remux/ts2es.o remux/extern.o remux/tsremux.o remux/ts2pes.o remux/ts2ps.o remux/ts2es.o remux/extern.o
ifdef DEBUG ifdef DEBUG
DEFINES += -DDEBUG DEFINES += -DDEBUG

View File

@ -106,7 +106,7 @@
#define MAX_PLENGTH 0xFFFF #define MAX_PLENGTH 0xFFFF
#define MMAX_PLENGTH (8*MAX_PLENGTH) #define MMAX_PLENGTH (64*MAX_PLENGTH)
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {

View File

@ -7,6 +7,8 @@
#include <signal.h> #include <signal.h>
#include <unistd.h> #include <unistd.h>
namespace Streamdev {
class cTSExt: public cThread { class cTSExt: public cThread {
private: private:
cRingBufferLinear *m_ResultBuffer; cRingBufferLinear *m_ResultBuffer;
@ -24,6 +26,9 @@ public:
void Put(const uchar *Data, int Count); void Put(const uchar *Data, int Count);
}; };
} // namespace Streamdev
using namespace Streamdev;
cTSExt::cTSExt(cRingBufferLinear *ResultBuffer, std::string Parameter): cTSExt::cTSExt(cRingBufferLinear *ResultBuffer, std::string Parameter):
m_ResultBuffer(ResultBuffer), m_ResultBuffer(ResultBuffer),
m_Active(false), m_Active(false),

View File

@ -5,6 +5,8 @@
#include <vdr/ringbuffer.h> #include <vdr/ringbuffer.h>
#include <string> #include <string>
namespace Streamdev {
class cTSExt; class cTSExt;
class cExternRemux: public cTSRemux { class cExternRemux: public cTSRemux {
@ -21,4 +23,6 @@ public:
void Del(int Count) { m_ResultBuffer->Del(Count); } void Del(int Count) { m_ResultBuffer->Del(Count); }
}; };
} // namespace Streamdev
#endif // VDR_STREAMDEV_EXTERNREMUX_H #endif // VDR_STREAMDEV_EXTERNREMUX_H

View File

@ -1,12 +1,13 @@
#include "remux/ts2es.h" #include "remux/ts2es.h"
#include "server/streamer.h" #include "server/streamer.h"
#include "libdvbmpeg/transform.h"
#include "common.h" #include "common.h"
#include <vdr/device.h> #include <vdr/device.h>
// from VDR's remux.c // from VDR's remux.c
#define MAXNONUSEFULDATA (10*1024*1024) #define MAXNONUSEFULDATA (10*1024*1024)
namespace Streamdev {
class cTS2ES: public ipack { class cTS2ES: public ipack {
friend void PutES(uint8_t *Buffer, int Size, void *Data); friend void PutES(uint8_t *Buffer, int Size, void *Data);
@ -32,6 +33,9 @@ void PutES(uint8_t *Buffer, int Size, void *Data)
This->start = 1; This->start = 1;
} }
} // namespace Streamdev
using namespace Streamdev;
cTS2ES::cTS2ES(cRingBufferLinear *ResultBuffer) cTS2ES::cTS2ES(cRingBufferLinear *ResultBuffer)
{ {
m_ResultBuffer = ResultBuffer; m_ResultBuffer = ResultBuffer;
@ -75,10 +79,10 @@ void cTS2ES::PutTSPacket(const uint8_t *Buffer) {
cTS2ESRemux::cTS2ESRemux(int Pid): cTS2ESRemux::cTS2ESRemux(int Pid):
m_Pid(Pid), m_Pid(Pid),
m_ResultBuffer(new cRingBufferLinear(WRITERBUFSIZE, IPACKS)), m_ResultBuffer(new cStreamdevBuffer(WRITERBUFSIZE, IPACKS)),
m_Remux(new cTS2ES(m_ResultBuffer)) m_Remux(new cTS2ES(m_ResultBuffer))
{ {
m_ResultBuffer->SetTimeouts(0, 100); m_ResultBuffer->SetTimeouts(100, 100);
} }
cTS2ESRemux::~cTS2ESRemux() cTS2ESRemux::~cTS2ESRemux()
@ -111,8 +115,10 @@ int cTS2ESRemux::Put(const uchar *Data, int Count)
break; break;
if (Data[i] != TS_SYNC_BYTE) if (Data[i] != TS_SYNC_BYTE)
break; break;
if (m_ResultBuffer->Free() < 2 * IPACKS) if (m_ResultBuffer->Free() < 2 * IPACKS) {
m_ResultBuffer->WaitForPut();
break; // A cTS2ES might write one full packet and also a small rest break; // A cTS2ES might write one full packet and also a small rest
}
int pid = cTSRemux::GetPid(Data + i + 1); int pid = cTSRemux::GetPid(Data + i + 1);
if (Data[i + 3] & 0x10) { // got payload if (Data[i + 3] & 0x10) { // got payload
if (m_Pid == pid) if (m_Pid == pid)

View File

@ -2,15 +2,16 @@
#define VDR_STREAMDEV_TS2ESREMUX_H #define VDR_STREAMDEV_TS2ESREMUX_H
#include "remux/tsremux.h" #include "remux/tsremux.h"
#include <vdr/ringbuffer.h> #include "server/streamer.h"
namespace Streamdev {
class cTS2ES; class cTS2ES;
class cRingBufferLinear;
class cTS2ESRemux: public cTSRemux { class cTS2ESRemux: public cTSRemux {
private: private:
int m_Pid; int m_Pid;
cRingBufferLinear *m_ResultBuffer; cStreamdevBuffer *m_ResultBuffer;
cTS2ES *m_Remux; cTS2ES *m_Remux;
public: public:
@ -22,4 +23,6 @@ public:
void Del(int Count) { m_ResultBuffer->Del(Count); } void Del(int Count) { m_ResultBuffer->Del(Count); }
}; };
} // namespace Streamdev
#endif // VDR_STREAMDEV_TS2ESREMUX_H #endif // VDR_STREAMDEV_TS2ESREMUX_H

2011
remux/ts2pes.c Normal file

File diff suppressed because it is too large Load Diff

58
remux/ts2pes.h Normal file
View File

@ -0,0 +1,58 @@
/*
* ts2pes.h: A streaming MPEG2 remultiplexer
*
* This file is based on a copy of remux.h from Klaus Schmidinger's
* VDR, version 1.6.0.
*
* $Id: ts2pes.h,v 1.2.2.2 2009/06/29 06:25:28 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_TS2PES_H
#define VDR_STREAMDEV_TS2PES_H
#include "remux/tsremux.h"
#include "server/streamer.h"
#define MAXTRACKS 64
namespace Streamdev {
class cTS2PES;
class cTS2PESRemux: public cTSRemux {
private:
bool noVideo;
bool synced;
int skipped;
cTS2PES *ts2pes[MAXTRACKS];
int numTracks;
cStreamdevBuffer *resultBuffer;
int resultSkipped;
public:
cTS2PESRemux(int VPid, const int *APids, const int *DPids, const int *SPids);
///< Creates a new remuxer for the given PIDs. VPid is the video PID, while
///< APids, DPids and SPids are pointers to zero terminated lists of audio,
///< dolby and subtitle PIDs (the pointers may be NULL if there is no such
///< PID).
~cTS2PESRemux();
int Put(const uchar *Data, int Count);
///< Puts at most Count bytes of Data into the remuxer.
///< \return Returns the number of bytes actually consumed from Data.
uchar *Get(int &Count, uchar *PictureType = NULL);
///< Gets all currently available data from the remuxer.
///< \return Count contains the number of bytes the result points to, and
///< PictureType (if not NULL) will contain one of NO_PICTURE, I_FRAME, P_FRAME
///< or B_FRAME.
void Del(int Count);
///< Deletes Count bytes from the remuxer. Count must be the number returned
///< from a previous call to Get(). Several calls to Del() with fractions of
///< a previously returned Count may be made, but the total sum of all Count
///< values must be exactly what the previous Get() has returned.
void Clear(void);
///< Clears the remuxer of all data it might still contain, keeping the PID
///< settings as they are.
};
} // namespace Streamdev
#endif // VDR_STREAMDEV_TS2PES_H

View File

@ -3,6 +3,8 @@
#include <vdr/channels.h> #include <vdr/channels.h>
#include <vdr/device.h> #include <vdr/device.h>
namespace Streamdev {
class cTS2PS { class cTS2PS {
friend void PutPES(uint8_t *Buffer, int Size, void *Data); friend void PutPES(uint8_t *Buffer, int Size, void *Data);
@ -28,6 +30,9 @@ void PutPES(uint8_t *Buffer, int Size, void *Data)
esyslog("ERROR: result buffer overflow, dropped %d out of %d byte", Size - n, Size); esyslog("ERROR: result buffer overflow, dropped %d out of %d byte", Size - n, Size);
} }
} // namespace Streamdev
using namespace Streamdev;
cTS2PS::cTS2PS(cRingBufferLinear *ResultBuffer, int Pid, uint8_t AudioCid) cTS2PS::cTS2PS(cRingBufferLinear *ResultBuffer, int Pid, uint8_t AudioCid)
{ {
m_ResultBuffer = ResultBuffer; m_ResultBuffer = ResultBuffer;
@ -74,13 +79,13 @@ void cTS2PS::PutTSPacket(const uint8_t *Buffer)
cTS2PSRemux::cTS2PSRemux(int VPid, const int *APids, const int *DPids, const int *SPids): cTS2PSRemux::cTS2PSRemux(int VPid, const int *APids, const int *DPids, const int *SPids):
m_NumTracks(0), m_NumTracks(0),
m_ResultBuffer(new cRingBufferLinear(WRITERBUFSIZE, IPACKS)), m_ResultBuffer(new cStreamdevBuffer(WRITERBUFSIZE, IPACKS)),
m_ResultSkipped(0), m_ResultSkipped(0),
m_Skipped(0), m_Skipped(0),
m_Synced(false), m_Synced(false),
m_IsRadio(VPid == 0 || VPid == 1 || VPid == 0x1FFF) m_IsRadio(VPid == 0 || VPid == 1 || VPid == 0x1FFF)
{ {
m_ResultBuffer->SetTimeouts(0, 100); m_ResultBuffer->SetTimeouts(100, 100);
if (VPid) if (VPid)
m_Remux[m_NumTracks++] = new cTS2PS(m_ResultBuffer, VPid); m_Remux[m_NumTracks++] = new cTS2PS(m_ResultBuffer, VPid);
@ -124,8 +129,10 @@ int cTS2PSRemux::Put(const uchar *Data, int Count)
break; break;
if (Data[i] != TS_SYNC_BYTE) if (Data[i] != TS_SYNC_BYTE)
break; break;
if (m_ResultBuffer->Free() < 2 * IPACKS) if (m_ResultBuffer->Free() < 2 * IPACKS) {
m_ResultBuffer->WaitForPut();
break; // A cTS2PS might write one full packet and also a small rest break; // A cTS2PS might write one full packet and also a small rest
}
int pid = GetPid(Data + i + 1); int pid = GetPid(Data + i + 1);
if (Data[i + 3] & 0x10) { // got payload if (Data[i + 3] & 0x10) { // got payload
for (int t = 0; t < m_NumTracks; t++) { for (int t = 0; t < m_NumTracks; t++) {

View File

@ -2,8 +2,9 @@
#define VDR_STREAMDEV_TS2PESREMUX_H #define VDR_STREAMDEV_TS2PESREMUX_H
#include "remux/tsremux.h" #include "remux/tsremux.h"
#include <vdr/remux.h> #include "server/streamer.h"
#include <vdr/ringbuffer.h>
namespace Streamdev {
class cTS2PS; class cTS2PS;
@ -11,7 +12,7 @@ class cTS2PSRemux: public cTSRemux {
private: private:
int m_NumTracks; int m_NumTracks;
cTS2PS *m_Remux[MAXTRACKS]; cTS2PS *m_Remux[MAXTRACKS];
cRingBufferLinear *m_ResultBuffer; cStreamdevBuffer *m_ResultBuffer;
int m_ResultSkipped; int m_ResultSkipped;
int m_Skipped; int m_Skipped;
bool m_Synced; bool m_Synced;
@ -26,4 +27,6 @@ public:
void Del(int Count) { m_ResultBuffer->Del(Count); } void Del(int Count) { m_ResultBuffer->Del(Count); }
}; };
} // namespace Streamdev
#endif // VDR_STREAMDEV_TS2PESREMUX_H #endif // VDR_STREAMDEV_TS2PESREMUX_H

View File

@ -2,10 +2,15 @@
#define SC_PICTURE 0x00 // "picture header" #define SC_PICTURE 0x00 // "picture header"
#define VIDEO_STREAM_S 0xE0
using namespace Streamdev;
void cTSRemux::SetBrokenLink(uchar *Data, int Length) void cTSRemux::SetBrokenLink(uchar *Data, int Length)
{ {
if (Length > 9 && Data[0] == 0 && Data[1] == 0 && Data[2] == 1 && (Data[3] & 0xF0) == VIDEO_STREAM_S) { int PesPayloadOffset = 0;
for (int i = Data[8] + 9; i < Length - 7; i++) { // +9 to skip video packet header if (AnalyzePesHeader(Data, Length, PesPayloadOffset) >= phMPEG1 && (Data[3] & 0xF0) == VIDEO_STREAM_S) {
for (int i = PesPayloadOffset; i < Length - 7; i++) {
if (Data[i] == 0 && Data[i + 1] == 0 && Data[i + 2] == 1 && Data[i + 3] == 0xB8) { if (Data[i] == 0 && Data[i + 1] == 0 && Data[i + 2] == 1 && Data[i + 3] == 0xB8) {
if (!(Data[i + 7] & 0x40)) // set flag only if GOP is not closed if (!(Data[i + 7] & 0x40)) // set flag only if GOP is not closed
Data[i + 7] |= 0x20; Data[i + 7] |= 0x20;
@ -39,16 +44,39 @@ int cTSRemux::ScanVideoPacket(const uchar *Data, int Count, int Offset, uchar &P
// If the return value is -1 the packet was not completely in the buffer. // If the return value is -1 the packet was not completely in the buffer.
int Length = GetPacketLength(Data, Count, Offset); int Length = GetPacketLength(Data, Count, Offset);
if (Length > 0) { if (Length > 0) {
if (Length >= 8) { int PesPayloadOffset = 0;
int i = Offset + 8; // the minimum length of the video packet header if (AnalyzePesHeader(Data + Offset, Length, PesPayloadOffset) >= phMPEG1) {
i += Data[i] + 1; // possible additional header bytes const uchar *p = Data + Offset + PesPayloadOffset + 2;
for (; i < Offset + Length - 5; i++) { const uchar *pLimit = Data + Offset + Length - 3;
if (Data[i] == 0 && Data[i + 1] == 0 && Data[i + 2] == 1) { #ifdef TEST_cVideoRepacker
switch (Data[i + 3]) { // cVideoRepacker ensures that a new PES packet is started for a new sequence,
case SC_PICTURE: PictureType = (Data[i + 5] >> 3) & 0x07; // group or picture which allows us to easily skip scanning through a huge
// amount of video data.
if (p < pLimit) {
if (p[-2] || p[-1] || p[0] != 0x01)
pLimit = 0; // skip scanning: packet doesn't start with 0x000001
else {
switch (p[1]) {
case SC_SEQUENCE:
case SC_GROUP:
case SC_PICTURE:
break;
default: // skip scanning: packet doesn't start a new sequence, group or picture
pLimit = 0;
}
}
}
#endif
while (p < pLimit && (p = (const uchar *)memchr(p, 0x01, pLimit - p))) {
if (!p[-2] && !p[-1]) { // found 0x000001
switch (p[1]) {
case SC_PICTURE: PictureType = (p[3] >> 3) & 0x07;
return Length; return Length;
} }
p += 4; // continue scanning after 0x01ssxxyy
} }
else
p += 3; // continue scanning after 0x01xxyy
} }
} }
PictureType = NO_PICTURE; PictureType = NO_PICTURE;

View File

@ -4,30 +4,22 @@
#include "libdvbmpeg/transform.h" #include "libdvbmpeg/transform.h"
#include <vdr/remux.h> #include <vdr/remux.h>
#define RESULTBUFFERSIZE KILOBYTE(256) // Picture types:
#define NO_PICTURE 0
#define I_FRAME 1
#define P_FRAME 2
#define B_FRAME 3
namespace Streamdev {
class cTSRemux { class cTSRemux {
protected:
/*uchar m_ResultBuffer[RESULTBUFFERSIZE];
int m_ResultCount;
int m_ResultDelivered;
int m_Synced;
int m_Skipped;
int m_Sync;
virtual void PutTSPacket(int Pid, const uint8_t *Data) = 0;
public: public:
cTSRemux(bool Sync = true);
virtual ~cTSRemux();
virtual uchar *Process(const uchar *Data, int &Count, int &Result);*/
static void SetBrokenLink(uchar *Data, int Length); static void SetBrokenLink(uchar *Data, int Length);
static int GetPid(const uchar *Data); static int GetPid(const uchar *Data);
static int GetPacketLength(const uchar *Data, int Count, int Offset); static int GetPacketLength(const uchar *Data, int Count, int Offset);
static int ScanVideoPacket(const uchar *Data, int Count, int Offset, uchar &PictureType); static int ScanVideoPacket(const uchar *Data, int Count, int Offset, uchar &PictureType);
}; };
} // namespace Streamdev
#endif // VDR_STREAMDEV_TSREMUX_H #endif // VDR_STREAMDEV_TSREMUX_H

View File

@ -8,11 +8,12 @@
#include "server/livestreamer.h" #include "server/livestreamer.h"
#include "server/livefilter.h" #include "server/livefilter.h"
#include "remux/ts2ps.h" #include "remux/ts2ps.h"
#include "remux/ts2pes.h"
#include "remux/ts2es.h" #include "remux/ts2es.h"
#include "remux/extern.h" #include "remux/extern.h"
#include "common.h" #include "common.h"
#define TSPATREPACKER using namespace Streamdev;
// --- cStreamdevLiveReceiver ------------------------------------------------- // --- cStreamdevLiveReceiver -------------------------------------------------
@ -73,6 +74,8 @@ private:
int pmtPid; int pmtPid;
int pmtSid; int pmtSid;
int pmtVersion; int pmtVersion;
uchar tspat_buf[TS_SIZE];
cStreamdevBuffer siBuffer;
const cChannel *m_Channel; const cChannel *m_Channel;
cStreamdevLiveStreamer *m_Streamer; cStreamdevLiveStreamer *m_Streamer;
@ -82,9 +85,11 @@ private:
int GetPid(SI::PMT::Stream& stream); int GetPid(SI::PMT::Stream& stream);
public: public:
cStreamdevPatFilter(cStreamdevLiveStreamer *Streamer, const cChannel *Channel); cStreamdevPatFilter(cStreamdevLiveStreamer *Streamer, const cChannel *Channel);
uchar* Get(int &Count) { return siBuffer.Get(Count); }
void Del(int Count) { return siBuffer.Del(Count); }
}; };
cStreamdevPatFilter::cStreamdevPatFilter(cStreamdevLiveStreamer *Streamer, const cChannel *Channel) cStreamdevPatFilter::cStreamdevPatFilter(cStreamdevLiveStreamer *Streamer, const cChannel *Channel): siBuffer(10 * TS_SIZE, TS_SIZE)
{ {
Dprintf("cStreamdevPatFilter(\"%s\")\n", Channel->Name()); Dprintf("cStreamdevPatFilter(\"%s\")\n", Channel->Name());
assert(Streamer); assert(Streamer);
@ -94,6 +99,29 @@ cStreamdevPatFilter::cStreamdevPatFilter(cStreamdevLiveStreamer *Streamer, const
pmtSid = 0; pmtSid = 0;
pmtVersion = -1; pmtVersion = -1;
Set(0x00, 0x00); // PAT Set(0x00, 0x00); // PAT
// initialize PAT buffer. Only some values are dynamic (see comments)
memset(tspat_buf, 0xff, TS_SIZE);
tspat_buf[0] = TS_SYNC_BYTE; // Transport packet header sunchronization byte (1000011 = 0x47h)
tspat_buf[1] = 0x40; // Set payload unit start indicator bit
tspat_buf[2] = 0x0; // PID
tspat_buf[3] = 0x10; // Set payload flag, DYNAMIC: Continuity counter
tspat_buf[4] = 0x0; // SI pointer field
tspat_buf[5] = 0x0; // PAT table id
tspat_buf[6] = 0xb0; // Section syntax indicator bit and reserved bits set
tspat_buf[7] = 12 + 1; // Section length (12 bit): PAT_TABLE_LEN + 1
tspat_buf[8] = 0; // DYNAMIC: Transport stream ID (bits 8-15)
tspat_buf[9] = 0; // DYNAMIC: Transport stream ID (bits 0-7)
tspat_buf[10] = 0xc0; // Reserved, DYNAMIC: Version number, DYNAMIC: Current next indicator
tspat_buf[11] = 0x0; // Section number
tspat_buf[12] = 0x0; // Last section number
tspat_buf[13] = 0; // DYNAMIC: Program number (bits 8-15)
tspat_buf[14] = 0; // DYNAMIC: Program number (bits 0-7)
tspat_buf[15] = 0xe0; // Reserved, DYNAMIC: Network ID (bits 8-12)
tspat_buf[16] = 0; // DYNAMIC: Network ID (bits 0-7)
tspat_buf[17] = 0; // DYNAMIC: Checksum
tspat_buf[18] = 0; // DYNAMIC: Checksum
tspat_buf[19] = 0; // DYNAMIC: Checksum
tspat_buf[20] = 0; // DYNAMIC: Checksum
} }
static const char * const psStreamTypes[] = { static const char * const psStreamTypes[] = {
@ -233,34 +261,18 @@ void cStreamdevPatFilter::Process(u_short Pid, u_char Tid, const u_char *Data, i
if (0 != (pmtPid = assoc.getPid())) { if (0 != (pmtPid = assoc.getPid())) {
Dprintf("cStreamdevPatFilter: PMT pid for channel %s: %d\n", Channel->Name(), pmtPid); Dprintf("cStreamdevPatFilter: PMT pid for channel %s: %d\n", Channel->Name(), pmtPid);
pmtSid = assoc.getServiceId(); pmtSid = assoc.getServiceId();
if (Length < TS_SIZE-5) {
// repack PAT to TS frame and send to client // repack PAT to TS frame and send to client
#ifndef TSPATREPACKER
uint8_t pat_ts[TS_SIZE] = {TS_SYNC_BYTE, 0x40 /* pusi=1 */, 0 /* pid=0 */, 0x10 /* adaption=1 */, 0 /* pointer */};
memcpy(pat_ts + 5, Data, Length);
m_Streamer->Put(pat_ts, TS_SIZE);
#else
int ts_id; int ts_id;
unsigned int crc, i, len; unsigned int crc, i, len;
uint8_t *tmp, tspat_buf[TS_SIZE]; uint8_t *tmp;
static uint8_t ccounter = 0; static uint8_t ccounter = 0;
ccounter = (ccounter + 1) % 16; ccounter = (ccounter + 1) % 16;
memset(tspat_buf, 0xff, TS_SIZE);
ts_id = Channel->Tid(); // Get transport stream id of the channel ts_id = Channel->Tid(); // Get transport stream id of the channel
tspat_buf[0] = TS_SYNC_BYTE; // Transport packet header sunchronization byte (1000011 = 0x47h)
tspat_buf[1] = 0x40; // Set payload unit start indicator bit
tspat_buf[2] = 0x0; // PID
tspat_buf[3] = 0x10 | ccounter; // Set payload flag, Continuity counter tspat_buf[3] = 0x10 | ccounter; // Set payload flag, Continuity counter
tspat_buf[4] = 0x0; // SI pointer field
tspat_buf[5] = 0x0; // PAT table id
tspat_buf[6] = 0xb0; // Section syntax indicator bit and reserved bits set
tspat_buf[7] = 12 + 1; // Section length (12 bit): PAT_TABLE_LEN + 1
tspat_buf[8] = (ts_id >> 8); // Transport stream ID (bits 8-15) tspat_buf[8] = (ts_id >> 8); // Transport stream ID (bits 8-15)
tspat_buf[9] = (ts_id & 0xff); // Transport stream ID (bits 0-7) tspat_buf[9] = (ts_id & 0xff); // Transport stream ID (bits 0-7)
tspat_buf[10] = 0xc0 | ((pat.getVersionNumber() << 1) & 0x3e) | tspat_buf[10] = 0xc0 | ((pat.getVersionNumber() << 1) & 0x3e) |
pat.getCurrentNextIndicator();// Version number, Current next indicator pat.getCurrentNextIndicator();// Version number, Current next indicator
tspat_buf[11] = 0x0; // Section number
tspat_buf[12] = 0x0; // Last section number
tspat_buf[13] = (pmtSid >> 8); // Program number (bits 8-15) tspat_buf[13] = (pmtSid >> 8); // Program number (bits 8-15)
tspat_buf[14] = (pmtSid & 0xff); // Program number (bits 0-7) tspat_buf[14] = (pmtSid & 0xff); // Program number (bits 0-7)
tspat_buf[15] = 0xe0 | (pmtPid >> 8); // Network ID (bits 8-12) tspat_buf[15] = 0xe0 | (pmtPid >> 8); // Network ID (bits 8-12)
@ -277,10 +289,9 @@ void cStreamdevPatFilter::Process(u_short Pid, u_char Tid, const u_char *Data, i
tspat_buf[18] = crc >> 16 & 0xff; // Checksum tspat_buf[18] = crc >> 16 & 0xff; // Checksum
tspat_buf[19] = crc >> 8 & 0xff; // Checksum tspat_buf[19] = crc >> 8 & 0xff; // Checksum
tspat_buf[20] = crc & 0xff; // Checksum tspat_buf[20] = crc & 0xff; // Checksum
m_Streamer->Put(tspat_buf, TS_SIZE); int written = siBuffer.PutTS(tspat_buf, TS_SIZE);
#endif if (written != TS_SIZE)
} else siBuffer.ReportOverflow(TS_SIZE - written);
isyslog("cStreamdevPatFilter: PAT size %d too large to fit in one TS", Length);
if (pmtPid != prevPmtPid) { if (pmtPid != prevPmtPid) {
m_Streamer->SetPids(pmtPid); m_Streamer->SetPids(pmtPid);
Add(pmtPid, 0x02); Add(pmtPid, 0x02);
@ -301,7 +312,7 @@ void cStreamdevPatFilter::Process(u_short Pid, u_char Tid, const u_char *Data, i
if (pmtVersion != -1) { if (pmtVersion != -1) {
if (pmtVersion != pmt.getVersionNumber()) { if (pmtVersion != pmt.getVersionNumber()) {
Dprintf("cStreamdevPatFilter: PMT version changed, detaching all pids\n"); Dprintf("cStreamdevPatFilter: PMT version changed, detaching all pids\n");
Del(pmtPid, 0x02); cFilter::Del(pmtPid, 0x02);
pmtPid = 0; // this triggers PAT scan pmtPid = 0; // this triggers PAT scan
} }
return; return;
@ -473,8 +484,8 @@ bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, eStreamType Str
} }
case stPES: case stPES:
m_PESRemux = new cRemux(m_Channel->Vpid(), m_Channel->Apids(), m_Channel->Dpids(), m_PESRemux = new cTS2PESRemux(m_Channel->Vpid(), m_Channel->Apids(), m_Channel->Dpids(),
m_Channel->Spids(), false); m_Channel->Spids());
return SetPids(m_Channel->Vpid(), Apids, Dpids, m_Channel->Spids()); return SetPids(m_Channel->Vpid(), Apids, Dpids, m_Channel->Spids());
case stPS: case stPS:
@ -512,6 +523,17 @@ int cStreamdevLiveStreamer::Put(const uchar *Data, int Count)
{ {
switch (m_StreamType) { switch (m_StreamType) {
case stTS: case stTS:
// insert si data
if (m_PatFilter) {
int got;
uchar *si = m_PatFilter->Get(got);
if (si) {
int count = cStreamdevStreamer::Put(si, got);
if (count)
m_PatFilter->Del(count);
}
}
// fall through
case stTSPIDS: case stTSPIDS:
return cStreamdevStreamer::Put(Data, Count); return cStreamdevStreamer::Put(Data, Count);

View File

@ -7,10 +7,12 @@
#include "server/streamer.h" #include "server/streamer.h"
#include "common.h" #include "common.h"
namespace Streamdev {
class cTS2PSRemux; class cTS2PSRemux;
class cTS2ESRemux; class cTS2ESRemux;
class cExternRemux; class cExternRemux;
class cRemux; class cTS2PESRemux;
}
class cStreamdevPatFilter; class cStreamdevPatFilter;
class cStreamdevLiveReceiver; class cStreamdevLiveReceiver;
@ -27,10 +29,10 @@ private:
cDevice *m_Device; cDevice *m_Device;
cStreamdevLiveReceiver *m_Receiver; cStreamdevLiveReceiver *m_Receiver;
cStreamdevPatFilter *m_PatFilter; cStreamdevPatFilter *m_PatFilter;
cRemux *m_PESRemux; Streamdev::cTS2PESRemux *m_PESRemux;
cTS2ESRemux *m_ESRemux; Streamdev::cTS2ESRemux *m_ESRemux;
cTS2PSRemux *m_PSRemux; Streamdev::cTS2PSRemux *m_PSRemux;
cExternRemux *m_ExtRemux; Streamdev::cExternRemux *m_ExtRemux;
void StartReceiver(void); void StartReceiver(void);
bool HasPid(int Pid); bool HasPid(int Pid);

View File

@ -1,5 +1,5 @@
/* /*
* $Id: streamer.c,v 1.16.2.2 2009/02/13 10:39:42 schmirl Exp $ * $Id: streamer.c,v 1.16.2.3 2009/06/29 06:25:30 schmirl Exp $
*/ */
#include <vdr/ringbuffer.h> #include <vdr/ringbuffer.h>
@ -14,6 +14,13 @@
#include "tools/select.h" #include "tools/select.h"
#include "common.h" #include "common.h"
// --- cStreamdevBuffer -------------------------------------------------------
cStreamdevBuffer::cStreamdevBuffer(int Size, int Margin, bool Statistics, const char *Description):
cRingBufferLinear(Size, Margin, Statistics, Description)
{
}
// --- cStreamdevWriter ------------------------------------------------------- // --- cStreamdevWriter -------------------------------------------------------
cStreamdevWriter::cStreamdevWriter(cTBSocket *Socket, cStreamdevWriter::cStreamdevWriter(cTBSocket *Socket,
@ -95,14 +102,13 @@ void cStreamdevWriter::Action(void)
cStreamdevStreamer::cStreamdevStreamer(const char *Name): cStreamdevStreamer::cStreamdevStreamer(const char *Name):
cThread(Name), cThread(Name),
m_Running(false),
m_Writer(NULL), m_Writer(NULL),
m_RingBuffer(new cRingBufferLinear(STREAMERBUFSIZE, TS_SIZE * 2, m_RingBuffer(new cStreamdevBuffer(STREAMERBUFSIZE, TS_SIZE * 2,
true, "streamdev-streamer")), true, "streamdev-streamer")),
m_SendBuffer(new cRingBufferLinear(WRITERBUFSIZE, TS_SIZE * 2)) m_SendBuffer(new cStreamdevBuffer(WRITERBUFSIZE, TS_SIZE * 2))
{ {
m_RingBuffer->SetTimeouts(0, 100); m_RingBuffer->SetTimeouts(0, 100);
m_SendBuffer->SetTimeouts(0, 100); m_SendBuffer->SetTimeouts(100, 100);
} }
cStreamdevStreamer::~cStreamdevStreamer() cStreamdevStreamer::~cStreamdevStreamer()
@ -116,7 +122,6 @@ void cStreamdevStreamer::Start(cTBSocket *Socket)
{ {
Dprintf("start streamer\n"); Dprintf("start streamer\n");
m_Writer = new cStreamdevWriter(Socket, this); m_Writer = new cStreamdevWriter(Socket, this);
m_Running = true;
Attach(); Attach();
} }
@ -135,9 +140,8 @@ void cStreamdevStreamer::Stop(void)
Dprintf("stopping streamer\n"); Dprintf("stopping streamer\n");
Cancel(3); Cancel(3);
} }
if (m_Running) { if (m_Writer) {
Detach(); Detach();
m_Running = false;
DELETENULL(m_Writer); DELETENULL(m_Writer);
} }
} }
@ -152,8 +156,6 @@ void cStreamdevStreamer::Action(void)
int count = Put(block, got); int count = Put(block, got);
if (count) if (count)
m_RingBuffer->Del(count); m_RingBuffer->Del(count);
else
cCondWait::SleepMs(100);
} }
} }
} }

View File

@ -1,5 +1,5 @@
/* /*
* $Id: streamer.h,v 1.8.2.2 2009/02/13 10:39:42 schmirl Exp $ * $Id: streamer.h,v 1.8.2.3 2009/06/29 06:25:30 schmirl Exp $
*/ */
#ifndef VDR_STREAMDEV_STREAMER_H #ifndef VDR_STREAMDEV_STREAMER_H
@ -16,8 +16,34 @@ class cStreamdevStreamer;
#define TS_SIZE 188 #define TS_SIZE 188
#endif #endif
#define STREAMERBUFSIZE MEGABYTE(4) #define STREAMERBUFSIZE (20000 * TS_SIZE)
#define WRITERBUFSIZE KILOBYTE(256) #define WRITERBUFSIZE (5000 * TS_SIZE)
// --- cStreamdevBuffer -------------------------------------------------------
class cStreamdevBuffer: public cRingBufferLinear {
public:
// make public
void WaitForPut(void) { cRingBuffer::WaitForPut(); }
// Always write complete TS packets
// (assumes Count is a multiple of TS_SIZE)
int PutTS(const uchar *Data, int Count);
cStreamdevBuffer(int Size, int Margin = 0, bool Statistics = false, const char *Description = NULL);
};
inline int cStreamdevBuffer::PutTS(const uchar *Data, int Count)
{
int free = Free();
if (free < Count)
Count = free;
Count -= Count % TS_SIZE;
if (Count)
Count = Put(Data, Count);
else
WaitForPut();
return Count;
}
// --- cStreamdevWriter ------------------------------------------------------- // --- cStreamdevWriter -------------------------------------------------------
@ -38,15 +64,14 @@ public:
class cStreamdevStreamer: public cThread { class cStreamdevStreamer: public cThread {
private: private:
bool m_Running;
cStreamdevWriter *m_Writer; cStreamdevWriter *m_Writer;
cRingBufferLinear *m_RingBuffer; cStreamdevBuffer *m_RingBuffer;
cRingBufferLinear *m_SendBuffer; cStreamdevBuffer *m_SendBuffer;
protected: protected:
virtual void Action(void); virtual void Action(void);
bool IsRunning(void) const { return m_Running; } bool IsRunning(void) const { return m_Writer; }
public: public:
cStreamdevStreamer(const char *Name); cStreamdevStreamer(const char *Name);
@ -57,10 +82,10 @@ public:
bool Abort(void); bool Abort(void);
void Activate(bool On); void Activate(bool On);
int Receive(uchar *Data, int Length) { return m_RingBuffer->Put(Data, Length); } int Receive(uchar *Data, int Length) { return m_RingBuffer->PutTS(Data, Length); }
void ReportOverflow(int Bytes) { m_RingBuffer->ReportOverflow(Bytes); } void ReportOverflow(int Bytes) { m_RingBuffer->ReportOverflow(Bytes); }
virtual int Put(const uchar *Data, int Count) { return m_SendBuffer->Put(Data, Count); } virtual int Put(const uchar *Data, int Count) { return m_SendBuffer->PutTS(Data, Count); }
virtual uchar *Get(int &Count) { return m_SendBuffer->Get(Count); } virtual uchar *Get(int &Count) { return m_SendBuffer->Get(Count); }
virtual void Del(int Count) { m_SendBuffer->Del(Count); } virtual void Del(int Count) { m_SendBuffer->Del(Count); }

View File

@ -3,7 +3,7 @@
* *
* See the README file for copyright information and how to reach the author. * See the README file for copyright information and how to reach the author.
* *
* $Id: streamdev-server.c,v 1.7.2.2 2008/10/14 11:05:57 schmirl Exp $ * $Id: streamdev-server.c,v 1.7.2.3 2009/06/29 06:25:27 schmirl Exp $
*/ */
#include <getopt.h> #include <getopt.h>
@ -12,7 +12,6 @@
#include "server/setup.h" #include "server/setup.h"
#include "server/server.h" #include "server/server.h"
#include "server/suspend.h" #include "server/suspend.h"
#include "remux/extern.h"
#include "i18n.h" #include "i18n.h"
#if VDRVERSNUM < 10400 #if VDRVERSNUM < 10400