Close connection when client is gone. Fixes high CPU load problem (#201)

Modified Files:
	server/connection.h server/connectionHTTP.h
	server/connectionVTP.h server/server.c server/streamer.c
	server/streamer.h tools/select.c tools/select.h tools/source.c
This commit is contained in:
schmirl 2007-04-02 10:32:34 +00:00
parent cd7d4e3588
commit 525574f9b0
9 changed files with 76 additions and 38 deletions

View File

@ -1,5 +1,5 @@
/* /*
* $Id: connection.h,v 1.3 2005/05/09 20:22:29 lordjaxom Exp $ * $Id: connection.h,v 1.4 2007/04/02 10:32:34 schmirl Exp $
*/ */
#ifndef VDR_STREAMDEV_SERVER_CONNECTION_H #ifndef VDR_STREAMDEV_SERVER_CONNECTION_H
@ -69,6 +69,10 @@ public:
the connection shall be closed and removed by the server */ the connection shall be closed and removed by the server */
virtual bool Read(void); virtual bool Read(void);
/* Is polled regularely by the server. Returns true if the connection
needs to be terminated. */
virtual bool Abort(void) const = 0;
/* Will make the socket close after sending all queued output data */ /* Will make the socket close after sending all queued output data */
void DeferClose(void) { m_DeferClose = true; } void DeferClose(void) { m_DeferClose = true; }

View File

@ -1,5 +1,5 @@
/* /*
* $Id: connectionHTTP.h,v 1.3 2005/02/11 16:44:15 lordjaxom Exp $ * $Id: connectionHTTP.h,v 1.4 2007/04/02 10:32:34 schmirl Exp $
*/ */
#ifndef VDR_STREAMDEV_SERVERS_CONNECTIONHTTP_H #ifndef VDR_STREAMDEV_SERVERS_CONNECTIONHTTP_H
@ -52,7 +52,13 @@ public:
virtual bool Command(char *Cmd); virtual bool Command(char *Cmd);
bool CmdGET(const std::string &Opts); bool CmdGET(const std::string &Opts);
virtual bool Abort(void) const;
virtual void Flushed(void); virtual void Flushed(void);
}; };
inline bool cConnectionHTTP::Abort(void) const
{
return m_LiveStreamer && m_LiveStreamer->Abort();
}
#endif // VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H #endif // VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H

View File

@ -2,9 +2,9 @@
#define VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H #define VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H
#include "server/connection.h" #include "server/connection.h"
#include "server/livestreamer.h"
class cTBSocket; class cTBSocket;
class cStreamdevLiveStreamer;
class cLSTEHandler; class cLSTEHandler;
class cLSTCHandler; class cLSTCHandler;
class cLSTTHandler; class cLSTTHandler;
@ -39,6 +39,7 @@ public:
virtual void Welcome(void); virtual void Welcome(void);
virtual void Reject(void); virtual void Reject(void);
virtual bool Abort(void) const;
virtual void Detach(void); virtual void Detach(void);
virtual void Attach(void); virtual void Attach(void);
@ -72,4 +73,9 @@ public:
__attribute__ ((format (printf, 3, 4))); __attribute__ ((format (printf, 3, 4)));
}; };
inline bool cConnectionVTP::Abort(void) const
{
return m_LiveStreamer && m_LiveStreamer->Abort();
}
#endif // VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H #endif // VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H

View File

@ -1,5 +1,5 @@
/* /*
* $Id: server.c,v 1.4 2006/11/10 11:52:41 schmirl Exp $ * $Id: server.c,v 1.5 2007/04/02 10:32:34 schmirl Exp $
*/ */
#include "server/server.h" #include "server/server.h"
@ -91,20 +91,29 @@ void cStreamdevServer::Action(void)
select.Add(s->Socket(), true); select.Add(s->Socket(), true);
} }
int result; int sel;
while ((result = select.Select(100)) < 0 && errno == ETIMEDOUT) { do
if (!m_Active) break; {
} sel = select.Select(400);
if (sel < 0 && errno == ETIMEDOUT) {
// check for aborted clients
for (cServerConnection *s = m_Clients.First(); s; s = m_Clients.Next(s)) {
if (s->Abort())
sel = 0;
}
}
} while (sel < 0 && errno == ETIMEDOUT && m_Active);
if (result < 0) { if (!m_Active)
if (m_Active) // no exit was requested while polling break;
esyslog("fatal error, server exiting: %m"); if (sel < 0) {
esyslog("fatal error, server exiting: %m");
break; break;
} }
/* Ask all Server components to act on signalled sockets */ /* Ask all Server components to act on signalled sockets */
for (cServerComponent *c = m_Servers.First(); c; c = m_Servers.Next(c)){ for (cServerComponent *c = m_Servers.First(); c; c = m_Servers.Next(c)){
if (select.CanRead(c->Socket())) { if (sel && select.CanRead(c->Socket())) {
cServerConnection *client = c->Accept(); cServerConnection *client = c->Accept();
m_Clients.Add(client); m_Clients.Add(client);
@ -125,12 +134,14 @@ void cStreamdevServer::Action(void)
for (cServerConnection *s = m_Clients.First(); s;) { for (cServerConnection *s = m_Clients.First(); s;) {
bool result = true; bool result = true;
if (select.CanWrite(s->Socket())) if (sel && select.CanWrite(s->Socket()))
result = s->Write(); result = s->Write();
if (result && select.CanRead(s->Socket())) if (sel && result && select.CanRead(s->Socket()))
result = s->Read(); result = s->Read();
result &= !s->Abort();
cServerConnection *next = m_Clients.Next(s); cServerConnection *next = m_Clients.Next(s);
if (!result) { if (!result) {
isyslog("streamdev: closing streamdev connection to %s:%d", isyslog("streamdev: closing streamdev connection to %s:%d",

View File

@ -1,5 +1,5 @@
/* /*
* $Id: streamer.c,v 1.14 2005/05/09 20:22:29 lordjaxom Exp $ * $Id: streamer.c,v 1.15 2007/04/02 10:32:34 schmirl Exp $
*/ */
#include <vdr/ringbuffer.h> #include <vdr/ringbuffer.h>
@ -40,6 +40,9 @@ void cStreamdevWriter::Action(void)
uchar *block = NULL; uchar *block = NULL;
int count, offset = 0; int count, offset = 0;
m_Active = true; m_Active = true;
sel.Clear();
sel.Add(*m_Socket, true);
while (m_Active) { while (m_Active) {
if (block == NULL) { if (block == NULL) {
block = m_Streamer->Get(count); block = m_Streamer->Get(count);
@ -47,9 +50,7 @@ void cStreamdevWriter::Action(void)
} }
if (block != NULL) { if (block != NULL) {
sel.Clear(); if (sel.Select(15000) == -1) {
sel.Add(*m_Socket, true);
if (sel.Select(500) == -1) {
esyslog("ERROR: streamdev-server: couldn't send data: %m"); esyslog("ERROR: streamdev-server: couldn't send data: %m");
break; break;
} }

View File

@ -1,5 +1,5 @@
/* /*
* $Id: streamer.h,v 1.7 2005/03/12 12:54:19 lordjaxom Exp $ * $Id: streamer.h,v 1.8 2007/04/02 10:32:34 schmirl Exp $
*/ */
#ifndef VDR_STREAMDEV_STREAMER_H #ifndef VDR_STREAMDEV_STREAMER_H
@ -29,6 +29,8 @@ protected:
public: public:
cStreamdevWriter(cTBSocket *Socket, cStreamdevStreamer *Streamer); cStreamdevWriter(cTBSocket *Socket, cStreamdevStreamer *Streamer);
virtual ~cStreamdevWriter(); virtual ~cStreamdevWriter();
bool IsActive(void) const { return m_Active; }
}; };
// --- cStreamdevStreamer ----------------------------------------------------- // --- cStreamdevStreamer -----------------------------------------------------
@ -52,6 +54,7 @@ public:
virtual void Start(cTBSocket *Socket); virtual void Start(cTBSocket *Socket);
virtual void Stop(void); virtual void Stop(void);
bool Abort(void) const;
void Activate(bool On); void Activate(bool On);
int Receive(uchar *Data, int Length) { return m_RingBuffer->Put(Data, Length); } int Receive(uchar *Data, int Length) { return m_RingBuffer->Put(Data, Length); }
@ -65,5 +68,10 @@ public:
virtual void Attach(void) {} virtual void Attach(void) {}
}; };
inline bool cStreamdevStreamer::Abort(void) const
{
return m_Active && !m_Writer->IsActive();
}
#endif // VDR_STREAMDEV_STREAMER_H #endif // VDR_STREAMDEV_STREAMER_H

View File

@ -21,19 +21,21 @@ int cTBSelect::Select(uint TimeoutMs) {
tv.tv_usec = (TimeoutMs % 1000) * 1000; tv.tv_usec = (TimeoutMs % 1000) * 1000;
tv.tv_sec = TimeoutMs / 1000; tv.tv_sec = TimeoutMs / 1000;
memcpy(m_FdsResult, m_FdsQuery, sizeof(m_FdsResult));
if (TimeoutMs == 0) if (TimeoutMs == 0)
return ::select(m_MaxFiled + 1, &m_Rfds, &m_Wfds, NULL, &tv); return ::select(m_MaxFiled + 1, &m_FdsResult[0], &m_FdsResult[1], NULL, &tv);
cTimeMs starttime; cTimeMs starttime;
ms = TimeoutMs; ms = TimeoutMs;
while (ms > 0 && (res = ::select(m_MaxFiled + 1, &m_Rfds, &m_Wfds, NULL, while (ms > 0 && (res = ::select(m_MaxFiled + 1, &m_FdsResult[0], &m_FdsResult[1], NULL,
&tv)) == -1 && errno == EINTR) { &tv)) == -1 && errno == EINTR) {
ms = TimeoutMs - starttime.Elapsed(); ms = TimeoutMs - starttime.Elapsed();
tv.tv_usec = (ms % 1000) * 1000; tv.tv_usec = (ms % 1000) * 1000;
tv.tv_sec = ms / 1000; tv.tv_sec = ms / 1000;
memcpy(m_FdsResult, m_FdsQuery, sizeof(m_FdsResult));
} }
if (ms <= 0) { if (ms <= 0 || res == 0) {
errno = ETIMEDOUT; errno = ETIMEDOUT;
return -1; return -1;
} }
@ -42,8 +44,8 @@ int cTBSelect::Select(uint TimeoutMs) {
int cTBSelect::Select(void) { int cTBSelect::Select(void) {
ssize_t res; ssize_t res;
while ((res = ::select(m_MaxFiled + 1, &m_Rfds, &m_Wfds, NULL, NULL)) == -1 do {
&& errno == EINTR) memcpy(m_FdsResult, m_FdsQuery, sizeof(m_FdsResult));
; } while ((res = ::select(m_MaxFiled + 1, &m_FdsResult[0], &m_FdsResult[1], NULL, NULL)) == -1 && errno == EINTR);
return res; return res;
} }

View File

@ -11,8 +11,8 @@ class cTBSelect {
private: private:
int m_MaxFiled; int m_MaxFiled;
fd_set m_Rfds; fd_set m_FdsQuery[2];
fd_set m_Wfds; fd_set m_FdsResult[2];
public: public:
cTBSelect(void); cTBSelect(void);
@ -50,26 +50,26 @@ public:
}; };
inline void cTBSelect::Clear(void) { inline void cTBSelect::Clear(void) {
FD_ZERO(&m_Rfds); FD_ZERO(&m_FdsQuery[0]);
FD_ZERO(&m_Wfds); FD_ZERO(&m_FdsQuery[1]);
m_MaxFiled = -1; m_MaxFiled = -1;
} }
inline bool cTBSelect::Add(int Filed, bool Output /* = false */) { inline bool cTBSelect::Add(int Filed, bool Output /* = false */) {
if (Filed < 0) return false; if (Filed < 0) return false;
FD_SET(Filed, Output ? &m_Wfds : &m_Rfds); FD_SET(Filed, &m_FdsQuery[Output ? 1 : 0]);
if (Filed > m_MaxFiled) m_MaxFiled = Filed; if (Filed > m_MaxFiled) m_MaxFiled = Filed;
return true; return true;
} }
inline bool cTBSelect::CanRead(int FileNo) const { inline bool cTBSelect::CanRead(int FileNo) const {
if (FileNo < 0) return false; if (FileNo < 0) return false;
return FD_ISSET(FileNo, &m_Rfds); return FD_ISSET(FileNo, &m_FdsResult[0]);
} }
inline bool cTBSelect::CanWrite(int FileNo) const { inline bool cTBSelect::CanWrite(int FileNo) const {
if (FileNo < 0) return false; if (FileNo < 0) return false;
return FD_ISSET(FileNo, &m_Wfds); return FD_ISSET(FileNo, &m_FdsResult[1]);
} }
#endif // TOOLBOX_SELECT_H #endif // TOOLBOX_SELECT_H

View File

@ -61,11 +61,11 @@ bool cTBSource::TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs) {
cTimeMs starttime; cTimeMs starttime;
ms = TimeoutMs; ms = TimeoutMs;
offs = 0; offs = 0;
sel.Clear();
sel.Add(m_Filed, true);
while (Length > 0) { while (Length > 0) {
int b; int b;
sel.Clear();
sel.Add(m_Filed, true);
if (sel.Select(ms) == -1) if (sel.Select(ms) == -1)
return false; return false;
@ -90,11 +90,11 @@ bool cTBSource::SafeWrite(const void *Buffer, size_t Length) {
int offs; int offs;
offs = 0; offs = 0;
sel.Clear();
sel.Add(m_Filed, true);
while (Length > 0) { while (Length > 0) {
int b; int b;
sel.Clear();
sel.Add(m_Filed, true);
if (sel.Select() == -1) if (sel.Select() == -1)
return false; return false;
@ -128,9 +128,9 @@ ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq,
cTimeMs starttime; cTimeMs starttime;
ms = TimeoutMs; ms = TimeoutMs;
sel.Clear();
sel.Add(m_Filed, false);
while (m_LineBuffer.size() < BUFSIZ) { while (m_LineBuffer.size() < BUFSIZ) {
sel.Clear();
sel.Add(m_Filed, false);
if (sel.Select(ms) == -1) if (sel.Select(ms) == -1)
return -1; return -1;