diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 833abad..7dfd07d 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -94,3 +94,6 @@ Jori Hamalainen Joachim König-Baltes for fixing Min/MaxPriority parsing + +Artem Makhutov + for suggesting and heavy testing IGMP based multicast streaming diff --git a/HISTORY b/HISTORY index 877202d..5416df4 100644 --- a/HISTORY +++ b/HISTORY @@ -1,6 +1,7 @@ eDR Plugin 'streamdev' Revision History --------------------------------------- +- added IGMP based multicast streaming - ignore trailing blank lines in HTTP requests - fixed parsing Min/MaxPriority from config (thanks to Joachim König-Baltes) - updated Finnish translation (thanks to Rolf Ahrenberg) diff --git a/Makefile b/Makefile index 9ca4cad..ea5eeee 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ # # Makefile for a Video Disk Recorder plugin # -# $Id: Makefile,v 1.15 2008/04/07 14:50:32 schmirl Exp $ +# $Id: Makefile,v 1.15.2.1 2009/02/13 10:39:40 schmirl Exp $ # The official name of this plugin. # This name will be used in the '-P...' option of VDR to load the plugin. @@ -57,11 +57,11 @@ CLIENTOBJS = $(PLUGIN)-client.o \ SERVEROBJS = $(PLUGIN)-server.o \ \ - server/server.o server/connectionVTP.o server/connectionHTTP.o \ - server/componentHTTP.o server/componentVTP.o server/connection.o \ - server/component.o server/suspend.o server/setup.o server/streamer.o \ - server/livestreamer.o server/livefilter.o server/menuHTTP.o \ - \ + server/server.o server/component.o server/connection.o \ + server/componentVTP.o server/componentHTTP.o server/componentIGMP.o \ + server/connectionVTP.o server/connectionHTTP.o server/connectionIGMP.o \ + server/streamer.o server/livestreamer.o server/livefilter.o \ + server/suspend.o server/setup.o server/menuHTTP.o \ remux/tsremux.o remux/ts2ps.o remux/ts2es.o remux/extern.o ifdef DEBUG diff --git a/README b/README index e7f1846..c27dc2f 100644 --- a/README +++ b/README @@ -20,8 +20,9 @@ Contents: 2.3 Updating from streamdev 0.3.x 3. Usage 3.1 Usage HTTP server -3.2 Usage VDR-to-VDR server -3.3 Usage VDR-to-VDR client +3.2 Usage IGMP multicast server +3.3 Usage VDR-to-VDR server +3.4 Usage VDR-to-VDR client 4. Other useful Plugins 4.1 Plugins for VDR-to-VDR clients 4.2 Plugins for Server @@ -219,7 +220,66 @@ Note the single quotes, as otherwise "-a" will be passed to VDR and not to streamdev-server. The login ("vdr" in the example above) doesn't have to exist as a system account. -3.2 Usage VDR-to-VDR server: +3.2 Usage IGMP multicast server: +-------------------------------- + +IGMP based multicast streaming is often used by settop boxes to receive IP TV. +Streamdev's multicast server allows you to feed live TV from VDR to such a +settop box. VLC is known to work well if you look for a software client. + +The advantage of multicasting is that the actual stream is sent out only once, +regardless of how many clients want to receive it. The downside is, that you +cannot simply multicast across network boundaries. You need multicast routers. +For multicast streaming over the public Internet you would even need to register +for your own IP range. So don't even think of multicasting via Internet with +streamdev! Streamdev will send the stream only to one local ethernet segment and +all clients must be connected to this same segment. There must not be a router +inbetween. Also note that the client must not run on the streamdev-server +machine. + +Each channel is offered on a different multicast IP. Channel 1 is available from +multicast IP 239.255.0.1, channel 2 from 239.255.0.2 and so on. The upper limit +is 239.255.254.255 which corresponds to channel 65279 (239.255.255.0/24 is +reserved according to RFC-2365). + +Before you can use streamdev's multicast server, you might need to patch VDR. +Binding an IGMP socket is a privileged operation, so you must start VDR as root. +If you pass the -u option to VDR, it will drop almost all priviledges before +streamdev is even loaded. Apply vdr-cap_net_raw.diff to keep VDR from dropping +the CAP_NET_RAW capability required to bind the IGMP socket. The patch is part +of streamdev's source distribution. Check the patches subdirectory. There's no +need to patch VDR if it is kept running as root (not recommended). + +The multicast server is disabled by default. Enter the streamdev-server setup +menu to enable it and - IMPORTANT - bind the multicast server to the IP of your +VDR server's LAN ethernet card. The multicast server will refuse to start with +the default bind adresse "0.0.0.0". + +Now edit your streamdevhosts.conf. To allow streaming of all channels, it must +contain "239.255.0.0/16". Note that you cannot limit connections by client IP +here. You can however restrict which channels are allowed to be multicasted. +Enter individual multicast IPs instead of "239.255.0.0/16". + +By default, the linux kernel will refuse to join more than 20 multicast groups. +You might want to increase this up to "number_of_channels + 1". Note that it's +"number_of_channels", not "maximum_channel_number". + + #First 100 channels: + bash# sysctl -w sys.net.ipv4.igmp_max_memberships=101 + + #All channels: + bash# COUNT=$(grep -c '^[^:]' PATH_TO_YOUR/channels.conf) + bash# sysctl -w sys.net.ipv4.igmp_max_memberships=$COUNT + +A multicast server never knows how many clients are actually receiving a stream. +If a client signals that it leaves a multicast group, the server has to query +for other listeners before it can stop the stream. This may delay zapping from +one transponder to an other. The client will probably requests the new channel +before the previous stream has been stopped. If there's no free DVB card, VDR +won't be able to fulfill the request until a DVB card becomes available and the +client resends the request. + +3.3 Usage VDR-to-VDR server: ---------------------------- You can activate the VDR-to-VDR server part in the PlugIn's Setup Menu. It is @@ -228,7 +288,7 @@ port where you want the server to listen for incoming connections. The server will be activated when you push the OK button inside the setup menu, so there's no need to restart VDR. -3.3 Usage VDR-to-VDR client: +3.4 Usage VDR-to-VDR client: ---------------------------- Streamdev-client adds a "Suspend Server" item to VDR's mainmenu. With the diff --git a/i18n.c b/i18n.c index a09d94e..9a66468 100644 --- a/i18n.c +++ b/i18n.c @@ -1,5 +1,5 @@ /* - * $Id: i18n.c,v 1.8.2.5 2009/02/02 11:53:05 schmirl Exp $ + * $Id: i18n.c,v 1.8.2.6 2009/02/13 10:39:40 schmirl Exp $ */ #include "i18n.h" @@ -755,6 +755,106 @@ const tI18nPhrase Phrases[] = { "", // Czech #if VDRVERSNUM >= 10502 "", // Türkçe +#endif + }, + { "Multicast Streaming Server", // English + "Multicast Streaming Server", // Deutsch + "", // Slovenski + "", // Italiano + "", // Nederlands + "", // Português + "", // Français + "", // Norsk + "", // suomi + "", // Polski + "", // Español + "", // Ellinika + "", // Svenska + "", // Romaneste + "", // Magyar + "", // Catala + "", // Russian + "", // Hrvatski + "", // Eesti + "", // Dansk + "", // Czech +#if VDRVERSNUM >= 10502 + "", // Türkçe +#endif + }, + { "Start IGMP Server", // English + "IGMP Server starten", // Deutsch + "", // Slovenski + "", // Italiano + "", // Nederlands + "", // Português + "", // Français + "", // Norsk + "", // suomi + "", // Polski + "", // Español + "", // Ellinika + "", // Svenska + "", // Romaneste + "", // Magyar + "", // Catala + "", // Russian + "", // Hrvatski + "", // Eesti + "", // Dansk + "", // Czech +#if VDRVERSNUM >= 10502 + "", // Türkçe +#endif + }, + { "Multicast Client Port", // English + "Port des Multicast Clients", // Deutsch + "", // Slovenski + "", // Italiano + "", // Nederlands + "", // Português + "", // Français + "", // Norsk + "", // suomi + "", // Polski + "", // Español + "", // Ellinika + "", // Svenska + "", // Romaneste + "", // Magyar + "", // Catala + "", // Russian + "", // Hrvatski + "", // Eesti + "", // Dansk + "", // Czech +#if VDRVERSNUM >= 10502 + "", // Türkçe +#endif + }, + { "Multicast Streamtype", // English + "Multicast Streamtyp", // Deutsch + "", // Slovenski + "", // Italiano + "", // Nederlands + "", // Português + "", // Français + "", // Norsk + "", // suomi + "", // Polski + "", // Español + "", // Ellinika + "", // Svenska + "", // Romaneste + "", // Magyar + "", // Catala + "", // Russian + "", // Hrvatski + "", // Eesti + "", // Dansk + "", // Czech +#if VDRVERSNUM >= 10502 + "", // Türkçe #endif }, { NULL } diff --git a/patches/vdr-cap_net_raw.diff b/patches/vdr-cap_net_raw.diff new file mode 100644 index 0000000..2f714b1 --- /dev/null +++ b/patches/vdr-cap_net_raw.diff @@ -0,0 +1,11 @@ +--- vdr.c.orig 2009-02-13 09:45:55.000000000 +0100 ++++ vdr.c 2009-02-13 09:46:24.000000000 +0100 +@@ -115,7 +115,7 @@ + static bool SetCapSysTime(void) + { + // drop all capabilities except cap_sys_time +- cap_t caps = cap_from_text("= cap_sys_time=ep"); ++ cap_t caps = cap_from_text("= cap_sys_time,cap_net_raw=ep"); + if (!caps) { + fprintf(stderr, "vdr: cap_from_text failed: %s\n", strerror(errno)); + return false; diff --git a/server/component.c b/server/component.c index 1a584b5..b8f0de3 100644 --- a/server/component.c +++ b/server/component.c @@ -1,13 +1,14 @@ /* - * $Id: component.c,v 1.3 2005/05/09 20:22:29 lordjaxom Exp $ + * $Id: component.c,v 1.3.2.1 2009/02/13 10:39:42 schmirl Exp $ */ #include "server/component.h" #include "server/connection.h" cServerComponent::cServerComponent(const char *Protocol, const char *ListenIp, - uint ListenPort): + uint ListenPort, int Type, int IpProto): m_Protocol(Protocol), + m_Listen(Type, IpProto), m_ListenIp(ListenIp), m_ListenPort(ListenPort) { diff --git a/server/component.h b/server/component.h index 8703348..c7c90fd 100644 --- a/server/component.h +++ b/server/component.h @@ -1,5 +1,5 @@ /* - * $Id: component.h,v 1.2 2005/05/09 20:22:29 lordjaxom Exp $ + * $Id: component.h,v 1.2.2.1 2009/02/13 10:39:42 schmirl Exp $ */ #ifndef VDR_STREAMDEV_SERVERS_COMPONENT_H @@ -17,8 +17,8 @@ class cServerConnection; class cServerComponent: public cListObject { private: - cTBSocket m_Listen; const char *m_Protocol; + cTBSocket m_Listen; const char *m_ListenIp; uint m_ListenPort; @@ -27,7 +27,7 @@ protected: virtual cServerConnection *NewClient(void) = 0; public: - cServerComponent(const char *Protocol, const char *ListenIp, uint ListenPort); + cServerComponent(const char *Protocol, const char *ListenIp, uint ListenPort, int Type = SOCK_STREAM, int IpProto = 0); virtual ~cServerComponent(); /* Starts listening on the specified Port, override if you want to do things diff --git a/server/componentIGMP.c b/server/componentIGMP.c new file mode 100644 index 0000000..53e78d1 --- /dev/null +++ b/server/componentIGMP.c @@ -0,0 +1,447 @@ +/* + * $Id: componentIGMP.c,v 1.1.2.2 2009/02/13 10:39:42 schmirl Exp $ + */ +#include +#include + +#include "server/componentIGMP.h" +#include "server/connectionIGMP.h" +#include "server/setup.h" + +#ifndef IGMP_ALL_HOSTS +#define IGMP_ALL_HOSTS htonl(0xE0000001L) +#endif +#ifndef IGMP_ALL_ROUTER +#define IGMP_ALL_ROUTER htonl(0xE0000002L) +#endif + +// IGMP parameters according to RFC2236. All time values in seconds. +#define IGMP_ROBUSTNESS 2 +#define IGMP_QUERY_INTERVAL 125 +#define IGMP_QUERY_RESPONSE_INTERVAL 10 +#define IGMP_GROUP_MEMBERSHIP_INTERVAL (2 * IGMP_QUERY_INTERVAL + IGMP_QUERY_RESPONSE_INTERVAL) +#define IGMP_OTHER_QUERIER_PRESENT_INTERVAL (2 * IGMP_QUERY_INTERVAL + IGMP_QUERY_RESPONSE_INTERVAL / 2) +#define IGMP_STARTUP_QUERY_INTERVAL (IGMP_QUERY_INTERVAL / 4) +#define IGMP_STARTUP_QUERY_COUNT IGMP_ROBUSTNESS +// This value is 1/10 sec. RFC default is 10. Reduced to minimum to free unused channels ASAP +#define IGMP_LAST_MEMBER_QUERY_INTERVAL_TS 1 +#define IGMP_LAST_MEMBER_QUERY_COUNT IGMP_ROBUSTNESS + +// operations on struct timeval +#define TV_CMP(a, cmp, b) (a.tv_sec == b.tv_sec ? a.tv_usec cmp b.tv_usec : a.tv_sec cmp b.tv_sec) +#define TV_SET(tv) (tv.tv_sec || tv.tv_usec) +#define TV_CLR(tv) memset(&tv, 0, sizeof(tv)) +#define TV_CPY(dst, src) memcpy(&dst, &src, sizeof(dst)) +#define TV_ADD(dst, ts) dst.tv_sec += ts / 10; dst.tv_usec += (ts % 10) * 100000; if (dst.tv_usec >= 1000000) { dst.tv_usec -= 1000000; dst.tv_sec++; } + +class cMulticastGroup: public cListObject +{ +public: + cConnectionIGMP *connection; + in_addr_t group; + in_addr_t reporter; + struct timeval timeout; + struct timeval v1timer; + struct timeval retransmit; + + cMulticastGroup(in_addr_t Group); +}; + +cMulticastGroup::cMulticastGroup(in_addr_t Group) : + connection(NULL), + group(Group), + reporter(0) +{ + TV_CLR(timeout); + TV_CLR(v1timer); + TV_CLR(retransmit); +} + +void logIGMP(uint8_t type, struct in_addr Src, struct in_addr Dst, struct in_addr Grp) +{ + const char* msg; + switch (type) { + case IGMP_MEMBERSHIP_QUERY: msg = "membership query"; break; + case IGMP_V1_MEMBERSHIP_REPORT: msg = "V1 membership report"; break; + case IGMP_V2_MEMBERSHIP_REPORT: msg = "V2 membership report"; break; + case IGMP_V2_LEAVE_GROUP: msg = "leave group"; break; + default: msg = "unknown"; break; + } + char* s = strdup(inet_ntoa(Src)); + char* d = strdup(inet_ntoa(Dst)); + dsyslog("streamdev-server IGMP: Received %s from %s (dst %s) for %s", msg, s, d, inet_ntoa(Grp)); + free(s); + free(d); +} + +/* Taken from http://tools.ietf.org/html/rfc1071 */ +uint16_t inetChecksum(uint16_t *addr, int count) +{ + uint32_t sum = 0; + while (count > 1) { + sum += *addr++; + count -= 2; + } + + if( count > 0 ) + sum += * (uint8_t *) addr; + + while (sum>>16) + sum = (sum & 0xffff) + (sum >> 16); + + return ~sum; +} + +cComponentIGMP::cComponentIGMP(void): + cServerComponent("IGMP", "0.0.0.0", 0, SOCK_RAW, IPPROTO_IGMP), + cThread("IGMP timeout handler"), + m_BindIp(inet_addr(StreamdevServerSetup.IGMPBindIP)), + m_MaxChannelNumber(0), + m_StartupQueryCount(IGMP_STARTUP_QUERY_COUNT), + m_Querier(true) +{ +} + +cComponentIGMP::~cComponentIGMP(void) +{ +} + +cMulticastGroup* cComponentIGMP::FindGroup(in_addr_t Group) const +{ + cMulticastGroup *group = m_Groups.First(); + while (group && group->group != Group) + group = m_Groups.Next(group); + return group; +} + +bool cComponentIGMP::Initialize(void) +{ + if (cServerComponent::Initialize() && IGMPMembership(IGMP_ALL_ROUTER)) + { + for (cChannel *channel = Channels.First(); channel; channel = Channels.Next(channel)) + { + if (channel->GroupSep()) + continue; + int num = channel->Number(); + if (!IGMPMembership(htonl(MULTICAST_PRIV_MIN + num))) + break; + m_MaxChannelNumber = num; + } + if (m_MaxChannelNumber == 0) + { + IGMPMembership(IGMP_ALL_ROUTER, false); + esyslog("streamdev-server IGMP: no multicast group joined"); + } + else + { + Start(); + } + } + return m_MaxChannelNumber > 0; +} + +void cComponentIGMP::Destruct(void) +{ + if (m_MaxChannelNumber > 0) + { + Cancel(3); + for (cChannel *channel = Channels.First(); channel; channel = Channels.Next(channel)) + { + if (channel->GroupSep()) + continue; + int num = channel->Number(); + if (num > m_MaxChannelNumber) + break; + IGMPMembership(htonl(MULTICAST_PRIV_MIN + num), false); + } + IGMPMembership(IGMP_ALL_ROUTER, false); + } + m_MaxChannelNumber = 0; + cServerComponent::Destruct(); +} + +cServerConnection *cComponentIGMP::NewClient(void) +{ + return new cConnectionIGMP("IGMP", StreamdevServerSetup.IGMPClientPort, (eStreamType) StreamdevServerSetup.IGMPStreamType); +} + +cServerConnection* cComponentIGMP::Accept(void) +{ + ssize_t recv_len; + int ip_hdrlen, ip_datalen; + struct ip *ip; + struct igmp *igmp; + + while ((recv_len = ::recvfrom(Socket(), m_ReadBuffer, sizeof(m_ReadBuffer), 0, NULL, NULL)) < 0 && errno == EINTR) + errno = 0; + + if (recv_len < 0) { + esyslog("streamdev-server IGMP: read failed: %m"); + return NULL; + } + else if (recv_len < (ssize_t) sizeof(struct ip)) { + esyslog("streamdev-server IGMP: IP packet too short"); + return NULL; + } + + ip = (struct ip*) m_ReadBuffer; + + // filter out my own packets + if (ip->ip_src.s_addr == m_BindIp) + return NULL; + + ip_hdrlen = ip->ip_hl << 2; +#ifdef __FreeBSD__ + ip_datalen = ip->ip_len; +#else + ip_datalen = ntohs(ip->ip_len) - ip_hdrlen; +#endif + if (ip->ip_p != IPPROTO_IGMP) { + esyslog("streamdev-server IGMP: Unexpected protocol %hhu", ip->ip_p); + return NULL; + } + if (recv_len < ip_hdrlen + IGMP_MINLEN) { + esyslog("streamdev-server IGMP: packet too short"); + return NULL; + } + igmp = (struct igmp*) (m_ReadBuffer + ip_hdrlen); + uint16_t chksum = igmp->igmp_cksum; + igmp->igmp_cksum = 0; + if (chksum != inetChecksum((uint16_t *)igmp, ip_datalen)) + { + esyslog("INVALID CHECKSUM %d %d %d %d 0x%x 0x%x", ntohs(ip->ip_len), ip_hdrlen, ip_datalen, recv_len, chksum, inetChecksum((uint16_t *)igmp, ip_datalen)); + return NULL; + } + logIGMP(igmp->igmp_type, ip->ip_src, ip->ip_dst, igmp->igmp_group); + return ProcessMessage(igmp, igmp->igmp_group.s_addr, ip->ip_src.s_addr); +} + +cServerConnection* cComponentIGMP::ProcessMessage(struct igmp *Igmp, in_addr_t Group, in_addr_t Sender) +{ + cServerConnection* conn = NULL; + cMulticastGroup* group; + LOCK_THREAD; + switch (Igmp->igmp_type) { + case IGMP_MEMBERSHIP_QUERY: + if (ntohl(Sender) < ntohl(m_BindIp)) + IGMPStartOtherQuerierPresentTimer(); + break; + case IGMP_V1_MEMBERSHIP_REPORT: + case IGMP_V2_MEMBERSHIP_REPORT: + group = FindGroup(Group); + if (!group) { + group = new cMulticastGroup(Group); + m_Groups.Add(group); + } + if (!group->connection) { + IGMPStartMulticast(group); + conn = group->connection; + } + IGMPStartTimer(group, Sender); + if (Igmp->igmp_type == IGMP_V1_MEMBERSHIP_REPORT) + IGMPStartV1HostTimer(group); + break; + case IGMP_V2_LEAVE_GROUP: + group = FindGroup(Group); + if (group && !TV_SET(group->v1timer)) { + if (group->reporter == Sender) { + IGMPStartTimerAfterLeave(group, m_Querier ? IGMP_LAST_MEMBER_QUERY_INTERVAL_TS : Igmp->igmp_code); + if (m_Querier) + IGMPSendGroupQuery(group); + IGMPStartRetransmitTimer(group); + } + m_CondWait.Signal(); + } + break; + default: + break; + } + return conn; +} + +void cComponentIGMP::Action() +{ + while (Running()) { + struct timeval now; + struct timeval next; + + gettimeofday(&now, NULL); + TV_CPY(next, now); + next.tv_sec += IGMP_QUERY_INTERVAL; + + cMulticastGroup *del = NULL; + { + LOCK_THREAD; + if (TV_CMP(m_GeneralQueryTimer, <, now)) { + dsyslog("General Query"); + IGMPSendGeneralQuery(); + IGMPStartGeneralQueryTimer(); + } + if (TV_CMP(next, >, m_GeneralQueryTimer)) + TV_CPY(next, m_GeneralQueryTimer); + + for (cMulticastGroup *group = m_Groups.First(); group; group = m_Groups.Next(group)) { + if (TV_CMP(group->timeout, <, now)) { + IGMPStopMulticast(group); + IGMPClearRetransmitTimer(group); + if (del) + m_Groups.Del(del); + del = group; + } + else if (m_Querier && TV_SET(group->retransmit) && TV_CMP(group->retransmit, <, now)) { + IGMPSendGroupQuery(group); + IGMPStartRetransmitTimer(group); + if (TV_CMP(next, >, group->retransmit)) + TV_CPY(next, group->retransmit); + } + else if (TV_SET(group->v1timer) && TV_CMP(group->v1timer, <, now)) { + TV_CLR(group->v1timer); + } + else { + if (TV_CMP(next, >, group->timeout)) + TV_CPY(next, group->timeout); + if (TV_SET(group->retransmit) && TV_CMP(next, >, group->retransmit)) + TV_CPY(next, group->retransmit); + if (TV_SET(group->v1timer) && TV_CMP(next, >, group->v1timer)) + TV_CPY(next, group->v1timer); + } + } + if (del) + m_Groups.Del(del); + } + + int sleep = (next.tv_sec - now.tv_sec) * 1000; + sleep += (next.tv_usec - now.tv_usec) / 1000; + if (next.tv_usec < now.tv_usec) + sleep += 1000; + dsyslog("Sleeping %d ms", sleep); + m_CondWait.Wait(sleep); + } +} + +bool cComponentIGMP::IGMPMembership(in_addr_t Group, bool Add) +{ + struct ip_mreqn mreq; + mreq.imr_multiaddr.s_addr = Group; + mreq.imr_address.s_addr = INADDR_ANY; + mreq.imr_ifindex = 0; + if (setsockopt(Socket(), IPPROTO_IP, Add ? IP_ADD_MEMBERSHIP : IP_DROP_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) + { + esyslog("streamdev-server IGMP: unable to %s %s: %m", Add ? "join" : "leave", inet_ntoa(mreq.imr_multiaddr)); + if (errno == ENOBUFS) + esyslog("consider increasing sys.net.ipv4.igmp_max_memberships"); + return false; + } + return true; +} + +void cComponentIGMP::IGMPSendQuery(in_addr_t Group, int Timeout) +{ + struct sockaddr_in dst; + struct igmp query; + + dst.sin_family = AF_INET; + dst.sin_port = IPPROTO_IGMP; + dst.sin_addr.s_addr = Group; + query.igmp_type = IGMP_MEMBERSHIP_QUERY; + query.igmp_code = Timeout * 10; + query.igmp_cksum = 0; + query.igmp_group.s_addr = (Group == IGMP_ALL_HOSTS) ? 0 : Group; + query.igmp_cksum = inetChecksum((uint16_t *) &query, sizeof(query)); + + for (int i = 0; i < 5 && ::sendto(Socket(), &query, sizeof(query), 0, (sockaddr*)&dst, sizeof(dst)) == -1; i++) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + esyslog("streamdev-server IGMP: unable to query group %s: %m", inet_ntoa(dst.sin_addr)); + break; + } + cCondWait::SleepMs(10); + } +} + +// Querier state actions +void cComponentIGMP::IGMPStartGeneralQueryTimer() +{ + m_Querier = true; + if (m_StartupQueryCount) { + gettimeofday(&m_GeneralQueryTimer, NULL); + m_GeneralQueryTimer.tv_sec += IGMP_STARTUP_QUERY_INTERVAL; + m_StartupQueryCount--; + } + else { + gettimeofday(&m_GeneralQueryTimer, NULL); + m_GeneralQueryTimer.tv_sec += IGMP_QUERY_INTERVAL; + } +} + +void cComponentIGMP::IGMPStartOtherQuerierPresentTimer() +{ + m_Querier = false; + m_StartupQueryCount = 0; + gettimeofday(&m_GeneralQueryTimer, NULL); + m_GeneralQueryTimer.tv_sec += IGMP_OTHER_QUERIER_PRESENT_INTERVAL; +} + +void cComponentIGMP::IGMPSendGeneralQuery() +{ + IGMPSendQuery(IGMP_ALL_HOSTS, IGMP_QUERY_RESPONSE_INTERVAL); +} + +// Group state actions +void cComponentIGMP::IGMPStartTimer(cMulticastGroup* Group, in_addr_t Member) +{ + gettimeofday(&Group->timeout, NULL); + Group->timeout.tv_sec += IGMP_GROUP_MEMBERSHIP_INTERVAL; + TV_CLR(Group->retransmit); + Group->reporter = Member; + +} + +void cComponentIGMP::IGMPStartV1HostTimer(cMulticastGroup* Group) +{ + gettimeofday(&Group->v1timer, NULL); + Group->v1timer.tv_sec += IGMP_GROUP_MEMBERSHIP_INTERVAL; +} + +void cComponentIGMP::IGMPStartTimerAfterLeave(cMulticastGroup* Group, unsigned int MaxResponseTimeTs) +{ + //Group->Update(time(NULL) + MaxResponseTime * IGMP_LAST_MEMBER_QUERY_COUNT / 10); + MaxResponseTimeTs *= IGMP_LAST_MEMBER_QUERY_COUNT; + gettimeofday(&Group->timeout, NULL); + TV_ADD(Group->timeout, MaxResponseTimeTs); + TV_CLR(Group->retransmit); + Group->reporter = 0; +} + +void cComponentIGMP::IGMPStartRetransmitTimer(cMulticastGroup* Group) +{ + gettimeofday(&Group->retransmit, NULL); + TV_ADD(Group->retransmit, IGMP_LAST_MEMBER_QUERY_INTERVAL_TS); +} + +void cComponentIGMP::IGMPClearRetransmitTimer(cMulticastGroup* Group) +{ + TV_CLR(Group->retransmit); +} + +void cComponentIGMP::IGMPSendGroupQuery(cMulticastGroup* Group) +{ + IGMPSendQuery(Group->group, IGMP_LAST_MEMBER_QUERY_INTERVAL_TS); +} + +void cComponentIGMP::IGMPStartMulticast(cMulticastGroup* Group) +{ + in_addr_t g = ntohl(Group->group); + if (g > MULTICAST_PRIV_MIN && g <= MULTICAST_PRIV_MAX) { + cChannel *channel = Channels.GetByNumber(g - MULTICAST_PRIV_MIN); + Group->connection = (cConnectionIGMP*) NewClient(); + if (!Group->connection->Start(channel, Group->group)) { + DELETENULL(Group->connection); + } + } +} + +void cComponentIGMP::IGMPStopMulticast(cMulticastGroup* Group) +{ + if (Group->connection) + Group->connection->Stop(); +} diff --git a/server/componentIGMP.h b/server/componentIGMP.h new file mode 100644 index 0000000..ee78aa1 --- /dev/null +++ b/server/componentIGMP.h @@ -0,0 +1,62 @@ +/* + * $Id: componentIGMP.h,v 1.1.2.2 2009/02/13 10:39:42 schmirl Exp $ + */ + +#ifndef VDR_STREAMDEV_IGMPSERVER_H +#define VDR_STREAMDEV_IGMPSERVER_H + +#include +#include +#include +#include "server/component.h" + +class cConnectionIGMP; +class cMulticastGroup; + +class cComponentIGMP: public cServerComponent, public cThread { +private: + char m_ReadBuffer[2048]; + cList m_Groups; + in_addr_t m_BindIp; + int m_MaxChannelNumber; + struct timeval m_GeneralQueryTimer; + int m_StartupQueryCount; + bool m_Querier; + cCondWait m_CondWait; + + cMulticastGroup* FindGroup(in_addr_t Group) const; + + /* Add or remove local host to multicast group */ + bool IGMPMembership(in_addr_t Group, bool Add = true); + void IGMPSendQuery(in_addr_t Group, int Timeout); + + cServerConnection* ProcessMessage(struct igmp *Igmp, in_addr_t Group, in_addr_t Sender); + + void IGMPStartGeneralQueryTimer(); + void IGMPStartOtherQuerierPresentTimer(); + void IGMPSendGeneralQuery(); + + void IGMPStartTimer(cMulticastGroup* Group, in_addr_t Member); + void IGMPStartV1HostTimer(cMulticastGroup* Group); + void IGMPStartTimerAfterLeave(cMulticastGroup* Group, unsigned int MaxResponseTime); + void IGMPStartRetransmitTimer(cMulticastGroup* Group); + void IGMPClearRetransmitTimer(cMulticastGroup* Group); + void IGMPSendGroupQuery(cMulticastGroup* Group); + void IGMPStartMulticast(cMulticastGroup* Group); + void IGMPStopMulticast(cMulticastGroup* Group); + + virtual void Action(); + +protected: + virtual cServerConnection *NewClient(void); + +public: + virtual bool Initialize(void); + virtual void Destruct(void); + virtual cServerConnection* Accept(void); + + cComponentIGMP(void); + ~cComponentIGMP(void); +}; + +#endif // VDR_STREAMDEV_IGMPSERVER_H diff --git a/server/connection.c b/server/connection.c index 629ed1d..23a75be 100644 --- a/server/connection.c +++ b/server/connection.c @@ -1,5 +1,5 @@ /* - * $Id: connection.c,v 1.10 2007/05/07 12:25:11 schmirl Exp $ + * $Id: connection.c,v 1.10.2.1 2009/02/13 10:39:42 schmirl Exp $ */ #include "server/connection.h" @@ -12,7 +12,8 @@ #include #include -cServerConnection::cServerConnection(const char *Protocol): +cServerConnection::cServerConnection(const char *Protocol, int Type): + cTBSocket(Type), m_Protocol(Protocol), m_DeferClose(false), m_Pending(false), diff --git a/server/connection.h b/server/connection.h index 0ca7153..6239c28 100644 --- a/server/connection.h +++ b/server/connection.h @@ -1,5 +1,5 @@ /* - * $Id: connection.h,v 1.5.2.1 2008/10/14 11:05:59 schmirl Exp $ + * $Id: connection.h,v 1.5.2.2 2009/02/13 10:39:42 schmirl Exp $ */ #ifndef VDR_STREAMDEV_SERVER_CONNECTION_H @@ -44,7 +44,7 @@ protected: public: /* If you derive, specify a short string such as HTTP for Protocol, which will be displayed in error messages */ - cServerConnection(const char *Protocol); + cServerConnection(const char *Protocol, int Type = SOCK_STREAM); virtual ~cServerConnection(); /* If true, any client IP will be accepted */ diff --git a/server/connectionIGMP.c b/server/connectionIGMP.c new file mode 100644 index 0000000..b579754 --- /dev/null +++ b/server/connectionIGMP.c @@ -0,0 +1,64 @@ +/* + * $Id: connectionIGMP.c,v 1.1.2.2 2009/02/13 10:39:42 schmirl Exp $ + */ + +#include + +#include "server/connectionIGMP.h" +#include "server/server.h" +#include "server/setup.h" +#include + +cConnectionIGMP::cConnectionIGMP(const char* Name, int ClientPort, eStreamType StreamType) : + cServerConnection(Name, SOCK_DGRAM), + m_LiveStreamer(NULL), + m_ClientPort(ClientPort), + m_StreamType(StreamType) +{ +} + +cConnectionIGMP::~cConnectionIGMP() +{ + delete m_LiveStreamer; +} + +bool cConnectionIGMP::Start(cChannel *Channel, in_addr_t Dst) +{ + if (Channel != NULL) { + cDevice *device = GetDevice(Channel, 0); + if (device != NULL) { + device->SwitchChannel(Channel, false); + struct in_addr ip; + ip.s_addr = Dst; + if (Connect(inet_ntoa(ip), m_ClientPort)) { + m_LiveStreamer = new cStreamdevLiveStreamer(0); + if (m_LiveStreamer->SetChannel(Channel, m_StreamType)) { + m_LiveStreamer->SetDevice(device); + if (!SetDSCP()) + LOG_ERROR_STR("unable to set DSCP sockopt"); + Dprintf("streamer start\n"); + m_LiveStreamer->Start(this); + return true; + } + else + esyslog("streamdev-server IGMP: SetDevice failed"); + DELETENULL(m_LiveStreamer); + } + else + esyslog("streamdev-server IGMP: Connect failed: %m"); + } + else + esyslog("streamdev-server IGMP: GetDevice failed"); + } + else + esyslog("streamdev-server IGMP: Channel not found"); + return false; +} + +void cConnectionIGMP::Stop() +{ + if (m_LiveStreamer) { + m_LiveStreamer->Stop(); + DELETENULL(m_LiveStreamer); + } +} diff --git a/server/connectionIGMP.h b/server/connectionIGMP.h new file mode 100644 index 0000000..7e936aa --- /dev/null +++ b/server/connectionIGMP.h @@ -0,0 +1,45 @@ +/* + * $Id: connectionIGMP.h,v 1.1.2.2 2009/02/13 10:39:42 schmirl Exp $ + */ + +#ifndef VDR_STREAMDEV_SERVERS_CONNECTIONIGMP_H +#define VDR_STREAMDEV_SERVERS_CONNECTIONIGMP_H + +#include "connection.h" +#include "server/livestreamer.h" + +#include + +#define MULTICAST_PRIV_MIN ((uint32_t) 0xefff0000) +#define MULTICAST_PRIV_MAX ((uint32_t) 0xeffffeff) + +class cStreamdevLiveStreamer; + +class cConnectionIGMP: public cServerConnection { +private: + cStreamdevLiveStreamer *m_LiveStreamer; + int m_ClientPort; + eStreamType m_StreamType; + +public: + cConnectionIGMP(const char* Name, int ClientPort, eStreamType StreamType); + virtual ~cConnectionIGMP(); + + bool Start(cChannel *Channel, in_addr_t Dst); + void Stop(); + + /* Not used here */ + virtual bool Command(char *Cmd) { return false; } + + virtual void Attach(void) { if (m_LiveStreamer != NULL) m_LiveStreamer->Attach(); } + virtual void Detach(void) { if (m_LiveStreamer != NULL) m_LiveStreamer->Detach(); } + + virtual bool Abort(void) const; +}; + +inline bool cConnectionIGMP::Abort(void) const +{ + return !m_LiveStreamer || m_LiveStreamer->Abort(); +} + +#endif // VDR_STREAMDEV_SERVERS_CONNECTIONIGMP_H diff --git a/server/livefilter.c b/server/livefilter.c index ac9c284..68d1160 100644 --- a/server/livefilter.c +++ b/server/livefilter.c @@ -1,14 +1,11 @@ /* - * $Id: livefilter.c,v 1.5 2008/04/07 14:27:31 schmirl Exp $ + * $Id: livefilter.c,v 1.5.2.1 2009/02/13 10:39:42 schmirl Exp $ */ #include "server/livefilter.h" #include "server/streamer.h" #include "common.h" -#ifndef TS_SIZE -# define TS_SIZE 188 -#endif #ifndef TS_SYNC_BYTE # define TS_SYNC_BYTE 0x47 #endif diff --git a/server/server.c b/server/server.c index 7cb0c60..2ad6388 100644 --- a/server/server.c +++ b/server/server.c @@ -1,10 +1,11 @@ /* - * $Id: server.c,v 1.5.2.4 2008/10/31 12:20:06 schmirl Exp $ + * $Id: server.c,v 1.5.2.5 2009/02/13 10:39:42 schmirl Exp $ */ #include "server/server.h" #include "server/componentVTP.h" #include "server/componentHTTP.h" +#include "server/componentIGMP.h" #include "server/setup.h" #include @@ -36,6 +37,12 @@ void cStreamdevServer::Initialize(void) if (m_Instance == NULL) { if (StreamdevServerSetup.StartVTPServer) Register(new cComponentVTP); if (StreamdevServerSetup.StartHTTPServer) Register(new cComponentHTTP); + if (StreamdevServerSetup.StartIGMPServer) { + if (strcmp(StreamdevServerSetup.IGMPBindIP, "0.0.0.0") == 0) + esyslog("streamdev-server: Not starting IGMP. IGMP must be bound to a local IP"); + else + Register(new cComponentIGMP); + } m_Instance = new cStreamdevServer; } diff --git a/server/setup.c b/server/setup.c index 4b43adf..216268a 100644 --- a/server/setup.c +++ b/server/setup.c @@ -1,5 +1,5 @@ /* - * $Id: setup.c,v 1.3 2008/04/07 14:50:33 schmirl Exp $ + * $Id: setup.c,v 1.3.2.1 2009/02/13 10:39:42 schmirl Exp $ */ #include @@ -17,10 +17,14 @@ cStreamdevServerSetup::cStreamdevServerSetup(void) { StartHTTPServer = true; HTTPServerPort = 3000; HTTPStreamType = stPES; + StartIGMPServer = false; + IGMPClientPort = 1234; + IGMPStreamType = stTS; SuspendMode = smAlways; AllowSuspend = false; strcpy(VTPBindIP, "0.0.0.0"); strcpy(HTTPBindIP, "0.0.0.0"); + strcpy(IGMPBindIP, "0.0.0.0"); } bool cStreamdevServerSetup::SetupParse(const char *Name, const char *Value) { @@ -32,6 +36,10 @@ bool cStreamdevServerSetup::SetupParse(const char *Name, const char *Value) { else if (strcmp(Name, "HTTPServerPort") == 0) HTTPServerPort = atoi(Value); else if (strcmp(Name, "HTTPStreamType") == 0) HTTPStreamType = atoi(Value); else if (strcmp(Name, "HTTPBindIP") == 0) strcpy(HTTPBindIP, Value); + else if (strcmp(Name, "StartIGMPServer") == 0) StartIGMPServer = atoi(Value); + else if (strcmp(Name, "IGMPClientPort") == 0) IGMPClientPort = atoi(Value); + else if (strcmp(Name, "IGMPStreamType") == 0) IGMPStreamType = atoi(Value); + else if (strcmp(Name, "IGMPBindIP") == 0) strcpy(IGMPBindIP, Value); else if (strcmp(Name, "SuspendMode") == 0) SuspendMode = atoi(Value); else if (strcmp(Name, "AllowSuspend") == 0) AllowSuspend = atoi(Value); else return false; @@ -56,7 +64,11 @@ cStreamdevServerMenuSetupPage::cStreamdevServerMenuSetupPage(void) { AddShortEdit(tr("HTTP Server Port"), m_NewSetup.HTTPServerPort); AddTypeEdit (tr("HTTP Streamtype"), m_NewSetup.HTTPStreamType); AddIpEdit (tr("Bind to IP"), m_NewSetup.HTTPBindIP); - + AddCategory (tr("Multicast Streaming Server")); + AddBoolEdit (tr("Start IGMP Server"), m_NewSetup.StartIGMPServer); + AddShortEdit(tr("Multicast Client Port"), m_NewSetup.IGMPClientPort); + AddTypeEdit (tr("Multicast Streamtype"), m_NewSetup.IGMPStreamType); + AddIpEdit (tr("Bind to IP"), m_NewSetup.IGMPBindIP); SetCurrent(Get(1)); } @@ -70,7 +82,10 @@ void cStreamdevServerMenuSetupPage::Store(void) { || strcmp(m_NewSetup.VTPBindIP, StreamdevServerSetup.VTPBindIP) != 0 || m_NewSetup.StartHTTPServer != StreamdevServerSetup.StartHTTPServer || m_NewSetup.HTTPServerPort != StreamdevServerSetup.HTTPServerPort - || strcmp(m_NewSetup.HTTPBindIP, StreamdevServerSetup.HTTPBindIP) != 0) { + || strcmp(m_NewSetup.HTTPBindIP, StreamdevServerSetup.HTTPBindIP) != 0 + || m_NewSetup.StartIGMPServer != StreamdevServerSetup.StartIGMPServer + || m_NewSetup.IGMPClientPort != StreamdevServerSetup.IGMPClientPort + || strcmp(m_NewSetup.IGMPBindIP, StreamdevServerSetup.IGMPBindIP) != 0) { restart = true; cStreamdevServer::Destruct(); } @@ -83,6 +98,10 @@ void cStreamdevServerMenuSetupPage::Store(void) { SetupStore("HTTPServerPort", m_NewSetup.HTTPServerPort); SetupStore("HTTPStreamType", m_NewSetup.HTTPStreamType); SetupStore("HTTPBindIP", m_NewSetup.HTTPBindIP); + SetupStore("StartIGMPServer", m_NewSetup.StartIGMPServer); + SetupStore("IGMPClientPort", m_NewSetup.IGMPClientPort); + SetupStore("IGMPStreamType", m_NewSetup.IGMPStreamType); + SetupStore("IGMPBindIP", m_NewSetup.IGMPBindIP); SetupStore("SuspendMode", m_NewSetup.SuspendMode); SetupStore("AllowSuspend", m_NewSetup.AllowSuspend); diff --git a/server/setup.h b/server/setup.h index ff27618..ff2a5d9 100644 --- a/server/setup.h +++ b/server/setup.h @@ -1,5 +1,5 @@ /* - * $Id: setup.h,v 1.1 2004/12/30 22:44:21 lordjaxom Exp $ + * $Id: setup.h,v 1.1.1.1.2.1 2009/02/13 10:39:42 schmirl Exp $ */ #ifndef VDR_STREAMDEV_SETUPSERVER_H @@ -20,6 +20,10 @@ struct cStreamdevServerSetup { int HTTPServerPort; int HTTPStreamType; char HTTPBindIP[20]; + int StartIGMPServer; + int IGMPClientPort; + int IGMPStreamType; + char IGMPBindIP[20]; int SuspendMode; int AllowSuspend; }; diff --git a/server/streamer.c b/server/streamer.c index 7ee7f12..7c5cb50 100644 --- a/server/streamer.c +++ b/server/streamer.c @@ -1,5 +1,5 @@ /* - * $Id: streamer.c,v 1.16.2.1 2008/10/22 11:59:37 schmirl Exp $ + * $Id: streamer.c,v 1.16.2.2 2009/02/13 10:39:42 schmirl Exp $ */ #include @@ -55,16 +55,33 @@ void cStreamdevWriter::Action(void) if (sel.CanWrite(*m_Socket)) { int written; - if ((written = m_Socket->Write(block + offset, count)) == -1) { - esyslog("ERROR: streamdev-server: couldn't send data: %m"); + int pkgsize = count; + // SOCK_DGRAM indicates multicast + if (m_Socket->Type() == SOCK_DGRAM) { + // don't fragment multicast packets + // max. payload on standard local ethernet is 1416 to 1456 bytes + // and some STBs expect complete TS packets + // so let's always limit to 7 * TS_SIZE = 1316 + if (pkgsize > 7 * TS_SIZE) + pkgsize = 7 * TS_SIZE; + else + pkgsize -= pkgsize % TS_SIZE; + } + if ((written = m_Socket->Write(block + offset, pkgsize)) == -1) { + esyslog("ERROR: streamdev-server: couldn't send %d bytes: %m", pkgsize); break; } + + // statistics if (count > max) max = count; offset += written; count -= written; - if (count == 0) { + + // less than one TS packet left: + // delete what we've written so far and get next chunk + if (count < TS_SIZE) { m_Streamer->Del(offset); block = NULL; } diff --git a/server/streamer.h b/server/streamer.h index 384e002..40c794d 100644 --- a/server/streamer.h +++ b/server/streamer.h @@ -1,5 +1,5 @@ /* - * $Id: streamer.h,v 1.8.2.1 2008/10/22 11:59:37 schmirl Exp $ + * $Id: streamer.h,v 1.8.2.2 2009/02/13 10:39:42 schmirl Exp $ */ #ifndef VDR_STREAMDEV_STREAMER_H @@ -12,6 +12,10 @@ class cTBSocket; class cStreamdevStreamer; +#ifndef TS_SIZE +#define TS_SIZE 188 +#endif + #define STREAMERBUFSIZE MEGABYTE(4) #define WRITERBUFSIZE KILOBYTE(256) diff --git a/streamdev/streamdevhosts.conf b/streamdev/streamdevhosts.conf index 6c13598..49882a2 100644 --- a/streamdev/streamdevhosts.conf +++ b/streamdev/streamdevhosts.conf @@ -10,4 +10,5 @@ 127.0.0.1 # always accept localhost #192.168.100.0/24 # any host on the local net #204.152.189.113 # a specific host +#239.255.0.0/16 # uncomment for IGMP multicast streaming #0.0.0.0/0 # any host on any net (USE THIS WITH CARE!) diff --git a/tools/socket.c b/tools/socket.c index 7f4ce0a..3523c29 100644 --- a/tools/socket.c +++ b/tools/socket.c @@ -1,5 +1,6 @@ #include "tools/socket.h" +#include #include #include #include @@ -15,10 +16,11 @@ // actual DSCP value used #define STREAMDEV_DSCP DSCP_AF41 -cTBSocket::cTBSocket(int Type) { +cTBSocket::cTBSocket(int Type, int Protocol) { memset(&m_LocalAddr, 0, sizeof(m_LocalAddr)); memset(&m_RemoteAddr, 0, sizeof(m_RemoteAddr)); m_Type = Type; + m_Protocol = Protocol; } cTBSocket::~cTBSocket() { @@ -31,7 +33,7 @@ bool cTBSocket::Connect(const std::string &Host, unsigned int Port) { if (IsOpen()) Close(); - if ((socket = ::socket(PF_INET, m_Type, IPPROTO_IP)) == -1) + if ((socket = ::socket(PF_INET, m_Type, m_Protocol)) == -1) return false; m_LocalAddr.sin_family = AF_INET; @@ -52,10 +54,12 @@ bool cTBSocket::Connect(const std::string &Host, unsigned int Port) { return false; } - len = sizeof(struct sockaddr_in); - if (::getpeername(socket, (struct sockaddr*)&m_RemoteAddr, &len) == -1) { - ::close(socket); - return false; + if (m_Type == SOCK_STREAM) { + len = sizeof(struct sockaddr_in); + if (::getpeername(socket, (struct sockaddr*)&m_RemoteAddr, &len) == -1) { + ::close(socket); + return false; + } } len = sizeof(struct sockaddr_in); @@ -64,7 +68,11 @@ bool cTBSocket::Connect(const std::string &Host, unsigned int Port) { return false; } - return cTBSource::Open(socket); + if (!cTBSource::Open(socket)) { + ::close(socket); + return false; + } + return true; } bool cTBSocket::Listen(const std::string &Ip, unsigned int Port, int BackLog) { @@ -74,7 +82,7 @@ bool cTBSocket::Listen(const std::string &Ip, unsigned int Port, int BackLog) { if (IsOpen()) Close(); - if ((socket = ::socket(PF_INET, m_Type, IPPROTO_IP)) == -1) + if ((socket = ::socket(PF_INET, m_Type, m_Protocol)) == -1) return false; val = 1; diff --git a/tools/socket.h b/tools/socket.h index 23272ec..3dc7a33 100644 --- a/tools/socket.h +++ b/tools/socket.h @@ -18,9 +18,10 @@ private: struct sockaddr_in m_RemoteAddr; int m_Type; + int m_Protocol; public: - cTBSocket(int Type = SOCK_STREAM); + cTBSocket(int Type = SOCK_STREAM, int Protocol = 0); virtual ~cTBSocket(); /* See cTBSource::SysRead() @@ -97,15 +98,22 @@ public: }; inline ssize_t cTBSocket::SysRead(void *Buffer, size_t Length) const { - if (m_Type == SOCK_DGRAM) { + if (m_Type == SOCK_STREAM) + return ::recv(*this, Buffer, Length, 0); + else { socklen_t len = sizeof(m_RemoteAddr); return ::recvfrom(*this, Buffer, Length, 0, (sockaddr*)&m_RemoteAddr, &len); - } else - return ::recv(*this, Buffer, Length, 0); + } } inline ssize_t cTBSocket::SysWrite(const void *Buffer, size_t Length) const { return ::send(*this, Buffer, Length, 0); + if (m_Type == SOCK_STREAM) + return ::send(*this, Buffer, Length, 0); + else { + socklen_t len = sizeof(m_RemoteAddr); + return ::sendto(*this, Buffer, Length, 0, (sockaddr*)&m_RemoteAddr, len); + } } #endif // TOOLBOX_SOCKET_H