1
0
mirror of https://github.com/VDR4Arch/vdr.git synced 2023-10-10 13:36:52 +02:00

Improved buffer handling

This commit is contained in:
Klaus Schmidinger 2004-10-16 09:36:28 +02:00
parent 15030f6ace
commit 6415cc900d
19 changed files with 724 additions and 544 deletions

View File

@ -723,6 +723,7 @@ Sascha Volkenandt <sascha@akv-soft.de>
for suggesting to ignore unused "none" color entries in XPM files written by
some broken graphics tools
for fixing a memory leak in theme description handling
for pointing out a "near miss" condition in cCondVar
Malcolm Caldwell <malcolm.caldwell@ntu.edu.au>
for modifying LOF handling to allow for C-band reception
@ -1037,6 +1038,9 @@ Marco Schl
for suggesting to avoiding flashing effects in the OSD of full featured DVB cards
by explicitly clearing the OSD windows before opening them
for fixing a possible NULL pointer assignment in cMenuText::SetText()
for doing some testing regarding buffer performance and giving me some hints that
finally lead to finding out that the basic problem causing buffer overflows was in
EnableGet()/EnablePut() being called too often
Jürgen Schmitz <j.schmitz@web.de>
for reporting a bug in displaying the current channel when switching via the SVDRP

32
HISTORY
View File

@ -2967,7 +2967,7 @@ Video Disk Recorder Revision History
- The 'radio' channel icon is now only displayed in the ST:TNG skin if the channel
actually has an APID.
2004-08-08: Version 1.3.13
2004-10-16: Version 1.3.13
- Fixed checking for the presence of NPTL (thanks to Jouni Karvo).
- Making sure section filters are only set if the device actually has a lock
@ -2976,3 +2976,33 @@ Video Disk Recorder Revision History
Marco Schlüssler).
- Fixed a crash in case the last line in channels.conf is a group separator and
that group is selected in the channel display (thanks to Dick Streefland).
- Added cRingBufferLinear::Read() to read directly from a file handle into the
ring buffer.
- Using timeouts in ring buffers to avoid 'usleep()'.
- Clearing the 'Transfer Mode' ring buffer after clearing the device to avoid
an "almost full" ring buffer.
- Removed locking from cRingBufferLinear for better performance under high load.
- Using a cRingBufferLinear in cRemux to avoid unnecessary copying of data.
- Using a cRingBufferLinear in cTSBuffer and filling it in a separate thread
to avoid buffer overflows. Plugins using cTSBuffer will need to remove the
call to the now obsolete Read() function (see cDvbDevice::GetTSPacket() for
the new usage of cTSBuffer).
- cRemux::Process() has been split into Put(), Get() and Del() to allow for a
better decoupling of the remuxing and disk writing process. Plugins using
cRemux will need to be modified accordingly.
- The actual disk writing in recordings is now done in a separate thread to
improve the overall throughput.
- Changed cRemux so that it returns the maximum available amount of data with
each call, not just 2048 byte.
- Added a visual display of all cRingBufferLinear buffers for debugging. To
activate it, define DEBUGRINGBUFFERS in ringbuffer.h.
- Instead of cCondVar now using the new cCondWait (which also avoids a possible
"near miss" condition; thanks to Sascha Volkenandt for pointing out this one).
cCondVar is still present for plugins that use it (and VDR itself also still
uses it in cRemote).
- The cRingBuffer now does EnableGet()/EnablePut() only if the buffer is more than
one third full or empty, respectively. This dramatically improves recording
performance and reduces system load (thanks to Marco Schlüßler for doing some
testing regarding buffer performance and giving me some hints that finally led
to finding out that this was the basic problem causing buffer overflows).
- Improved Transfer Mode (thanks to Marco Schlüßler for suggestions and testing).

View File

@ -24,3 +24,7 @@ VDR Plugin 'sky' Revision History
- Added automatic DST detection to getskyepg.pl.
- Fixed handling receivers, so that a recording on the same channel
won't interrupt an ongoing Transfer mode.
2004-10-16: Version 0.3.1
- Improved buffer handling.

View File

@ -3,7 +3,7 @@
*
* See the README file for copyright information and how to reach the author.
*
* $Id: sky.c 1.6 2004/02/15 14:59:46 kls Exp $
* $Id: sky.c 1.7 2004/10/16 09:10:06 kls Exp $
*/
#include <sys/socket.h>
@ -14,7 +14,7 @@
#include <vdr/plugin.h>
#include <vdr/sources.h>
static const char *VERSION = "0.3.0";
static const char *VERSION = "0.3.1";
static const char *DESCRIPTION = "Sky Digibox interface";
// --- cDigiboxDevice --------------------------------------------------------
@ -140,7 +140,7 @@ bool cDigiboxDevice::OpenDvr(void)
CloseDvr();
fd_dvr = open("/dev/video2", O_RDONLY | O_NONBLOCK);//XXX parameter???
if (fd_dvr >= 0)
tsBuffer = new cTSBuffer(fd_dvr, KILOBYTE(256), CardIndex() + 1);
tsBuffer = new cTSBuffer(fd_dvr, MEGABYTE(2), CardIndex() + 1);
return fd_dvr >= 0;
}
@ -157,24 +157,16 @@ void cDigiboxDevice::CloseDvr(void)
bool cDigiboxDevice::GetTSPacket(uchar *&Data)
{
if (tsBuffer) {
int r = tsBuffer->Read();
if (r >= 0) {
Data = tsBuffer->Get();
if (Data) {
// insert the actual PIDs:
int Pid = (((uint16_t)Data[1] & PID_MASK_HI) << 8) | Data[2];
if (Pid == DUMMYAPID)
Pid = apid;
else if (Pid == DUMMYVPID)
Pid = vpid;
Data[1] = ((Pid >> 8) & 0xFF) | (Data[1] & ~PID_MASK_HI);
Data[2] = Pid & 0xFF;
}
return true;
}
else if (FATALERRNO) {
LOG_ERROR;
return false;
Data = tsBuffer->Get();
if (Data) {
// insert the actual PIDs:
int Pid = (((uint16_t)Data[1] & PID_MASK_HI) << 8) | Data[2];
if (Pid == DUMMYAPID)
Pid = apid;
else if (Pid == DUMMYVPID)
Pid = vpid;
Data[1] = ((Pid >> 8) & 0xFF) | (Data[1] & ~PID_MASK_HI);
Data[2] = Pid & 0xFF;
}
return true;
}

View File

@ -4,7 +4,7 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: device.c 1.56 2004/06/19 08:51:05 kls Exp $
* $Id: device.c 1.57 2004/10/09 12:53:02 kls Exp $
*/
#include "device.h"
@ -801,79 +801,67 @@ void cDevice::Detach(cReceiver *Receiver)
cTSBuffer::cTSBuffer(int File, int Size, int CardIndex)
{
SetDescription("TS buffer on device %d", CardIndex);
f = File;
size = Size / TS_SIZE * TS_SIZE;
cardIndex = CardIndex;
tsRead = tsWrite = 0;
buf = (f >= 0 && size >= TS_SIZE) ? MALLOC(uchar, size + TS_SIZE) : NULL;
// the '+ TS_SIZE' allocates some extra space for handling packets that got split by a buffer roll-over
firstRead = true;
active = false;
delivered = false;
ringBuffer = new cRingBufferLinear(Size, TS_SIZE, true, "TS");
ringBuffer->SetTimeouts(100, 100);
Start();
}
cTSBuffer::~cTSBuffer()
{
free(buf);
active = false;
Cancel(3);
delete ringBuffer;
}
int cTSBuffer::Read(void)
void cTSBuffer::Action(void)
{
if (buf) {
cPoller Poller(f, false);
bool repeat;
int total = 0;
do {
repeat = false;
if (firstRead || Used() > TS_SIZE || Poller.Poll(100)) { // only wait if there's not enough data in the buffer
firstRead = false;
if (tsRead == tsWrite)
tsRead = tsWrite = 0; // keep the maximum buffer space available
if (tsWrite >= size && tsRead > 0)
tsWrite = 0;
int free = tsRead <= tsWrite ? size - tsWrite : tsRead - tsWrite - 1;
if (free > 0) {
int r = read(f, buf + tsWrite, free);
if (r > 0) {
total += r;
tsWrite += r;
if (tsWrite >= size && tsRead > 0) {
tsWrite = 0;
repeat = true; // read again after a boundary roll-over
}
}
}
}
} while (repeat);
return total;
if (ringBuffer) {
bool firstRead = true;
cPoller Poller(f);
active = true;
for (; active;) {
if (firstRead || Poller.Poll(100)) {
firstRead = false;
int r = ringBuffer->Read(f);
if (r < 0 && FATALERRNO) {
if (errno == EOVERFLOW)
esyslog("ERROR: driver buffer overflow on device %d", cardIndex);
else {
LOG_ERROR;
break;
}
}
}
}
}
return -1;
}
uchar *cTSBuffer::Get(void)
{
if (Used() >= TS_SIZE) {
uchar *p = buf + tsRead;
int Count = 0;
if (delivered) {
ringBuffer->Del(TS_SIZE);
delivered = false;
}
uchar *p = ringBuffer->Get(Count);
if (p && Count >= TS_SIZE) {
if (*p != TS_SYNC_BYTE) {
esyslog("ERROR: not sync'ed to TS packet on device %d", cardIndex);
int tsMax = tsRead < tsWrite ? tsWrite : size;
for (int i = tsRead; i < tsMax; i++) {
if (buf[i] == TS_SYNC_BYTE) {
esyslog("ERROR: skipped %d bytes to sync on TS packet on device %d", i - tsRead, cardIndex);
tsRead = i;
return NULL;
for (int i = 1; i < Count; i++) {
if (p[i] == TS_SYNC_BYTE) {
Count = i;
break;
}
}
if ((tsRead = tsMax) >= size)
tsRead = 0;
ringBuffer->Del(Count);
esyslog("ERROR: skipped %d bytes to sync on TS packet on device %d", Count, cardIndex);
return NULL;
}
if (tsRead + TS_SIZE > size) {
// the packet rolled over the buffer boundary, so let's fetch the rest from the beginning (which MUST be there, since Used() >= TS_SIZE)
int rest = TS_SIZE - (size - tsRead);
memcpy(buf + size, buf, rest);
tsRead = rest;
}
else if ((tsRead += TS_SIZE) >= size)
tsRead = 0;
delivered = true;
return p;
}
return NULL;

View File

@ -4,7 +4,7 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: device.h 1.44 2004/06/19 08:50:37 kls Exp $
* $Id: device.h 1.45 2004/09/24 14:07:22 kls Exp $
*/
#ifndef __DEVICE_H
@ -15,6 +15,7 @@
#include "filter.h"
#include "nit.h"
#include "pat.h"
#include "ringbuffer.h"
#include "sdt.h"
#include "sections.h"
#include "thread.h"
@ -437,20 +438,17 @@ public:
/// sure the returned data points to a TS packet and automatically
/// re-synchronizes after broken packets.
class cTSBuffer {
class cTSBuffer : public cThread {
private:
int f;
int size;
int cardIndex;
int tsRead;
int tsWrite;
uchar *buf;
bool firstRead;
int Used(void) { return tsRead <= tsWrite ? tsWrite - tsRead : size - tsRead + tsWrite; }
bool active;
bool delivered;
cRingBufferLinear *ringBuffer;
virtual void Action(void);
public:
cTSBuffer(int File, int Size, int CardIndex);
~cTSBuffer();
int Read(void);
uchar *Get(void);
};

View File

@ -4,7 +4,7 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: dvbdevice.c 1.93 2004/06/19 13:48:00 kls Exp $
* $Id: dvbdevice.c 1.94 2004/10/15 13:07:52 kls Exp $
*/
#include "dvbdevice.h"
@ -78,8 +78,7 @@ private:
bool useCa;
time_t startTime;
eTunerStatus tunerStatus;
cMutex mutex;
cCondVar newSet;
cCondWait newSet;
bool SetFrontend(void);
virtual void Action(void);
public:
@ -111,7 +110,7 @@ cDvbTuner::~cDvbTuner()
{
active = false;
tunerStatus = tsIdle;
newSet.Broadcast();
newSet.Signal();
Cancel(3);
}
@ -122,7 +121,7 @@ bool cDvbTuner::IsTunedTo(const cChannel *Channel) const
void cDvbTuner::Set(const cChannel *Channel, bool Tune, bool UseCa)
{
cMutexLock MutexLock(&mutex);
Lock();
if (Tune)
tunerStatus = tsSet;
else if (tunerStatus == tsCam)
@ -131,7 +130,8 @@ void cDvbTuner::Set(const cChannel *Channel, bool Tune, bool UseCa)
if (Channel->Ca() && tunerStatus != tsCam)
startTime = time(NULL);
channel = *Channel;
newSet.Broadcast();
Unlock();
newSet.Signal();
}
static unsigned int FrequencyToHz(unsigned int f)
@ -252,7 +252,7 @@ void cDvbTuner::Action(void)
{
active = true;
while (active) {
cMutexLock MutexLock(&mutex);
Lock();
if (tunerStatus == tsSet)
tunerStatus = SetFrontend() ? tsTuned : tsIdle;
if (tunerStatus == tsTuned) {
@ -267,7 +267,6 @@ void cDvbTuner::Action(void)
if (event.status & FE_REINIT) {
tunerStatus = tsSet;
esyslog("ERROR: frontend %d was reinitialized - re-tuning", cardIndex);
continue;
}
}
}
@ -292,8 +291,9 @@ void cDvbTuner::Action(void)
else if (tunerStatus > tsLocked)
tunerStatus = tsLocked;
}
Unlock();
// in the beginning we loop more often to let the CAM connection start up fast
newSet.TimedWait(mutex, (ciHandler && (time(NULL) - startTime < 20)) ? 100 : 1000);
newSet.Wait((ciHandler && (time(NULL) - startTime < 20)) ? 100 : 1000);
}
}
@ -1101,29 +1101,17 @@ bool cDvbDevice::OpenDvr(void)
void cDvbDevice::CloseDvr(void)
{
if (fd_dvr >= 0) {
close(fd_dvr);
fd_dvr = -1;
delete tsBuffer;
tsBuffer = NULL;
close(fd_dvr);
fd_dvr = -1;
}
}
bool cDvbDevice::GetTSPacket(uchar *&Data)
{
if (tsBuffer) {
int r = tsBuffer->Read();
if (r >= 0) {
Data = tsBuffer->Get();
return true;
}
else if (FATALERRNO) {
if (errno == EOVERFLOW)
esyslog("ERROR: DVB driver buffer overflow on device %d", CardIndex() + 1);
else {
LOG_ERROR;
return false;
}
}
Data = tsBuffer->Get();
return true;
}
return false;

View File

@ -4,7 +4,7 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: dvbplayer.c 1.24 2004/06/19 08:55:49 kls Exp $
* $Id: dvbplayer.c 1.25 2004/10/15 13:07:55 kls Exp $
*/
#include "dvbplayer.h"
@ -80,8 +80,7 @@ private:
int length;
bool hasData;
bool active;
cMutex mutex;
cCondVar newSet;
cCondWait newSet;
protected:
void Action(void);
public:
@ -106,20 +105,21 @@ cNonBlockingFileReader::cNonBlockingFileReader(void)
cNonBlockingFileReader::~cNonBlockingFileReader()
{
active = false;
newSet.Broadcast();
newSet.Signal();
Cancel(3);
free(buffer);
}
void cNonBlockingFileReader::Clear(void)
{
cMutexLock MutexLock(&mutex);
Lock();
f = -1;
free(buffer);
buffer = NULL;
wanted = length = 0;
hasData = false;
newSet.Broadcast();
Unlock();
newSet.Signal();
}
int cNonBlockingFileReader::Read(int FileHandle, uchar *Buffer, int Length)
@ -139,7 +139,7 @@ int cNonBlockingFileReader::Read(int FileHandle, uchar *Buffer, int Length)
wanted = Length;
length = 0;
hasData = false;
newSet.Broadcast();
newSet.Signal();
}
errno = EAGAIN;
return -1;
@ -149,7 +149,7 @@ void cNonBlockingFileReader::Action(void)
{
active = true;
while (active) {
cMutexLock MutexLock(&mutex);
Lock();
if (!hasData && f >= 0 && buffer) {
int r = safe_read(f, buffer + length, wanted - length);
if (r >= 0) {
@ -163,16 +163,14 @@ void cNonBlockingFileReader::Action(void)
hasData = true;
}
}
newSet.TimedWait(mutex, 1000);
Unlock();
newSet.Wait(1000);
}
}
// --- cDvbPlayer ------------------------------------------------------------
//XXX+ also used in recorder.c - find a better place???
// The size of the array used to buffer video data:
// (must be larger than MINVIDEODATA - see remux.h)
#define VIDEOBUFSIZE MEGABYTE(1)
#define PLAYERBUFSIZE MEGABYTE(1)
// The number of frames to back up when resuming an interrupted replay session:
#define RESUMEBACKUP (10 * FRAMESPERSEC)
@ -257,7 +255,7 @@ cDvbPlayer::cDvbPlayer(const char *FileName)
replayFile = fileName->Open();
if (replayFile < 0)
return;
ringBuffer = new cRingBufferFrame(VIDEOBUFSIZE);
ringBuffer = new cRingBufferFrame(PLAYERBUFSIZE);
// Create the index file:
index = new cIndexFile(FileName, false);
if (!index)

View File

@ -4,7 +4,7 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: recorder.c 1.10 2004/03/20 10:33:21 kls Exp $
* $Id: recorder.c 1.11 2004/10/16 09:23:01 kls Exp $
*/
#include <stdarg.h>
@ -12,9 +12,7 @@
#include <unistd.h>
#include "recorder.h"
// The size of the array used to buffer video data:
// (must be larger than MINVIDEODATA - see remux.h)
#define VIDEOBUFSIZE MEGABYTE(5)
#define RECORDERBUFSIZE MEGABYTE(5)
// The maximum time we wait before assuming that a recorded video data stream
// is broken:
@ -23,25 +21,35 @@
#define MINFREEDISKSPACE (512) // MB
#define DISKCHECKINTERVAL 100 // seconds
cRecorder::cRecorder(const char *FileName, int Ca, int Priority, int VPid, int APid1, int APid2, int DPid1, int DPid2)
:cReceiver(Ca, Priority, Setup.RecordDolbyDigital ? 5 : 3, VPid, APid1, APid2, DPid1, DPid2)
,cThread("recording")
class cFileWriter : public cThread {
private:
cRemux *remux;
cFileName *fileName;
cIndexFile *index;
uchar pictureType;
int fileSize;
int recordFile;
bool active;
time_t lastDiskSpaceCheck;
bool RunningLowOnDiskSpace(void);
bool NextFile(void);
protected:
virtual void Action(void);
public:
cFileWriter(const char *FileName, cRemux *Remux);
virtual ~cFileWriter();
};
cFileWriter::cFileWriter(const char *FileName, cRemux *Remux)
:cThread("file writer")
{
ringBuffer = NULL;
remux = NULL;
active = false;
fileName = NULL;
remux = Remux;
index = NULL;
pictureType = NO_PICTURE;
fileSize = 0;
active = false;
lastDiskSpaceCheck = time(NULL);
// Make sure the disk is up and running:
SpinUpDisk(FileName);
ringBuffer = new cRingBufferLinear(VIDEOBUFSIZE, TS_SIZE * 2, true);
remux = new cRemux(VPid, APid1, APid2, DPid1, DPid2, true);
fileName = new cFileName(FileName, true);
recordFile = fileName->Open();
if (recordFile < 0)
@ -53,28 +61,15 @@ cRecorder::cRecorder(const char *FileName, int Ca, int Priority, int VPid, int A
// let's continue without index, so we'll at least have the recording
}
cRecorder::~cRecorder()
cFileWriter::~cFileWriter()
{
Detach();
active = false;
Cancel(3);
delete index;
delete fileName;
delete remux;
delete ringBuffer;
}
void cRecorder::Activate(bool On)
{
if (On) {
if (recordFile >= 0)
Start();
}
else if (active) {
active = false;
Cancel(3);
}
}
bool cRecorder::RunningLowOnDiskSpace(void)
bool cFileWriter::RunningLowOnDiskSpace(void)
{
if (time(NULL) > lastDiskSpaceCheck + DISKCHECKINTERVAL) {
int Free = FreeDiskSpaceMB(fileName->Name());
@ -87,7 +82,7 @@ bool cRecorder::RunningLowOnDiskSpace(void)
return false;
}
bool cRecorder::NextFile(void)
bool cFileWriter::NextFile(void)
{
if (recordFile >= 0 && pictureType == I_FRAME) { // every file shall start with an I_FRAME
if (fileSize > MEGABYTE(Setup.MaxVideoFileSize) || RunningLowOnDiskSpace()) {
@ -98,40 +93,29 @@ bool cRecorder::NextFile(void)
return recordFile >= 0;
}
void cRecorder::Receive(uchar *Data, int Length)
{
int p = ringBuffer->Put(Data, Length);
if (p != Length && active)
ringBuffer->ReportOverflow(Length - p);
}
void cRecorder::Action(void)
void cFileWriter::Action(void)
{
time_t t = time(NULL);
active = true;
while (active) {
int r;
const uchar *b = ringBuffer->Get(r);
if (b) {
int Count = r, Result;
uchar *p = remux->Process(b, Count, Result, &pictureType);
ringBuffer->Del(Count);
if (p) {
//XXX+ active??? see old version (Busy)
if (!active && pictureType == I_FRAME) // finish the recording before the next 'I' frame
int Count;
uchar *p = remux->Get(Count, &pictureType);
if (p) {
//XXX+ active??? see old version (Busy)
if (!active && pictureType == I_FRAME) // finish the recording before the next 'I' frame
break;
if (NextFile()) {
if (index && pictureType != NO_PICTURE)
index->Write(pictureType, fileName->Number(), fileSize);
if (safe_write(recordFile, p, Count) < 0) {
LOG_ERROR_STR(fileName->Name());
break;
if (NextFile()) {
if (index && pictureType != NO_PICTURE)
index->Write(pictureType, fileName->Number(), fileSize);
if (safe_write(recordFile, p, Result) < 0) {
LOG_ERROR_STR(fileName->Name());
break;
}
fileSize += Result;
}
else
break;
fileSize += Count;
remux->Del(Count);
}
else
break;
t = time(NULL);
}
else if (time(NULL) - t > MAXBROKENTIMEOUT) {
@ -139,7 +123,65 @@ void cRecorder::Action(void)
cThread::EmergencyExit(true);
t = time(NULL);
}
else
usleep(1); // this keeps the CPU load low
}
active = false;
}
cRecorder::cRecorder(const char *FileName, int Ca, int Priority, int VPid, int APid1, int APid2, int DPid1, int DPid2)
:cReceiver(Ca, Priority, Setup.RecordDolbyDigital ? 5 : 3, VPid, APid1, APid2, DPid1, DPid2)
,cThread("recording")
{
active = false;
// Make sure the disk is up and running:
SpinUpDisk(FileName);
ringBuffer = new cRingBufferLinear(RECORDERBUFSIZE, TS_SIZE * 2, true, "Recorder");
ringBuffer->SetTimeouts(0, 100);
remux = new cRemux(VPid, APid1, APid2, DPid1, DPid2, true);
writer = new cFileWriter(FileName, remux);
}
cRecorder::~cRecorder()
{
Detach();
delete writer;
delete remux;
delete ringBuffer;
}
void cRecorder::Activate(bool On)
{
if (On) {
writer->Start();
Start();
}
else if (active) {
active = false;
Cancel(3);
}
}
void cRecorder::Receive(uchar *Data, int Length)
{
if (active) {
int p = ringBuffer->Put(Data, Length);
if (p != Length && active)
ringBuffer->ReportOverflow(Length - p);
}
}
void cRecorder::Action(void)
{
active = true;
while (active) {
int r;
uchar *b = ringBuffer->Get(r);
if (b) {
int Count = remux->Put(b, r);
if (Count)
ringBuffer->Del(Count);
}
}
}

View File

@ -4,7 +4,7 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: recorder.h 1.1 2002/06/10 16:30:00 kls Exp $
* $Id: recorder.h 1.2 2004/10/10 11:22:38 kls Exp $
*/
#ifndef __RECORDER_H
@ -16,19 +16,14 @@
#include "ringbuffer.h"
#include "thread.h"
class cFileWriter;
class cRecorder : public cReceiver, cThread {
private:
cRingBufferLinear *ringBuffer;
cRemux *remux;
cFileName *fileName;
cIndexFile *index;
uchar pictureType;
int fileSize;
int recordFile;
cFileWriter *writer;
bool active;
time_t lastDiskSpaceCheck;
bool RunningLowOnDiskSpace(void);
bool NextFile(void);
protected:
virtual void Activate(bool On);
virtual void Receive(uchar *Data, int Length);

326
remux.c
View File

@ -8,63 +8,9 @@
* the Linux DVB driver's 'tuxplayer' example and were rewritten to suit
* VDR's needs.
*
* $Id: remux.c 1.18 2004/02/14 10:40:37 kls Exp $
* $Id: remux.c 1.19 2004/10/16 09:11:52 kls Exp $
*/
/* The calling interface of the 'cRemux::Process()' function is defined
as follows:
'Data' points to a chunk of data that consists of 'Count' bytes.
The 'Process' function shall try to remultiplex as much of the
data as possible and return a pointer to the resulting buffer.
That buffer typically is different from the incoming 'Data',
but in the simplest case (when 'Process' does nothing) might
as well point to the original 'Data'. When returning, 'Count'
shall be set to the number of bytes that have been processed
(i.e. have been taken from 'Data'), while 'Result' indicates
how many bytes the returned buffer contains. 'PictureType' shall
be set to NO_PICTURE if the returned data does not start a new
picture, or one of I_FRAME, P_FRAME or B_FRAME if a new picture
starting point has been found. This also means that the returned
data buffer may contain at most one entire video frame, because
the next frame must be returned with its own value for 'PictureType'.
'Process' shall do it's best to keep the latency time as short
as possible in order to allow a quick start of VDR's "Transfer
mode" (displaying the signal of one DVB card on another card).
In order to do that, this function may decide to first pass
through the incoming data (almost) unprocessed, and make
actual processing kick in after a few seconds (if that is at
all possible for the algorithm). This may result in a non-
optimal stream at the beginning, which won't matter for normal
recordings but may make switching through encrypted channels
in "Transfer mode" faster.
In the resulting data stream, a new packet shall always be started
when a frame border is encountered. VDR needs this in order to
be able to detect and store the frame indexes, and to easily
display single frames in fast forward/back mode. The very first
data block returned shall be the starting point of an I_FRAME.
Everything before that shall be silently dropped.
If the incoming data is not enough to do remultiplexing, a value
of NULL shall be returned ('Result' has no meaning then). This
will tell the caller to wait for more data to be presented in
the next call. If NULL is returned and 'Count' is not 0, the
caller shall remove 'Count' bytes from the beginning of 'Data'
before the next call. This is the way 'Process' indicates that
it must skip that data.
Any data that is not used during this call will appear at the
beginning of the incoming 'Data' buffer at the next call, plus
any new data that has become available.
It is guaranteed that the caller will completely process any
returned data before the next call to 'Process'. That way, 'Process'
can dynamically allocate its return buffer and be sure the caller
doesn't keep any pointers into that buffer.
*/
#include "remux.h"
#include <stdlib.h>
#include "thread.h"
@ -133,8 +79,7 @@ private:
uint8_t check;
int which;
bool done;
uint8_t *resultBuffer;
int *resultCount;
cRingBufferLinear *resultBuffer;
int tsErrors;
int ccErrors;
int ccCounter;
@ -145,7 +90,7 @@ private:
void write_ipack(const uint8_t *Data, int Count);
void instant_repack(const uint8_t *Buf, int Count);
public:
cTS2PES(uint8_t *ResultBuffer, int *ResultCount, int Size, uint8_t AudioCid = 0x00);
cTS2PES(cRingBufferLinear *ResultBuffer, int Size, uint8_t AudioCid = 0x00);
~cTS2PES();
void ts_to_pes(const uint8_t *Buf); // don't need count (=188)
void Clear(void);
@ -153,10 +98,9 @@ public:
uint8_t cTS2PES::headr[] = { 0x00, 0x00, 0x01 };
cTS2PES::cTS2PES(uint8_t *ResultBuffer, int *ResultCount, int Size, uint8_t AudioCid)
cTS2PES::cTS2PES(cRingBufferLinear *ResultBuffer, int Size, uint8_t AudioCid)
{
resultBuffer = ResultBuffer;
resultCount = ResultCount;
size = Size;
audioCid = AudioCid;
@ -184,12 +128,9 @@ void cTS2PES::Clear(void)
void cTS2PES::store(uint8_t *Data, int Count)
{
if (*resultCount + Count > RESULTBUFFERSIZE) {
esyslog("ERROR: result buffer overflow (%d + %d > %d)", *resultCount, Count, RESULTBUFFERSIZE);
Count = RESULTBUFFERSIZE - *resultCount;
}
memcpy(resultBuffer + *resultCount, Data, Count);
*resultCount += Count;
int n = resultBuffer->Put(Data, Count);
if (n != Count)
esyslog("ERROR: result buffer overflow, dropped %d out of %d byte", Count - n, Count);
}
void cTS2PES::reset_ipack(void)
@ -452,6 +393,8 @@ void cTS2PES::ts_to_pes(const uint8_t *Buf) // don't need count (=188)
// --- cRemux ----------------------------------------------------------------
#define RESULTBUFFERSIZE KILOBYTE(256)
cRemux::cRemux(int VPid, int APid1, int APid2, int DPid1, int DPid2, bool ExitOnFailure)
{
vPid = VPid;
@ -463,13 +406,15 @@ cRemux::cRemux(int VPid, int APid1, int APid2, int DPid1, int DPid2, bool ExitOn
numUPTerrors = 0;
synced = false;
skipped = 0;
resultCount = resultDelivered = 0;
vTS2PES = new cTS2PES(resultBuffer, &resultCount, IPACKS);
aTS2PES1 = new cTS2PES(resultBuffer, &resultCount, IPACKS, 0xC0);
aTS2PES2 = aPid2 ? new cTS2PES(resultBuffer, &resultCount, IPACKS, 0xC1) : NULL;
dTS2PES1 = dPid1 ? new cTS2PES(resultBuffer, &resultCount, IPACKS) : NULL;
resultSkipped = 0;
resultBuffer = new cRingBufferLinear(RESULTBUFFERSIZE, IPACKS, false, "Result");
resultBuffer->SetTimeouts(0, 100);
vTS2PES = new cTS2PES(resultBuffer, IPACKS);
aTS2PES1 = new cTS2PES(resultBuffer, IPACKS, 0xC0);
aTS2PES2 = aPid2 ? new cTS2PES(resultBuffer, IPACKS, 0xC1) : NULL;
dTS2PES1 = dPid1 ? new cTS2PES(resultBuffer, IPACKS) : NULL;
//XXX don't yet know how to tell apart primary and secondary DD data...
dTS2PES2 = /*XXX dPid2 ? new cTS2PES(resultBuffer, &resultCount, IPACKS) : XXX*/ NULL;
dTS2PES2 = /*XXX dPid2 ? new cTS2PES(resultBuffer, IPACKS) : XXX*/ NULL;
}
cRemux::~cRemux()
@ -479,6 +424,7 @@ cRemux::~cRemux()
delete aTS2PES2;
delete dTS2PES1;
delete dTS2PES2;
delete resultBuffer;
}
int cRemux::GetPid(const uchar *Data)
@ -488,27 +434,32 @@ int cRemux::GetPid(const uchar *Data)
int cRemux::GetPacketLength(const uchar *Data, int Count, int Offset)
{
// Returns the entire length of the packet starting at offset, or -1 in case of error.
return (Offset + 5 < Count) ? (Data[Offset + 4] << 8) + Data[Offset + 5] + 6 : -1;
// Returns the length of the packet starting at Offset, or -1 if Count is
// too small to contain the entire packet.
int Length = (Offset + 5 < Count) ? (Data[Offset + 4] << 8) + Data[Offset + 5] + 6 : -1;
if (Length > 0 && Offset + Length <= Count)
return Length;
return -1;
}
int cRemux::ScanVideoPacket(const uchar *Data, int Count, int Offset, uchar &PictureType)
{
// Scans the video packet starting at Offset and returns its length.
// If the return value is -1 the packet was not completely in the buffer.
int Length = GetPacketLength(Data, Count, Offset);
if (Length > 0 && Offset + Length <= Count) {
int i = Offset + 8; // the minimum length of the video packet header
i += Data[i] + 1; // possible additional header bytes
for (; i < Offset + Length; i++) {
if (Data[i] == 0 && Data[i + 1] == 0 && Data[i + 2] == 1) {
switch (Data[i + 3]) {
case SC_PICTURE: PictureType = (Data[i + 5] >> 3) & 0x07;
return Length;
}
if (Length > 0) {
if (Length >= 8) {
int i = Offset + 8; // the minimum length of the video packet header
i += Data[i] + 1; // possible additional header bytes
for (; i < Offset + Length; i++) {
if (Data[i] == 0 && Data[i + 1] == 0 && Data[i + 2] == 1) {
switch (Data[i + 3]) {
case SC_PICTURE: PictureType = (Data[i + 5] >> 3) & 0x07;
return Length;
}
}
}
}
}
PictureType = NO_PICTURE;
return Length;
}
@ -517,28 +468,8 @@ int cRemux::ScanVideoPacket(const uchar *Data, int Count, int Offset, uchar &Pic
#define TS_SYNC_BYTE 0x47
uchar *cRemux::Process(const uchar *Data, int &Count, int &Result, uchar *PictureType)
int cRemux::Put(const uchar *Data, int Count)
{
uchar dummyPictureType;
if (!PictureType)
PictureType = &dummyPictureType;
/*XXX
// test recording the raw TS:
Result = Count;
*PictureType = I_FRAME;
return Data;
XXX*/
// Remove any previously delivered data from the result buffer:
if (resultDelivered) {
if (resultDelivered < resultCount)
memmove(resultBuffer, resultBuffer + resultDelivered, resultCount - resultDelivered);
resultCount -= resultDelivered;
resultDelivered = 0;
}
int used = 0;
// Make sure we are looking at a TS packet:
@ -560,6 +491,8 @@ XXX*/
break;
if (Data[i] != TS_SYNC_BYTE)
break;
if (resultBuffer->Free() < IPACKS)
break;
int pid = GetPid(Data + i + 1);
if (Data[i + 3] & 0x10) { // got payload
if (pid == vPid) vTS2PES->ts_to_pes(Data + i);
@ -569,31 +502,9 @@ XXX*/
else if (pid == dPid2 && dTS2PES2) dTS2PES2->ts_to_pes(Data + i);
}
used += TS_SIZE;
if (resultCount > (int)sizeof(resultBuffer) / 2)
break;
}
Count = used;
/*XXX
// test recording without determining the real frame borders:
*PictureType = I_FRAME;
Result = resultDelivered = resultCount;
return Result ? resultBuffer : NULL;
XXX*/
// Special VPID case to enable recording radio channels:
if (vPid == 0 || vPid == 1 || vPid == 0x1FFF) {
// XXX actually '0' should be enough, but '1' must be used with encrypted channels (driver bug?)
// XXX also allowing 0x1FFF to not break Michael Paar's original patch,
// XXX but it would probably be best to only use '0'
*PictureType = I_FRAME;
Result = resultDelivered = resultCount;
return Result ? resultBuffer : NULL;
}
// Check if we're getting anywhere here:
if (!synced && skipped >= 0) {
if (skipped > MAXNONUSEFULDATA) {
esyslog("ERROR: no useful data seen within %d byte of video stream", skipped);
@ -602,77 +513,112 @@ XXX*/
cThread::EmergencyExit(true);
}
else
skipped += Count;
skipped += used;
}
return used;
}
uchar *cRemux::Get(int &Count, uchar *PictureType)
{
// Remove any previously skipped data from the result buffer:
if (resultSkipped > 0) {
resultBuffer->Del(resultSkipped);
resultSkipped = 0;
}
#if 0
// Test recording without determining the real frame borders:
if (PictureType)
*PictureType = I_FRAME;
return resultBuffer->Get(Count);
#endif
// Special VPID case to enable recording radio channels:
if (vPid == 0 || vPid == 1 || vPid == 0x1FFF) {
// XXX actually '0' should be enough, but '1' must be used with encrypted channels (driver bug?)
// XXX also allowing 0x1FFF to not break Michael Paar's original patch,
// XXX but it would probably be best to only use '0'
if (PictureType)
*PictureType = I_FRAME;
return resultBuffer->Get(Count);
}
// Check for frame borders:
*PictureType = NO_PICTURE;
if (PictureType)
*PictureType = NO_PICTURE;
if (resultCount >= MINVIDEODATA) {
for (int i = 0; i < resultCount; i++) {
if (resultBuffer[i] == 0 && resultBuffer[i + 1] == 0 && resultBuffer[i + 2] == 1) {
switch (resultBuffer[i + 3]) {
case VIDEO_STREAM_S ... VIDEO_STREAM_E:
{
uchar pt = NO_PICTURE;
int l = ScanVideoPacket(resultBuffer, resultCount, i, pt);
if (l < 0)
return NULL; // no useful data found, wait for more
if (pt != NO_PICTURE) {
if (pt < I_FRAME || B_FRAME < pt) {
esyslog("ERROR: unknown picture type '%d'", pt);
if (++numUPTerrors > MAXNUMUPTERRORS && exitOnFailure)
cThread::EmergencyExit(true);
}
else if (!synced) {
if (pt == I_FRAME) {
resultDelivered = i; // will drop everything before this position
SetBrokenLink(resultBuffer + i, l);
synced = true;
}
else {
resultDelivered = i + l; // will drop everything before and including this packet
return NULL;
}
}
Count = 0;
uchar *resultData = NULL;
int resultCount = 0;
uchar *data = resultBuffer->Get(resultCount);
if (data) {
for (int i = 0; i < resultCount - 3; i++) {
if (data[i] == 0 && data[i + 1] == 0 && data[i + 2] == 1) {
int l = 0;
uchar StreamType = data[i + 3];
if (VIDEO_STREAM_S <= StreamType && StreamType <= VIDEO_STREAM_E) {
uchar pt = NO_PICTURE;
l = ScanVideoPacket(data, resultCount, i, pt);
if (l < 0)
return resultData;
if (pt != NO_PICTURE) {
if (pt < I_FRAME || B_FRAME < pt) {
esyslog("ERROR: unknown picture type '%d'", pt);
if (++numUPTerrors > MAXNUMUPTERRORS && exitOnFailure)
cThread::EmergencyExit(true);
}
else if (!synced) {
if (pt == I_FRAME) {
if (PictureType)
*PictureType = pt;
resultSkipped = i; // will drop everything before this position
SetBrokenLink(data + i, l);
synced = true;
}
if (synced) {
*PictureType = pt;
Result = l;
uchar *p = resultBuffer + resultDelivered;
resultDelivered += l;
return p;
}
else {
resultDelivered = i + l; // will drop everything before and including this packet
return NULL;
}
}
break;
case PRIVATE_STREAM1:
case AUDIO_STREAM_S ... AUDIO_STREAM_E:
{
int l = GetPacketLength(resultBuffer, resultCount, i);
if (l < 0)
return NULL; // no useful data found, wait for more
if (synced) {
Result = l;
uchar *p = resultBuffer + resultDelivered;
resultDelivered += l;
return p;
}
else {
resultDelivered = i + l; // will drop everything before and including this packet
return NULL;
}
}
break;
}
}
else if (Count)
return resultData;
else if (PictureType)
*PictureType = pt;
}
}
else { //if (AUDIO_STREAM_S <= StreamType && StreamType <= AUDIO_STREAM_E || StreamType == PRIVATE_STREAM1) {
l = GetPacketLength(data, resultCount, i);
if (l < 0)
return resultData;
}
if (synced) {
if (!Count)
resultData = data + i;
Count += l;
}
else
resultSkipped = i;
if (l > 0)
i += l - 1; // the loop increments, too
}
}
}
return NULL; // no useful data found, wait for more
return resultData;
}
void cRemux::Del(int Count)
{
resultBuffer->Del(Count);
}
void cRemux::Clear(void)
{
if (vTS2PES) vTS2PES->Clear();
if (aTS2PES1) aTS2PES1->Clear();
if (aTS2PES2) aTS2PES2->Clear();
if (dTS2PES1) dTS2PES1->Clear();
if (dTS2PES2) dTS2PES2->Clear();
resultBuffer->Clear();
}
void cRemux::SetBrokenLink(uchar *Data, int Length)

30
remux.h
View File

@ -4,7 +4,7 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: remux.h 1.11 2004/02/14 10:40:41 kls Exp $
* $Id: remux.h 1.12 2004/10/15 12:31:16 kls Exp $
*/
#ifndef __REMUX_H
@ -12,6 +12,7 @@
#include <time.h> //XXX FIXME: DVB/linux/dvb/dmx.h should include <time.h> itself!!!
#include <linux/dvb/dmx.h>
#include "ringbuffer.h"
#include "tools.h"
// Picture types:
@ -20,11 +21,6 @@
#define P_FRAME 2
#define B_FRAME 3
// The minimum amount of video data necessary to identify frames:
#define MINVIDEODATA (16*1024) // just a safe guess (max. size of any frame block, plus some safety)
#define RESULTBUFFERSIZE (MINVIDEODATA * 4)
class cTS2PES;
class cRemux {
@ -35,16 +31,30 @@ private:
int skipped;
int vPid, aPid1, aPid2, dPid1, dPid2;
cTS2PES *vTS2PES, *aTS2PES1, *aTS2PES2, *dTS2PES1, *dTS2PES2;
uchar resultBuffer[RESULTBUFFERSIZE];
int resultCount;
int resultDelivered;
cRingBufferLinear *resultBuffer;
int resultSkipped;
int GetPid(const uchar *Data);
int GetPacketLength(const uchar *Data, int Count, int Offset);
int ScanVideoPacket(const uchar *Data, int Count, int Offset, uchar &PictureType);
public:
cRemux(int VPid, int APid1, int APid2, int DPid1, int DPid2, bool ExitOnFailure = false);
~cRemux();
uchar *Process(const uchar *Data, int &Count, int &Result, uchar *PictureType = NULL);
int Put(const uchar *Data, int Count);
///< Puts at most Count bytes of Data into the remuxer.
///< \return Returns the number of bytes actually consumed from Data.
uchar *Get(int &Count, uchar *PictureType = NULL);
///< Gets all currently available data from the remuxer.
///< \return Count contains the number of bytes the result points to, and
///< PictureType (if not NULL) will contain one of NO_PICTURE, I_FRAME, P_FRAME
///< or B_FRAME.
void Del(int Count);
///< Deletes Count bytes from the remuxer. Count must be the number returned
///< from a previous call to Get(). Several calls to Del() with fractions of
///< a previously returned Count may be made, but the total sum of all Count
///< values must be exactly what the previous Get() has returned.
void Clear(void);
///< Clears the remuxer of all data it might still contain, keeping the PID
///< settings as they are.
static void SetBrokenLink(uchar *Data, int Length);
};

View File

@ -7,7 +7,7 @@
* Parts of this file were inspired by the 'ringbuffy.c' from the
* LinuxDVB driver (see linuxtv.org).
*
* $Id: ringbuffer.c 1.20 2004/06/19 12:27:56 kls Exp $
* $Id: ringbuffer.c 1.21 2004/10/15 13:49:25 kls Exp $
*/
#include "ringbuffer.h"
@ -18,11 +18,14 @@
// --- cRingBuffer -----------------------------------------------------------
#define OVERFLOWREPORTDELTA 5 // seconds between reports
#define PERCENTAGEDELTA 10
#define PERCENTAGETHRESHOLD 70
cRingBuffer::cRingBuffer(int Size, bool Statistics)
{
size = Size;
statistics = Statistics;
getThreadTid = 0;
maxFill = 0;
lastPercent = 0;
putTimeout = getTimeout = 0;
@ -36,34 +39,41 @@ cRingBuffer::~cRingBuffer()
dsyslog("buffer stats: %d (%d%%) used", maxFill, maxFill * 100 / (size - 1));
}
void cRingBuffer::UpdatePercentage(int Fill)
{
if (Fill > maxFill)
maxFill = Fill;
int percent = Fill * 100 / (Size() - 1) / PERCENTAGEDELTA * PERCENTAGEDELTA;
if (percent != lastPercent) {
if (percent >= PERCENTAGETHRESHOLD && percent > lastPercent || percent < PERCENTAGETHRESHOLD && lastPercent >= PERCENTAGETHRESHOLD) {
dsyslog("buffer usage: %d%% (tid=%ld)", percent, getThreadTid);
lastPercent = percent;
}
}
}
void cRingBuffer::WaitForPut(void)
{
if (putTimeout) {
putMutex.Lock();
readyForPut.TimedWait(putMutex, putTimeout);
putMutex.Unlock();
}
if (putTimeout)
readyForPut.Wait(putTimeout);
}
void cRingBuffer::WaitForGet(void)
{
if (getTimeout) {
getMutex.Lock();
readyForGet.TimedWait(getMutex, getTimeout);
getMutex.Unlock();
}
if (getTimeout)
readyForGet.Wait(getTimeout);
}
void cRingBuffer::EnablePut(void)
{
if (putTimeout)
readyForPut.Broadcast();
if (putTimeout && Free() > Size() / 3)
readyForPut.Signal();
}
void cRingBuffer::EnableGet(void)
{
if (getTimeout)
readyForGet.Broadcast();
if (getTimeout && Available() > Size() / 3)
readyForGet.Signal();
}
void cRingBuffer::SetTimeouts(int PutTimeout, int GetTimeout)
@ -85,70 +95,168 @@ void cRingBuffer::ReportOverflow(int Bytes)
// --- cRingBufferLinear -----------------------------------------------------
cRingBufferLinear::cRingBufferLinear(int Size, int Margin, bool Statistics)
#ifdef DEBUGRINGBUFFERS
#define MAXRBLS 30
#define DEBUGRBLWIDTH 45
cRingBufferLinear *cRingBufferLinear::RBLS[MAXRBLS] = { NULL };
void cRingBufferLinear::AddDebugRBL(cRingBufferLinear *RBL)
{
for (int i = 0; i < MAXRBLS; i++) {
if (!RBLS[i]) {
RBLS[i] = RBL;
break;
}
}
}
void cRingBufferLinear::DelDebugRBL(cRingBufferLinear *RBL)
{
for (int i = 0; i < MAXRBLS; i++) {
if (RBLS[i] == RBL) {
RBLS[i] = NULL;
break;
}
}
}
void cRingBufferLinear::PrintDebugRBL(void)
{
bool printed = false;
for (int i = 0; i < MAXRBLS; i++) {
cRingBufferLinear *p = RBLS[i];
if (p) {
printed = true;
int lh = p->lastHead;
int lt = p->lastTail;
int h = lh * DEBUGRBLWIDTH / p->Size();
int t = lt * DEBUGRBLWIDTH / p->Size();
char buf[DEBUGRBLWIDTH + 10];
memset(buf, '-', DEBUGRBLWIDTH);
if (lt <= lh)
memset(buf + t, '*', max(h - t, 1));
else {
memset(buf, '*', h);
memset(buf + t, '*', DEBUGRBLWIDTH - t);
}
buf[t] = '<';
buf[h] = '>';
buf[DEBUGRBLWIDTH] = 0;
printf("%2d %s %8d %8d %s\n", i, buf, p->lastPut, p->lastGet, p->description);
}
}
if (printed)
printf("\n");
}
#endif
cRingBufferLinear::cRingBufferLinear(int Size, int Margin, bool Statistics, const char *Description)
:cRingBuffer(Size, Statistics)
{
margin = Margin;
description = Description ? strdup(Description) : NULL;
tail = head = margin = Margin;
buffer = NULL;
getThreadTid = 0;
if (Size > 1) { // 'Size - 1' must not be 0!
buffer = MALLOC(uchar, Size);
if (!buffer)
esyslog("ERROR: can't allocate ring buffer (size=%d)", Size);
Clear();
if (Margin <= Size / 2) {
buffer = MALLOC(uchar, Size);
if (!buffer)
esyslog("ERROR: can't allocate ring buffer (size=%d)", Size);
Clear();
}
else
esyslog("ERROR: illegal margin for ring buffer (%d > %d)", Margin, Size / 2);
}
else
esyslog("ERROR: illegal size for ring buffer (%d)", Size);
#ifdef DEBUGRINGBUFFERS
lastHead = head;
lastTail = tail;
lastPut = lastGet = -1;
AddDebugRBL(this);
#endif
}
cRingBufferLinear::~cRingBufferLinear()
{
#ifdef DEBUGRINGBUFFERS
DelDebugRBL(this);
#endif
free(buffer);
free(description);
}
int cRingBufferLinear::Available(void)
{
Lock();
int diff = head - tail;
Unlock();
return (diff >= 0) ? diff : Size() + diff - margin;
}
void cRingBufferLinear::Clear(void)
{
Lock();
head = tail = margin;
lastGet = -1;
Unlock();
tail = head;
#ifdef DEBUGRINGBUFFERS
lastHead = head;
lastTail = tail;
lastPut = lastGet = -1;
#endif
maxFill = 0;
EnablePut();
}
int cRingBufferLinear::Read(int FileHandle, int Max)
{
int Tail = tail;
int diff = Tail - head;
int free = (diff > 0) ? diff - 1 : Size() - head;
if (Tail <= margin)
free--;
int Count = 0;
if (free > 0) {
if (0 < Max && Max < free)
free = Max;
Count = safe_read(FileHandle, buffer + head, free);
if (Count > 0) {
int Head = head + Count;
if (Head >= Size())
Head = margin;
head = Head;
if (statistics) {
int fill = head - Tail;
if (fill < 0)
fill = Size() + fill;
else if (fill >= Size())
fill = Size() - 1;
UpdatePercentage(fill);
}
}
}
#ifdef DEBUGRINGBUFFERS
lastHead = head;
lastPut = Count;
#endif
EnableGet();
if (free == 0)
WaitForPut();
return Count;
}
int cRingBufferLinear::Put(const uchar *Data, int Count)
{
if (Count > 0) {
Lock();
int Tail = tail;
int rest = Size() - head;
int diff = tail - head;
int free = ((tail < margin) ? rest : (diff > 0) ? diff : Size() + diff - margin) - 1;
int diff = Tail - head;
int free = ((Tail < margin) ? rest : (diff > 0) ? diff : Size() + diff - margin) - 1;
if (statistics) {
int fill = Size() - free - 1 + Count;
if (fill >= Size())
fill = Size() - 1;
if (fill > maxFill)
maxFill = fill;
int percent = maxFill * 100 / (Size() - 1) / 5 * 5;
if (abs(lastPercent - percent) >= 5) {
if (percent > 75)
dsyslog("buffer usage: %d%% (tid=%ld)", percent, getThreadTid);
lastPercent = percent;
}
UpdatePercentage(fill);
}
if (free > 0) {
if (free < Count)
Count = free;
if (Count > maxFill)
maxFill = Count;
if (Count >= rest) {
memcpy(buffer + head, Data, rest);
if (Count - rest)
@ -162,7 +270,10 @@ int cRingBufferLinear::Put(const uchar *Data, int Count)
}
else
Count = 0;
Unlock();
#ifdef DEBUGRINGBUFFERS
lastHead = head;
lastPut = Count;
#endif
EnableGet();
if (Count == 0)
WaitForPut();
@ -173,25 +284,24 @@ int cRingBufferLinear::Put(const uchar *Data, int Count)
uchar *cRingBufferLinear::Get(int &Count)
{
uchar *p = NULL;
Lock();
int Head = head;
if (getThreadTid <= 0)
getThreadTid = pthread_self();
int rest = Size() - tail;
if (rest < margin && head < tail) {
if (rest < margin && Head < tail) {
int t = margin - rest;
memcpy(buffer + t, buffer + tail, rest);
tail = t;
rest = head - tail;
rest = Head - tail;
}
int diff = head - tail;
int diff = Head - tail;
int cont = (diff >= 0) ? diff : Size() + diff - margin;
if (cont > rest)
cont = rest;
if (cont >= margin) {
p = buffer + tail;
Count = lastGet = cont;
Count = gotten = cont;
}
Unlock();
if (!p)
WaitForGet();
return p;
@ -199,17 +309,23 @@ uchar *cRingBufferLinear::Get(int &Count)
void cRingBufferLinear::Del(int Count)
{
if (Count > 0 && Count <= lastGet) {
Lock();
tail += Count;
lastGet -= Count;
if (tail >= Size())
tail = margin;
Unlock();
if (Count > gotten) {
esyslog("ERROR: invalid Count in cRingBufferLinear::Del: %d (limited to %d)", Count, gotten);
Count = gotten;
}
if (Count > 0) {
int Tail = tail;
Tail += Count;
gotten -= Count;
if (Tail >= Size())
Tail = margin;
tail = Tail;
EnablePut();
}
else
esyslog("ERROR: invalid Count in cRingBufferLinear::Del: %d", Count);
#ifdef DEBUGRINGBUFFERS
lastTail = tail;
lastGet = Count;
#endif
}
// --- cFrame ----------------------------------------------------------------

View File

@ -4,7 +4,7 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: ringbuffer.h 1.15 2004/06/19 10:32:15 kls Exp $
* $Id: ringbuffer.h 1.16 2004/10/15 13:50:46 kls Exp $
*/
#ifndef __RINGBUFFER_H
@ -15,9 +15,7 @@
class cRingBuffer {
private:
cMutex mutex;
cCondVar readyForPut, readyForGet;
cMutex putMutex, getMutex;
cCondWait readyForPut, readyForGet;
int putTimeout;
int getTimeout;
int size;
@ -25,18 +23,18 @@ private:
int overflowCount;
int overflowBytes;
protected:
pthread_t getThreadTid;
int maxFill;//XXX
int lastPercent;
bool statistics;//XXX
void UpdatePercentage(int Fill);
void WaitForPut(void);
void WaitForGet(void);
void EnablePut(void);
void EnableGet(void);
virtual void Clear(void) = 0;
virtual int Available(void) = 0;
int Free(void) { return size - Available() - 1; }
void Lock(void) { mutex.Lock(); }
void Unlock(void) { mutex.Unlock(); }
virtual int Free(void) { return Size() - Available() - 1; }
int Size(void) { return size; }
public:
cRingBuffer(int Size, bool Statistics = false);
@ -46,20 +44,39 @@ public:
};
class cRingBufferLinear : public cRingBuffer {
//#define DEBUGRINGBUFFERS
#ifdef DEBUGRINGBUFFERS
private:
int lastHead, lastTail;
int lastPut, lastGet;
static cRingBufferLinear *RBLS[];
static void AddDebugRBL(cRingBufferLinear *RBL);
static void DelDebugRBL(cRingBufferLinear *RBL);
public:
static void PrintDebugRBL(void);
#endif
private:
int margin, head, tail;
int lastGet;
int gotten;
uchar *buffer;
pthread_t getThreadTid;
char *description;
public:
cRingBufferLinear(int Size, int Margin = 0, bool Statistics = false);
cRingBufferLinear(int Size, int Margin = 0, bool Statistics = false, const char *Description = NULL);
///< Creates a linear ring buffer.
///< The buffer will be able to hold at most Size-Margin-1 bytes of data, and will
///< be guaranteed to return at least Margin bytes in one consecutive block.
///< The optional Description is used for debugging only.
virtual ~cRingBufferLinear();
virtual int Available(void);
virtual int Free(void) { return Size() - Available() - 1 - margin; }
virtual void Clear(void);
///< Immediately clears the ring buffer.
int Read(int FileHandle, int Max = 0);
///< Reads at most Max bytes from FileHandle and stores them in the
///< ring buffer. If Max is 0, reads as many bytes as possible.
///< Only one actual read() call is done.
///< \return Returns the number of bytes actually read and stored, or
///< an error value from the actual read() call.
int Put(const uchar *Data, int Count);
///< Puts at most Count bytes of Data into the ring buffer.
///< \return Returns the number of bytes actually stored.
@ -67,7 +84,7 @@ public:
///< Gets data from the ring buffer.
///< The data will remain in the buffer until a call to Del() deletes it.
///< \return Returns a pointer to the data, and stores the number of bytes
///< actually retrieved in Count. If the returned pointer is NULL, Count has no meaning.
///< actually available in Count. If the returned pointer is NULL, Count has no meaning.
void Del(int Count);
///< Deletes at most Count bytes from the ring buffer.
///< Count must be less or equal to the number that was returned by a previous
@ -98,9 +115,12 @@ public:
class cRingBufferFrame : public cRingBuffer {
private:
cMutex mutex;
cFrame *head;
int currentFill;
void Delete(cFrame *Frame);
void Lock(void) { mutex.Lock(); }
void Unlock(void) { mutex.Unlock(); }
public:
cRingBufferFrame(int Size, bool Statistics = false);
virtual ~cRingBufferFrame();

View File

@ -4,7 +4,7 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: thread.c 1.31 2004/03/14 16:48:30 kls Exp $
* $Id: thread.c 1.32 2004/10/15 13:15:02 kls Exp $
*/
#include "thread.h"
@ -18,6 +18,58 @@
#include <unistd.h>
#include "tools.h"
// --- cCondWait -------------------------------------------------------------
cCondWait::cCondWait(void)
{
signaled = false;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
}
cCondWait::~cCondWait()
{
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
}
bool cCondWait::Wait(int TimeoutMs)
{
pthread_mutex_lock(&mutex);
if (!signaled) {
if (TimeoutMs) {
struct timeval now;
if (gettimeofday(&now, NULL) == 0) { // get current time
now.tv_usec += TimeoutMs * 1000; // add the timeout
int sec = now.tv_usec / 1000000;
now.tv_sec += sec;
now.tv_usec -= sec * 1000000;
struct timespec abstime; // build timespec for timedwait
abstime.tv_sec = now.tv_sec; // seconds
abstime.tv_nsec = now.tv_usec * 1000; // nano seconds
while (!signaled) {
if (pthread_cond_timedwait(&cond, &mutex, &abstime) == ETIMEDOUT)
break;
}
}
}
else
pthread_cond_wait(&cond, &mutex);
}
bool r = signaled;
signaled = false;
pthread_mutex_unlock(&mutex);
return r;
}
void cCondWait::Signal(void)
{
pthread_mutex_lock(&mutex);
signaled = true;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
// --- cCondVar --------------------------------------------------------------
cCondVar::cCondVar(void)
@ -73,13 +125,6 @@ void cCondVar::Broadcast(void)
pthread_cond_broadcast(&cond);
}
/*
void cCondVar::Signal(void)
{
pthread_cond_signal(&cond);
}
*/
// --- cRwLock ---------------------------------------------------------------
cRwLock::cRwLock(bool PreferWriter)

View File

@ -4,7 +4,7 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: thread.h 1.20 2004/01/03 16:58:50 kls Exp $
* $Id: thread.h 1.21 2004/10/15 13:16:39 kls Exp $
*/
#ifndef __THREAD_H
@ -14,6 +14,23 @@
#include <stdio.h>
#include <sys/types.h>
class cCondWait {
private:
pthread_mutex_t mutex;
pthread_cond_t cond;
bool signaled;
public:
cCondWait(void);
~cCondWait();
bool Wait(int TimeoutMs = 0);
///< Waits at most TimeoutMs milliseconds for a call to Signal(), or
///< forever if TimeoutMs is 0.
///< \return Returns true if Signal() has been called, false it the given
///< timeout has expired.
void Signal(void);
///< Signals a caller of Wait() that the condition it is waiting for is met.
};
class cMutex;
class cCondVar {
@ -25,7 +42,6 @@ public:
void Wait(cMutex &Mutex);
bool TimedWait(cMutex &Mutex, int TimeoutMs);
void Broadcast(void);
//void Signal(void);
};
class cRwLock {

View File

@ -4,15 +4,12 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: transfer.c 1.16 2004/03/07 14:40:15 kls Exp $
* $Id: transfer.c 1.17 2004/10/16 09:22:58 kls Exp $
*/
#include "transfer.h"
//XXX+ also used in recorder.c - find a better place???
// The size of the array used to buffer video data:
// (must be larger than MINVIDEODATA - see remux.h)
#define VIDEOBUFSIZE MEGABYTE(1)
#define TRANSFERBUFSIZE MEGABYTE(2)
#define POLLTIMEOUTS_BEFORE_DEVICECLEAR 3
// --- cTransfer -------------------------------------------------------------
@ -21,11 +18,10 @@ cTransfer::cTransfer(int VPid, int APid1, int APid2, int DPid1, int DPid2)
:cReceiver(0, -1, 5, VPid, APid1, APid2, DPid1, DPid2)
,cThread("transfer")
{
ringBuffer = new cRingBufferLinear(VIDEOBUFSIZE, TS_SIZE * 2, true);
ringBuffer = new cRingBufferLinear(TRANSFERBUFSIZE, TS_SIZE * 2, true, "Transfer");
remux = new cRemux(VPid, APid1, APid2, DPid1, DPid2);
canToggleAudioTrack = false;
audioTrack = 0xC0;
gotBufferReserve = false;
active = false;
}
@ -51,79 +47,69 @@ void cTransfer::Activate(bool On)
void cTransfer::Receive(uchar *Data, int Length)
{
if (IsAttached()) {
int i = 0;
while (active && Length > 0) {
if (i++ > 10) {
ringBuffer->ReportOverflow(Length);
break;
}
int p = ringBuffer->Put(Data, Length);
Length -= p;
Data += p;
}
if (IsAttached() && active) {
int p = ringBuffer->Put(Data, Length);
if (p != Length && active)
ringBuffer->ReportOverflow(Length - p);
return;
}
}
void cTransfer::Action(void)
{
int PollTimeouts = 0;
uchar *p = NULL;
int Result = 0;
active = true;
while (active) {
//XXX+ Maybe we need this to avoid buffer underruns in driver.
//XXX+ But then again, it appears to play just fine without this...
/*
if (!gotBufferReserve) {
if (ringBuffer->Available() < 4 * MAXFRAMESIZE) {
usleep(100000); // allow the buffer to collect some reserve
int Count;
uchar *b = ringBuffer->Get(Count);
if (b) {
if (Count > TRANSFERBUFSIZE * 2 / 3) {
// If the buffer runs full, we have no chance of ever catching up
// since the data comes in at the same rate as it goes out (it's "live").
// So let's clear the buffer instead of suffering from permanent
// overflows.
dsyslog("clearing transfer buffer to avoid overflows");
ringBuffer->Clear();
remux->Clear();
p = NULL;
continue;
}
else
gotBufferReserve = true;
Count = remux->Put(b, Count);
if (Count)
ringBuffer->Del(Count);
}
*/
// Get data from the buffer:
int r;
const uchar *b = ringBuffer->Get(r);
// Play the data:
if (b) {
int Count = r, Result;
uchar *p = remux->Process(b, Count, Result);
ringBuffer->Del(Count);
if (p) {
StripAudioPackets(p, Result, audioTrack);
while (Result > 0 && active) {
cPoller Poller;
if (DevicePoll(Poller, 100)) {
PollTimeouts = 0;
int w = PlayVideo(p, Result);
if (w > 0) {
p += w;
Result -= w;
}
else if (w < 0 && FATALERRNO) {
LOG_ERROR;
break;
}
}
else {
PollTimeouts++;
if (PollTimeouts == POLLTIMEOUTS_BEFORE_DEVICECLEAR) {
dsyslog("clearing device because of consecutive poll timeouts");
DeviceClear();
}
}
}
if (!p && (p = remux->Get(Result)) != NULL)
StripAudioPackets(p, Result, audioTrack);
if (p) {
cPoller Poller;
if (DevicePoll(Poller, 100)) {
PollTimeouts = 0;
int w = PlayVideo(p, Result);
if (w > 0) {
p += w;
Result -= w;
remux->Del(w);
if (Result <= 0)
p = NULL;
}
else if (w < 0 && FATALERRNO)
LOG_ERROR;
}
else {
PollTimeouts++;
if (PollTimeouts == POLLTIMEOUTS_BEFORE_DEVICECLEAR) {
dsyslog("clearing device because of consecutive poll timeouts");
DeviceClear();
ringBuffer->Clear();
remux->Clear();
p = NULL;
}
}
}
else
usleep(1); // this keeps the CPU load low
}
active = false;
}
void cTransfer::StripAudioPackets(uchar *b, int Length, uchar Except)

View File

@ -4,7 +4,7 @@
* See the main source file 'vdr.c' for copyright information and
* how to reach the author.
*
* $Id: transfer.h 1.4 2003/05/11 08:48:36 kls Exp $
* $Id: transfer.h 1.5 2004/10/15 12:39:54 kls Exp $
*/
#ifndef __TRANSFER_H
@ -22,7 +22,6 @@ private:
cRemux *remux;
bool canToggleAudioTrack;
uchar audioTrack;
bool gotBufferReserve;
bool active;
void StripAudioPackets(uchar *b, int Length, uchar Except = 0x00);
protected:

5
vdr.c
View File

@ -22,7 +22,7 @@
*
* The project's page is at http://www.cadsoft.de/vdr
*
* $Id: vdr.c 1.185 2004/07/27 07:21:22 kls Exp $
* $Id: vdr.c 1.186 2004/10/10 12:47:56 kls Exp $
*/
#include <getopt.h>
@ -516,6 +516,9 @@ int main(int argc, char *argv[])
esyslog("emergency exit requested - shutting down");
break;
}
#ifdef DEBUGRINGBUFFERS
cRingBufferLinear::PrintDebugRBL();
#endif
// Attach launched player control:
cControl::Attach();
// Make sure we have a visible programme in case device usage has changed: