- transfer

This commit is contained in:
lordjaxom 2005-02-08 19:54:52 +00:00
parent b2b925d1a9
commit 78b9b7c6ba
7 changed files with 148 additions and 73 deletions

View File

@ -1,5 +1,5 @@
/* /*
* $Id: connectionHTTP.c,v 1.4 2005/02/08 17:22:35 lordjaxom Exp $ * $Id: connectionHTTP.c,v 1.5 2005/02/08 19:54:52 lordjaxom Exp $
*/ */
#include "server/connectionHTTP.h" #include "server/connectionHTTP.h"
@ -56,8 +56,8 @@ bool cConnectionHTTP::Command(char *Cmd) {
cDevice *device = GetDevice(m_Channel, 0); cDevice *device = GetDevice(m_Channel, 0);
if (device != NULL) { if (device != NULL) {
device->SwitchChannel(m_Channel, false); device->SwitchChannel(m_Channel, false);
m_LiveStreamer->SetDevice(device);
if (m_LiveStreamer->SetChannel(m_Channel, m_StreamType)) { if (m_LiveStreamer->SetChannel(m_Channel, m_StreamType)) {
m_LiveStreamer->SetDevice(device);
m_Startup = true; m_Startup = true;
if (m_StreamType == stES && (m_Channel->Vpid() == 0 if (m_StreamType == stES && (m_Channel->Vpid() == 0
|| m_Channel->Vpid() == 1 || m_Channel->Vpid() == 0x1FFF)) { || m_Channel->Vpid() == 1 || m_Channel->Vpid() == 0x1FFF)) {

View File

@ -18,23 +18,25 @@ cStreamdevLiveReceiver::~cStreamdevLiveReceiver()
Detach(); Detach();
} }
void cStreamdevLiveReceiver::Activate(bool On)
{
m_Streamer->Activate(On);
}
void cStreamdevLiveReceiver::Receive(uchar *Data, int Length) { void cStreamdevLiveReceiver::Receive(uchar *Data, int Length) {
int p = m_Streamer->Put(Data, Length); int p = m_Streamer->Receive(Data, Length);
if (p != Length) if (p != Length)
m_Streamer->ReportOverflow(Length - p); m_Streamer->ReportOverflow(Length - p);
} }
cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority): cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority):
cStreamdevStreamer("Live streamer") { cStreamdevStreamer("streamdev-livestreaming") {
m_Priority = Priority; m_Priority = Priority;
m_NumPids = 0; m_NumPids = 0;
m_Channel = NULL; m_Channel = NULL;
m_Device = NULL; m_Device = NULL;
m_Receiver = NULL; m_Receiver = NULL;
m_Remux = NULL; m_Remux = NULL;
m_Buffer = NULL;
m_Sequence = 0;
memset(m_Pids, 0, sizeof(m_Pids));
} }
cStreamdevLiveStreamer::~cStreamdevLiveStreamer() { cStreamdevLiveStreamer::~cStreamdevLiveStreamer() {
@ -44,7 +46,6 @@ cStreamdevLiveStreamer::~cStreamdevLiveStreamer() {
#if VDRVERSNUM >= 10300 #if VDRVERSNUM >= 10300
//delete m_Filter; TODO //delete m_Filter; TODO
#endif #endif
free(m_Buffer);
} }
void cStreamdevLiveStreamer::Detach(void) { void cStreamdevLiveStreamer::Detach(void) {
@ -91,7 +92,7 @@ bool cStreamdevLiveStreamer::SetPid(int Pid, bool On) {
m_Receiver = new cStreamdevLiveReceiver(this, m_Channel->Ca(), m_Priority, m_Pids); m_Receiver = new cStreamdevLiveReceiver(this, m_Channel->Ca(), m_Priority, m_Pids);
if (m_Device != NULL) { if (m_Device != NULL) {
Dprintf("Attaching new receiver\n"); Dprintf("Attaching new receiver\n");
m_Device->AttachReceiver(m_Receiver); Attach();
} }
} }
return true; return true;
@ -155,6 +156,8 @@ bool cStreamdevLiveStreamer::SetFilter(u_short Pid, u_char Tid, u_char Mask,
#endif #endif
} }
// TODO: Remuxer einbinden
#if 0
uchar *cStreamdevLiveStreamer::Process(const uchar *Data, int &Count, int &Result) { uchar *cStreamdevLiveStreamer::Process(const uchar *Data, int &Count, int &Result) {
uchar *remuxed = m_Remux != NULL ? m_Remux->Process(Data, Count, Result) uchar *remuxed = m_Remux != NULL ? m_Remux->Process(Data, Count, Result)
: cStreamdevStreamer::Process(Data, Count, Result); : cStreamdevStreamer::Process(Data, Count, Result);
@ -184,6 +187,7 @@ uchar *cStreamdevLiveStreamer::Process(const uchar *Data, int &Count, int &Resul
} }
return NULL; return NULL;
} }
#endif
std::string cStreamdevLiveStreamer::Report(void) { std::string cStreamdevLiveStreamer::Report(void) {
std::string result; std::string result;

View File

@ -21,6 +21,7 @@ private:
cStreamdevLiveStreamer *m_Streamer; cStreamdevLiveStreamer *m_Streamer;
protected: protected:
virtual void Activate(bool On);
virtual void Receive(uchar *Data, int Length); virtual void Receive(uchar *Data, int Length);
public: public:
@ -39,10 +40,9 @@ private:
cStreamdevLiveReceiver *m_Receiver; cStreamdevLiveReceiver *m_Receiver;
cTSRemux *m_Remux; cTSRemux *m_Remux;
uchar *m_Buffer; uchar *m_Buffer;
int m_Sequence;
protected: protected:
virtual uchar *Process(const uchar *Data, int &Count, int &Result); //virtual uchar *Process(const uchar *Data, int &Count, int &Result);
public: public:
cStreamdevLiveStreamer(int Priority); cStreamdevLiveStreamer(int Priority);

View File

@ -1,5 +1,5 @@
/* /*
* $Id: streamer.c,v 1.3 2005/02/08 17:22:35 lordjaxom Exp $ * $Id: streamer.c,v 1.4 2005/02/08 19:54:52 lordjaxom Exp $
*/ */
#include <vdr/ringbuffer.h> #include <vdr/ringbuffer.h>
@ -13,34 +13,74 @@
#include "tools/socket.h" #include "tools/socket.h"
#include "common.h" #include "common.h"
#define VIDEOBUFSIZE MEGABYTE(4) cStreamdevWriter::cStreamdevWriter(cTBSocket *Socket, cStreamdevStreamer *Streamer):
#define MAXBLOCKSIZE TS_SIZE*10 cThread("streamdev-writer"),
m_Streamer(Streamer),
m_Socket(Socket),
m_Active(false)
{
}
cStreamdevWriter::~cStreamdevWriter()
{
m_Active = false;
Cancel(3);
}
void cStreamdevWriter::Action(void)
{
int max = 0;
m_Active = true;
while (m_Active) {
int count;
uchar *block = m_Streamer->Get(count);
if (!m_Socket->SafeWrite(block, count)) {
esyslog("ERROR: streamdev-server: couldn't send data: %m");
break;
}
m_Streamer->Del(count);
}
m_Active = false;
Dprintf("Max. Transmit Blocksize was: %d\n", max);
}
cStreamdevStreamer::cStreamdevStreamer(const char *Name): cStreamdevStreamer::cStreamdevStreamer(const char *Name):
cThread(((std::string)"Streamdev: " + Name).c_str()) cThread(Name),
m_Active(false),
m_Writer(NULL),
m_RingBuffer(new cRingBufferLinear(STREAMERBUFSIZE, TS_SIZE * 2, true,
"streamdev-streamer")),
m_SendBuffer(new cRingBufferLinear(WRITERBUFSIZE, MAXTRANSMITBLOCKSIZE))
{ {
m_Active = false; m_RingBuffer->SetTimeouts(0, 100);
m_Receivers = 0; m_SendBuffer->SetTimeouts(0, 100);
m_Buffer = NULL;
m_Name = Name;
m_Socket = NULL;
m_RingBuffer = new cRingBufferLinear(VIDEOBUFSIZE, TS_SIZE * 2, true);
} }
cStreamdevStreamer::~cStreamdevStreamer() { cStreamdevStreamer::~cStreamdevStreamer()
{
Stop(); Stop();
if (m_Buffer != NULL) delete[] m_Buffer;
delete m_RingBuffer; delete m_RingBuffer;
delete m_Writer;
delete m_SendBuffer;
} }
void cStreamdevStreamer::Start(cTBSocket *Socket) { void cStreamdevStreamer::Start(cTBSocket *Socket)
m_Socket = Socket; {
m_Writer = new cStreamdevWriter(Socket, this);
Attach(); Attach();
if (!m_Active)
cThread::Start();
} }
void cStreamdevStreamer::Stop(void) { void cStreamdevStreamer::Activate(bool On)
{
if (On && !m_Active) {
m_Writer->Start();
cThread::Start();
}
}
void cStreamdevStreamer::Stop(void)
{
if (m_Active) { if (m_Active) {
Dprintf("stopping live streamer\n"); Dprintf("stopping live streamer\n");
m_Active = false; m_Active = false;
@ -48,50 +88,35 @@ void cStreamdevStreamer::Stop(void) {
} }
} }
uchar *cStreamdevStreamer::Process(const uchar *Data, int &Count, int &Result) { int cStreamdevStreamer::Put(const uchar *Data, int Count)
if (m_Buffer == NULL) {
m_Buffer = new uchar[MAXBLOCKSIZE]; return m_SendBuffer->Put(Data, Count);
if (Count > MAXBLOCKSIZE)
Count = MAXBLOCKSIZE;
memcpy(m_Buffer, Data, Count);
Result = Count;
return m_Buffer;
} }
void cStreamdevStreamer::Action(void) { uchar *cStreamdevStreamer::Get(int &Count)
int max = 0; {
return m_SendBuffer->Get(Count);
}
#if VDRVERSNUM < 10300 void cStreamdevStreamer::Del(int Count)
isyslog("Streamdev: %s thread started (pid=%d)", m_Name, getpid()); {
#endif return m_SendBuffer->Del(Count);
}
void cStreamdevStreamer::Action(void)
{
int max = 0;
m_Active = true; m_Active = true;
while (m_Active) { while (m_Active) {
int recvd; int got;
const uchar *block = m_RingBuffer->Get(recvd); uchar *block = m_RingBuffer->Get(got);
if (block && recvd > 0) { if (block && got > 0) {
int result = 0; int count = Put(block, got);
uchar *sendBlock = Process(block, recvd, result); if (count)
m_RingBuffer->Del(count);
m_RingBuffer->Del(recvd); }
if (result > max) max = result;
if (!m_Socket->TimedWrite(sendBlock, result, 150)) {
if (errno != ETIMEDOUT) {
esyslog("ERROR: Streamdev: Couldn't write data: %s", strerror(errno));
m_Active = false;
}
}
} else
usleep(1); // this keeps the CPU load low (XXX: waiting buffers)
} }
Dprintf("Max. Transmit Blocksize was: %d\n", max);
#if VDRVERSNUM < 10300
isyslog("Streamdev: %s thread stopped", m_Name);
#endif
} }

View File

@ -1,5 +1,5 @@
/* /*
* $Id: streamer.h,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $ * $Id: streamer.h,v 1.3 2005/02/08 19:54:52 lordjaxom Exp $
*/ */
#ifndef VDR_STREAMDEV_STREAMER_H #ifndef VDR_STREAMDEV_STREAMER_H
@ -10,21 +10,37 @@
#include <vdr/tools.h> #include <vdr/tools.h>
class cTBSocket; class cTBSocket;
class cStreamdevStreamer;
#define MAXTRANSMITBLOCKSIZE TS_SIZE*10
#define STREAMERBUFSIZE MEGABYTE(4)
#define WRITERBUFSIZE KILOBYTE(192)
class cStreamdevWriter: public cThread {
private:
cStreamdevStreamer *m_Streamer;
cTBSocket *m_Socket;
bool m_Active;
protected:
virtual void Action(void);
public:
cStreamdevWriter(cTBSocket *Socket, cStreamdevStreamer *Streamer);
virtual ~cStreamdevWriter();
};
class cStreamdevStreamer: public cThread { class cStreamdevStreamer: public cThread {
private: private:
bool m_Active; bool m_Active;
int m_Receivers; cStreamdevWriter *m_Writer;
uchar *m_Buffer;
const char *m_Name;
cTBSocket *m_Socket;
cRingBufferLinear *m_RingBuffer; cRingBufferLinear *m_RingBuffer;
cRingBufferLinear *m_SendBuffer;
protected: protected:
virtual uchar *Process(const uchar *Data, int &Count, int &Result);
virtual void Action(void); virtual void Action(void);
const cTBSocket *Socket(void) const { return m_Socket; } //const cTBSocket *Socket(void) const { return m_Socket; }
public: public:
cStreamdevStreamer(const char *Name); cStreamdevStreamer(const char *Name);
@ -33,9 +49,14 @@ public:
virtual void Start(cTBSocket *Socket); virtual void Start(cTBSocket *Socket);
virtual void Stop(void); virtual void Stop(void);
int Put(uchar *Data, int Length) { return m_RingBuffer->Put(Data, Length); } void Activate(bool On);
int Receive(uchar *Data, int Length) { return m_RingBuffer->Put(Data, Length); }
void ReportOverflow(int Bytes) { m_RingBuffer->ReportOverflow(Bytes); } void ReportOverflow(int Bytes) { m_RingBuffer->ReportOverflow(Bytes); }
virtual int Put(const uchar *Data, int Count);
virtual uchar *Get(int &Count);
virtual void Del(int Count);
virtual void Detach(void) = 0; virtual void Detach(void) = 0;
virtual void Attach(void) = 0; virtual void Attach(void) = 0;
}; };

View File

@ -85,6 +85,29 @@ bool cTBSource::TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs) {
return true; return true;
} }
bool cTBSource::SafeWrite(const void *Buffer, size_t Length) {
cTBSelect sel;
int offs;
offs = 0;
while (Length > 0) {
int b;
sel.Clear();
sel.Add(m_Filed, true);
if (sel.Select() == -1)
return false;
if (sel.CanWrite(m_Filed)) {
if ((b = Write((char*)Buffer + offs, Length)) == -1)
return false;
offs += b;
Length -= b;
}
}
return true;
}
ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq, ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq,
uint TimeoutMs) { uint TimeoutMs) {
int seqlen, ms; int seqlen, ms;

View File

@ -79,6 +79,8 @@ public:
descriptor sources. */ descriptor sources. */
bool TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs); bool TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs);
bool SafeWrite(const void *Buffer, size_t Length);
/* ReadUntil() tries to read at most Length bytes into the storage pointed /* ReadUntil() tries to read at most Length bytes into the storage pointed
to by Buffer, which must be at least Length bytes in size, within the to by Buffer, which must be at least Length bytes in size, within the
time specified by TimeoutMs, using the Read()-Interface. Reading stops time specified by TimeoutMs, using the Read()-Interface. Reading stops