From 525574f9b01275aaff3e1c923cc91404365d501e Mon Sep 17 00:00:00 2001 From: schmirl Date: Mon, 2 Apr 2007 10:32:34 +0000 Subject: [PATCH] Close connection when client is gone. Fixes high CPU load problem (#201) Modified Files: server/connection.h server/connectionHTTP.h server/connectionVTP.h server/server.c server/streamer.c server/streamer.h tools/select.c tools/select.h tools/source.c --- server/connection.h | 6 +++++- server/connectionHTTP.h | 8 +++++++- server/connectionVTP.h | 8 +++++++- server/server.c | 33 ++++++++++++++++++++++----------- server/streamer.c | 9 +++++---- server/streamer.h | 10 +++++++++- tools/select.c | 14 ++++++++------ tools/select.h | 14 +++++++------- tools/source.c | 12 ++++++------ 9 files changed, 76 insertions(+), 38 deletions(-) diff --git a/server/connection.h b/server/connection.h index f68f84a..2df850e 100644 --- a/server/connection.h +++ b/server/connection.h @@ -1,5 +1,5 @@ /* - * $Id: connection.h,v 1.3 2005/05/09 20:22:29 lordjaxom Exp $ + * $Id: connection.h,v 1.4 2007/04/02 10:32:34 schmirl Exp $ */ #ifndef VDR_STREAMDEV_SERVER_CONNECTION_H @@ -69,6 +69,10 @@ public: the connection shall be closed and removed by the server */ virtual bool Read(void); + /* Is polled regularely by the server. Returns true if the connection + needs to be terminated. */ + virtual bool Abort(void) const = 0; + /* Will make the socket close after sending all queued output data */ void DeferClose(void) { m_DeferClose = true; } diff --git a/server/connectionHTTP.h b/server/connectionHTTP.h index d12c418..11e97b7 100644 --- a/server/connectionHTTP.h +++ b/server/connectionHTTP.h @@ -1,5 +1,5 @@ /* - * $Id: connectionHTTP.h,v 1.3 2005/02/11 16:44:15 lordjaxom Exp $ + * $Id: connectionHTTP.h,v 1.4 2007/04/02 10:32:34 schmirl Exp $ */ #ifndef VDR_STREAMDEV_SERVERS_CONNECTIONHTTP_H @@ -52,7 +52,13 @@ public: virtual bool Command(char *Cmd); bool CmdGET(const std::string &Opts); + virtual bool Abort(void) const; virtual void Flushed(void); }; +inline bool cConnectionHTTP::Abort(void) const +{ + return m_LiveStreamer && m_LiveStreamer->Abort(); +} + #endif // VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H diff --git a/server/connectionVTP.h b/server/connectionVTP.h index a8e76eb..c6ab223 100644 --- a/server/connectionVTP.h +++ b/server/connectionVTP.h @@ -2,9 +2,9 @@ #define VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H #include "server/connection.h" +#include "server/livestreamer.h" class cTBSocket; -class cStreamdevLiveStreamer; class cLSTEHandler; class cLSTCHandler; class cLSTTHandler; @@ -39,6 +39,7 @@ public: virtual void Welcome(void); virtual void Reject(void); + virtual bool Abort(void) const; virtual void Detach(void); virtual void Attach(void); @@ -72,4 +73,9 @@ public: __attribute__ ((format (printf, 3, 4))); }; +inline bool cConnectionVTP::Abort(void) const +{ + return m_LiveStreamer && m_LiveStreamer->Abort(); +} + #endif // VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H diff --git a/server/server.c b/server/server.c index b00f70d..5db895e 100644 --- a/server/server.c +++ b/server/server.c @@ -1,5 +1,5 @@ /* - * $Id: server.c,v 1.4 2006/11/10 11:52:41 schmirl Exp $ + * $Id: server.c,v 1.5 2007/04/02 10:32:34 schmirl Exp $ */ #include "server/server.h" @@ -91,20 +91,29 @@ void cStreamdevServer::Action(void) select.Add(s->Socket(), true); } - int result; - while ((result = select.Select(100)) < 0 && errno == ETIMEDOUT) { - if (!m_Active) break; - } + int sel; + do + { + sel = select.Select(400); + if (sel < 0 && errno == ETIMEDOUT) { + // check for aborted clients + for (cServerConnection *s = m_Clients.First(); s; s = m_Clients.Next(s)) { + if (s->Abort()) + sel = 0; + } + } + } while (sel < 0 && errno == ETIMEDOUT && m_Active); - if (result < 0) { - if (m_Active) // no exit was requested while polling - esyslog("fatal error, server exiting: %m"); + if (!m_Active) + break; + if (sel < 0) { + esyslog("fatal error, server exiting: %m"); break; } /* Ask all Server components to act on signalled sockets */ for (cServerComponent *c = m_Servers.First(); c; c = m_Servers.Next(c)){ - if (select.CanRead(c->Socket())) { + if (sel && select.CanRead(c->Socket())) { cServerConnection *client = c->Accept(); m_Clients.Add(client); @@ -125,11 +134,13 @@ void cStreamdevServer::Action(void) for (cServerConnection *s = m_Clients.First(); s;) { bool result = true; - if (select.CanWrite(s->Socket())) + if (sel && select.CanWrite(s->Socket())) result = s->Write(); - if (result && select.CanRead(s->Socket())) + if (sel && result && select.CanRead(s->Socket())) result = s->Read(); + + result &= !s->Abort(); cServerConnection *next = m_Clients.Next(s); if (!result) { diff --git a/server/streamer.c b/server/streamer.c index 582fc6a..63d2f60 100644 --- a/server/streamer.c +++ b/server/streamer.c @@ -1,5 +1,5 @@ /* - * $Id: streamer.c,v 1.14 2005/05/09 20:22:29 lordjaxom Exp $ + * $Id: streamer.c,v 1.15 2007/04/02 10:32:34 schmirl Exp $ */ #include @@ -40,6 +40,9 @@ void cStreamdevWriter::Action(void) uchar *block = NULL; int count, offset = 0; m_Active = true; + + sel.Clear(); + sel.Add(*m_Socket, true); while (m_Active) { if (block == NULL) { block = m_Streamer->Get(count); @@ -47,9 +50,7 @@ void cStreamdevWriter::Action(void) } if (block != NULL) { - sel.Clear(); - sel.Add(*m_Socket, true); - if (sel.Select(500) == -1) { + if (sel.Select(15000) == -1) { esyslog("ERROR: streamdev-server: couldn't send data: %m"); break; } diff --git a/server/streamer.h b/server/streamer.h index c27677c..e557d55 100644 --- a/server/streamer.h +++ b/server/streamer.h @@ -1,5 +1,5 @@ /* - * $Id: streamer.h,v 1.7 2005/03/12 12:54:19 lordjaxom Exp $ + * $Id: streamer.h,v 1.8 2007/04/02 10:32:34 schmirl Exp $ */ #ifndef VDR_STREAMDEV_STREAMER_H @@ -29,6 +29,8 @@ protected: public: cStreamdevWriter(cTBSocket *Socket, cStreamdevStreamer *Streamer); virtual ~cStreamdevWriter(); + + bool IsActive(void) const { return m_Active; } }; // --- cStreamdevStreamer ----------------------------------------------------- @@ -52,6 +54,7 @@ public: virtual void Start(cTBSocket *Socket); virtual void Stop(void); + bool Abort(void) const; void Activate(bool On); int Receive(uchar *Data, int Length) { return m_RingBuffer->Put(Data, Length); } @@ -65,5 +68,10 @@ public: virtual void Attach(void) {} }; +inline bool cStreamdevStreamer::Abort(void) const +{ + return m_Active && !m_Writer->IsActive(); +} + #endif // VDR_STREAMDEV_STREAMER_H diff --git a/tools/select.c b/tools/select.c index 0ab5f9b..9568110 100644 --- a/tools/select.c +++ b/tools/select.c @@ -21,19 +21,21 @@ int cTBSelect::Select(uint TimeoutMs) { tv.tv_usec = (TimeoutMs % 1000) * 1000; tv.tv_sec = TimeoutMs / 1000; + memcpy(m_FdsResult, m_FdsQuery, sizeof(m_FdsResult)); if (TimeoutMs == 0) - return ::select(m_MaxFiled + 1, &m_Rfds, &m_Wfds, NULL, &tv); + return ::select(m_MaxFiled + 1, &m_FdsResult[0], &m_FdsResult[1], NULL, &tv); cTimeMs starttime; ms = TimeoutMs; - while (ms > 0 && (res = ::select(m_MaxFiled + 1, &m_Rfds, &m_Wfds, NULL, + while (ms > 0 && (res = ::select(m_MaxFiled + 1, &m_FdsResult[0], &m_FdsResult[1], NULL, &tv)) == -1 && errno == EINTR) { ms = TimeoutMs - starttime.Elapsed(); tv.tv_usec = (ms % 1000) * 1000; tv.tv_sec = ms / 1000; + memcpy(m_FdsResult, m_FdsQuery, sizeof(m_FdsResult)); } - if (ms <= 0) { + if (ms <= 0 || res == 0) { errno = ETIMEDOUT; return -1; } @@ -42,8 +44,8 @@ int cTBSelect::Select(uint TimeoutMs) { int cTBSelect::Select(void) { ssize_t res; - while ((res = ::select(m_MaxFiled + 1, &m_Rfds, &m_Wfds, NULL, NULL)) == -1 - && errno == EINTR) - ; + do { + memcpy(m_FdsResult, m_FdsQuery, sizeof(m_FdsResult)); + } while ((res = ::select(m_MaxFiled + 1, &m_FdsResult[0], &m_FdsResult[1], NULL, NULL)) == -1 && errno == EINTR); return res; } diff --git a/tools/select.h b/tools/select.h index 7e873e2..a3622fd 100644 --- a/tools/select.h +++ b/tools/select.h @@ -11,8 +11,8 @@ class cTBSelect { private: int m_MaxFiled; - fd_set m_Rfds; - fd_set m_Wfds; + fd_set m_FdsQuery[2]; + fd_set m_FdsResult[2]; public: cTBSelect(void); @@ -50,26 +50,26 @@ public: }; inline void cTBSelect::Clear(void) { - FD_ZERO(&m_Rfds); - FD_ZERO(&m_Wfds); + FD_ZERO(&m_FdsQuery[0]); + FD_ZERO(&m_FdsQuery[1]); m_MaxFiled = -1; } inline bool cTBSelect::Add(int Filed, bool Output /* = false */) { if (Filed < 0) return false; - FD_SET(Filed, Output ? &m_Wfds : &m_Rfds); + FD_SET(Filed, &m_FdsQuery[Output ? 1 : 0]); if (Filed > m_MaxFiled) m_MaxFiled = Filed; return true; } inline bool cTBSelect::CanRead(int FileNo) const { if (FileNo < 0) return false; - return FD_ISSET(FileNo, &m_Rfds); + return FD_ISSET(FileNo, &m_FdsResult[0]); } inline bool cTBSelect::CanWrite(int FileNo) const { if (FileNo < 0) return false; - return FD_ISSET(FileNo, &m_Wfds); + return FD_ISSET(FileNo, &m_FdsResult[1]); } #endif // TOOLBOX_SELECT_H diff --git a/tools/source.c b/tools/source.c index c832e2f..80625e5 100644 --- a/tools/source.c +++ b/tools/source.c @@ -61,11 +61,11 @@ bool cTBSource::TimedWrite(const void *Buffer, size_t Length, uint TimeoutMs) { cTimeMs starttime; ms = TimeoutMs; offs = 0; + sel.Clear(); + sel.Add(m_Filed, true); while (Length > 0) { int b; - sel.Clear(); - sel.Add(m_Filed, true); if (sel.Select(ms) == -1) return false; @@ -90,11 +90,11 @@ bool cTBSource::SafeWrite(const void *Buffer, size_t Length) { int offs; offs = 0; + sel.Clear(); + sel.Add(m_Filed, true); while (Length > 0) { int b; - sel.Clear(); - sel.Add(m_Filed, true); if (sel.Select() == -1) return false; @@ -128,9 +128,9 @@ ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq, cTimeMs starttime; ms = TimeoutMs; + sel.Clear(); + sel.Add(m_Filed, false); while (m_LineBuffer.size() < BUFSIZ) { - sel.Clear(); - sel.Add(m_Filed, false); if (sel.Select(ms) == -1) return -1;