#!/usr/bin/python3
#
# Tiny SAT>IP client which activates multicast streaming.
#
# Either pass a single request on the command line (-u), or
# create a configuration file (default: /etc/sysconfig/multicast)
# with one stream per entry, using the syntax:
#
#   <port>:<SAT>IP parameters>
#
# You can modify this file when the program is active and
# reload the configuration using the SIGHUP signal.
#
# Configuration Example:
#
#   # Channel 1 on multicast port 5554, using first orbital position
#   5554:src=1&freq=11347&pol=v&ro=0.35&msys=dvbs2&\
#   mtype=8psk&plts=on&sr=22000&fec=23&\
#   pids=0,17,18,6600,6610,6620,6630
#
# Lines starting with '#' or ';' are comments; a trailing backslash
# continues the entry on the next line.
#
# The SAT>IP parameters should match the SAT>IP specification:
#   http://www.satip.info/resources
#
# Note: The multicast address must be specified through minisatip
#       option -r or --remote-rtp <address>. By default,
#       this address is 239.255.255.250.
#
# Command-line example:
#
#   ./multicast-rtp \
#     -s 192.168.1.100:554 -d 239.0.0.1 -p 5554 \
#     -u "src=1&freq=11347&pol=v&ro=0.35&msys=dvbs2&mtype=8psk&plts=on&sr=22000&fec=23&pids=0,17,18,6600,6610,6620,6630"
#
# Note: The SAT>IP server needs to support "target" spoofing
#       and multicast address as "unicast" (minisatip project does it)
#

import argparse
import asyncio
import functools
import os
import signal
import sys
import syslog

VERSION = '0.2'

SYSLOG = False
SERVER = '127.0.0.1:554'
CONFIG = ''
REQUEST = ''
TARGET_ADDR = ''
TARGET_PORT = 0
USE_CONFIG = True
QUIET = False

#
# Command line parameters
#

parser = argparse.ArgumentParser(description='Request multicast streaming from a SAT>IP server.')
parser.add_argument('-q', '--quiet', dest='quiet', action='store_true',
                    help='quiet display option')
parser.add_argument('-l', '--syslog', dest='syslog', action='store_true',
                    help='enable System LOG')
parser.add_argument('-s', '--server', dest='server', required=True,
                    help='server to connect')
parser.add_argument('-d', '--destination', dest='addr', default='',
                    help='target address (Note: Only if server accept it!)')
parser.add_argument('-p', '--port', dest='port', type=int, default=10000,
                    help='target port (default: 10000)')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-u', '--uri', dest='uri', default='',
                   help='URI to request')
group.add_argument('-c', '--config', dest='config', default='/etc/sysconfig/multicast',
                   help='config file with static multicast streams (default: /etc/sysconfig/multicast)')
args = parser.parse_args()

SYSLOG = args.syslog

if 0 < args.port < 65536:
    TARGET_PORT = args.port
else:
    parser.print_help()
    sys.exit()

if args.server:
    SERVER = args.server
else:
    parser.print_help()
    sys.exit()

# A request URI given on the command line takes precedence over the
# configuration file.
if args.uri:
    REQUEST = args.uri
elif args.config:
    CONFIG = args.config
else:
    parser.print_help()
    sys.exit()

TARGET_ADDR = args.addr
QUIET = args.quiet
if REQUEST:
    USE_CONFIG = False


#
# Logging
#

def log(msg):
    """Emit *msg* to syslog or stdout, unless quiet mode is active."""
    if QUIET:
        return
    if SYSLOG:
        syslog.syslog(msg)
    else:
        print('LOG:', msg)


#
# Basic RTSP Protocol class
#

class RTSP_Protocol:
    """asyncio protocol driving one RTSP session.

    The session goes SETUP -> PLAY, is then kept alive with periodic
    OPTIONS requests, and ends with TEARDOWN.  ``action`` names the
    next step to take when a response arrives.
    """

    def __init__(self, client, params):
        """Parse ``"<port>:<SAT>IP parameters>"`` and attach to *client*.

        On malformed *params* the owning client is removed via
        ``client.fatal()`` and no request is ever sent.
        """
        # Initialise every attribute first: client.fatal() below calls
        # back into teardown(), which reads self.transport.
        self.client = client
        self.mport = 0
        self.params = ''
        self.action = ''
        self.timeout = 30
        self.session = ''
        self.streamID = 0
        self.transport = None
        self.cseq = 0
        client.protocol = self
        try:
            port_text, query = params.split(':')[:2]
            # RTP/RTCP use a port pair starting on an even port.
            self.mport = int(port_text) & ~1
            self.params = query.strip()
            self.action = 'SETUP'
        except ValueError:
            log('Invalid parameters: %s' % params)
            client.fatal()

    def _peer(self):
        """Return ``(host, port)`` of the connected server."""
        return self.transport.get_extra_info('peername')[:2]

    def _call_later(self, delay, callback):
        """Schedule *callback* on the owning client's event loop."""
        self.client.loop.call_later(delay, callback)

    def connection_made(self, transport):
        """Send the SETUP request as soon as the TCP connection is up."""
        self.transport = transport
        if self.action != 'SETUP':
            # The parameters were rejected in __init__; there is
            # nothing to request, so do not keep the socket open.
            transport.close()
            return
        host, port = self._peer()
        uri = 'rtsp://%s:%d/?%s' % (host, port, self.params)
        log('%d (open): %s' % (self.mport, uri))
        msg = 'SETUP ' + uri + ' RTSP/1.0\r\n'
        if len(REQUEST) > 0:
            # Command-line mode: ask the server to send "unicast" to
            # the explicitly given (multicast) destination address.
            msg += 'Transport: RTP/AVP;unicast;destination=%s;client_port=%d-%d\r\n' % (TARGET_ADDR, self.mport, self.mport + 1)
        else:
            msg += 'Transport: RTP/AVP;multicast;port=%d-%d\r\n' % (self.mport, self.mport + 1)
        msg += 'CSeq: 1\r\n\r\n'
        log(' -----> : %s' % msg)
        transport.write(msg.encode())
        self.action = 'PLAY'

    def data_received(self, data):
        """Handle an RTSP response and advance the session state."""
        if not self.transport:
            return
        log(' <----- : %s' % data)
        lines = data.decode('utf-8', errors='replace').splitlines()
        if not lines or lines[0] != 'RTSP/1.0 200 OK':
            return self.retry()
        try:
            streamID = 0
            session = ''
            timeout = 0
            for line in lines[1:]:
                if line == '':
                    break  # end of the header section
                name, sep, value = line.partition(':')
                if not sep:
                    continue
                name = name.lower()
                value = value.strip()
                if name == 'com.ses.streamid':
                    streamID = int(value)
                elif name == 'session':
                    # "Session: <id>[;timeout=<seconds>]"
                    fields = value.split(';')
                    session = fields[0]
                    for field in fields[1:]:
                        if field.startswith('timeout='):
                            timeout = int(field[8:])
            if timeout:
                self.timeout = timeout
            if session:
                self.session = session
            if streamID:
                self.streamID = streamID
        except ValueError:
            # A numeric header could not be parsed: drop the connection
            # and let the client reconnect.
            self.retry()
            return
        if self.action == 'PLAY':
            host, port = self._peer()
            log('%d (setup): Session %s, streamID %d' % (self.mport, self.session, self.streamID))
            self.action = 'OPTIONS'
            msg = 'PLAY rtsp://%s:%d/stream=%d RTSP/1.0\r\n' % (host, port, self.streamID)
            msg += 'Session: %s\r\n' % self.session
            msg += 'CSeq: 2\r\n\r\n'
            self.transport.write(msg.encode())
            # Keep the session alive well inside the server timeout.
            self._call_later(self.timeout / 2, self.options)
            self.cseq = 2
        elif self.action == 'OPTIONS' and self.cseq == 2:
            log('%d (play): OK' % self.mport)
        elif self.action == 'TEARDOWN':
            log('%d (close): OK' % self.mport)
            self.retry()

    def options(self):
        """Send a keep-alive OPTIONS request and re-arm the timer."""
        if not self.transport:
            return
        if self.action == 'OPTIONS':
            host, port = self._peer()
            self.cseq += 1
            msg = 'OPTIONS rtsp://%s:%d/stream=%d RTSP/1.0\r\n' % (host, port, self.streamID)
            msg += 'Session: %s\r\n' % self.session
            msg += 'CSeq: %d\r\n\r\n' % self.cseq
            self.transport.write(msg.encode())
            self._call_later(self.timeout / 2, self.options)

    def teardown(self):
        """Send TEARDOWN and close the connection one second later."""
        self.action = 'TEARDOWN'
        if not self.transport:
            return
        host, port = self._peer()
        log('%d (teardown): Session %s, streamID %d' % (self.mport, self.session, self.streamID))
        self.cseq += 1
        msg = 'TEARDOWN rtsp://%s:%d/stream=%d RTSP/1.0\r\n' % (host, port, self.streamID)
        msg += 'Session: %s\r\n' % self.session
        msg += 'CSeq: %d\r\n\r\n' % self.cseq
        self.transport.write(msg.encode())
        # Close even if the server never answers the TEARDOWN.
        self._call_later(1, self.retry)

    def eof_received(self):
        """Treat a half-close from the server as a lost connection."""
        self.connection_lost(None)

    def connection_lost(self, exc):
        """Notify the client once that the connection is gone."""
        if self.transport:
            log('%d (connection lost)' % self.mport)
            self.client.connection_lost()
        self.transport = None

    def retry(self):
        """Close the connection; the client decides about reconnecting."""
        if self.transport:
            self.transport.close()

    def fatal(self):
        """Tear the session down and remove the owning client."""
        if self.transport:
            self.teardown()
        self.client.protocol = None
        self.client.fatal()


#
# Basic RTSP Client class
#

class RTSP_Client:
    """One configured stream; (re)connects to the server as needed."""

    def __init__(self, clients, params):
        self.clients = clients
        self.loop = clients.loop
        self.protocol = None
        self.params = params
        self.dead = False
        self.deleteme = False
        log('New multicast client: %s' % (params))
        self.create_connection()

    def create_connection(self):
        """Start an asynchronous connection attempt to SERVER."""
        if self.dead:
            # Removed while a reconnect was pending: stay removed.
            return
        host, _, port_text = SERVER.partition(':')
        port = (int(port_text) if port_text else 0) or 554
        coro = self.loop.create_connection(
            lambda: RTSP_Protocol(self, self.params), host, port)
        task = self.loop.create_task(coro)
        task.add_done_callback(self._connection_attempt_done)

    def _connection_attempt_done(self, task):
        """Log a failed connection attempt and schedule the retry."""
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            log('Connection to %s failed: %s' % (SERVER, exc))
            self.connection_lost()

    def connection_lost(self):
        """Forget the protocol and reconnect in 30 seconds."""
        self.protocol = None
        if self.dead:
            # A removed client must not bring its stream back.
            return
        self.loop.call_later(30, self.create_connection)

    def fatal(self):
        """Remove this client for good, tearing down any live session."""
        if self.dead:
            return
        log("Remove multicast client: %s" % self.params)
        self.dead = True
        if self.protocol:
            self.protocol.teardown()
        self.clients.remove_client(self)


#
# RTSP clients
#

class RTSP_Clients:
    """The set of active clients, rebuilt from the request on SIGHUP."""

    def __init__(self, loop):
        self.loop = loop
        self.clients = []
        reload_clients = self.parse_config_file if USE_CONFIG else self.parse_request
        reload_clients()
        loop.add_signal_handler(signal.SIGHUP, reload_clients)

    def remove_client(self, client):
        """Drop *client* from the list and make sure it is shut down."""
        if client in self.clients:
            self.clients.remove(client)
        client.fatal()

    def add_client(self, params):
        """Keep the client already serving *params*, or create one."""
        for c in self.clients:
            if c.params == params:
                c.deleteme = False
                return
        self.clients.append(RTSP_Client(self, params))

    async def close_all(self):
        """Tear down every client (used at shutdown)."""
        # fatal() removes the client from self.clients, so iterate a copy.
        for c in list(self.clients):
            c.fatal()

    def _mark_all(self):
        """Flag every client for removal; add_client() clears the flag."""
        for c in self.clients:
            c.deleteme = True

    def _remove_marked(self):
        """Remove the clients that are still flagged."""
        # remove_client() mutates self.clients, so iterate a copy.
        for c in list(self.clients):
            if c.deleteme:
                self.remove_client(c)

    def parse_request(self):
        """(Re)create the single client described on the command line."""
        self._mark_all()
        self.add_client(str(TARGET_PORT) + ':' + REQUEST)
        self._remove_marked()

    def parse_config_file(self):
        """Synchronise the clients with the entries of CONFIG."""
        self._mark_all()
        lines = []
        try:
            with open(CONFIG) as config_file:
                lines = config_file.readlines()
        except OSError as exc:
            # Best effort, as before: an unreadable file counts as empty.
            log('Cannot read config file %s: %s' % (CONFIG, exc))
        prev = ''
        for raw in lines:
            entry = prev + raw.strip()
            prev = ''
            if not entry or entry[0] in ('#', ';'):
                continue
            if entry[-1] == '\\':
                # Continuation: join with the next line.
                prev = entry[:-1].rstrip()
            else:
                self.add_client(entry)
        self._remove_marked()


#
# configuration
#

if SYSLOG:
    syslog.openlog('multicast-rtp', 0, syslog.LOG_LOCAL7)

# The banner is always shown, even in quiet mode.
silent = QUIET
if QUIET:
    QUIET = False
log('Version %s' % VERSION)
log('SERVER: %s' % SERVER)
log('CONFIG: %s' % CONFIG)
log('TARGET: %s:%d' % (TARGET_ADDR, TARGET_PORT))
log('REQUEST: %s' % REQUEST)
log('------------')
if silent:
    log(' Start.')
    QUIET = True


def _terminate():
    """SIGTERM handler: leave run_forever() the same way Ctrl-C does."""
    raise KeyboardInterrupt


# Create the loop explicitly; relying on get_event_loop() to create one
# implicitly is deprecated.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.add_signal_handler(signal.SIGTERM, _terminate)
clients = RTSP_Clients(loop)
try:
    loop.run_forever()
except (KeyboardInterrupt, SystemExit):
    # Send TEARDOWN before exit, and give it a second to go out.
    log(' Closing.')
    loop.run_until_complete(clients.close_all())
    loop.run_until_complete(asyncio.sleep(1.0))
except Exception as exc:
    print('*******')
    log('Unexpected error: %r' % (exc,))
finally:
    QUIET = False
    log(' Stop.')
    loop.close()
    if SYSLOG:
        syslog.closelog()
sys.exit(0)