1
0
mirror of https://github.com/rofafor/vdr-plugin-satip.git synced 2023-10-10 13:37:42 +02:00

Refactor the server detection script.

This commit is contained in:
Rolf Ahrenberg 2017-07-31 16:18:02 +03:00
parent 732e28d0f0
commit 537c0ad000

View File

@ -1,70 +1,91 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
""" Simple tool to detect SAT>IP devices as JSON.
"""
import json
import socket
import sys
import urllib2
import xml.etree.ElementTree as ET
import requests
SSDP_ADDR = "239.255.255.250";
SSDP_PORT = 1900;
SSDP_MX = 1;
SSDP_ST = "urn:ses-com:device:SatIPServer:1";
SSDP_BIND = '0.0.0.0'
SSDP_ADDR = '239.255.255.250'
SSDP_PORT = 1900
SSDP_MX = 1
SSDP_ST = 'urn:ses-com:device:SatIPServer:1'
SSDP_REQUEST = 'M-SEARCH * HTTP/1.1\r\n' + \
'HOST: %s:%d\r\n' % (SSDP_ADDR, SSDP_PORT) + \
'MAN: "ssdp:discover"\r\n' + \
'MX: %d\r\n' % (SSDP_MX, ) + \
'ST: %s\r\n' % (SSDP_ST, ) + \
'\r\n'
ssdpRequest = "M-SEARCH * HTTP/1.1\r\n" + \
"HOST: %s:%d\r\n" % (SSDP_ADDR, SSDP_PORT) + \
"MAN: \"ssdp:discover\"\r\n" + \
"MX: %d\r\n" % (SSDP_MX, ) + \
"ST: %s\r\n" % (SSDP_ST, ) + "\r\n";
data = None
urls = list()
devices = list()
def parse_satip_xml(data):
""" Parse SAT>IP XML data.
# find out available SAT>IP servers
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except:
pass
sock.settimeout(1)
sock.bind(("192.168.0.6", SSDP_PORT))
sock.sendto(ssdpRequest, (SSDP_ADDR, SSDP_PORT))
try:
while 1:
data = sock.recv(1024)
if data:
# find out their capabilities
for row in data.split("\r\n"):
if "LOCATION:" in row:
url = row.replace("LOCATION:","").strip()
if url in urls:
continue
urls.append(url)
f = urllib2.urlopen(url, timeout=2)
xml = f.read()
if xml:
root = ET.fromstring(xml)
name = root.find(".//*/{urn:schemas-upnp-org:device-1-0}friendlyName")
satipcap = root.find(".//*/{urn:ses-com:satip}X_SATIPCAP")
caps = dict()
for systems in satipcap.text.split(","):
cap = systems.split("-")
if cap:
# prepare the output data
count = int(cap[1])
if cap[0] in caps:
count = count + caps[cap[0]]
caps[cap[0]] = count
devices.append({"name": name.text, "frontends": caps})
else:
break
except:
pass
sock.close()
Args:
data (str): XML input data..
# print out the output data
json.dump(devices, fp=sys.stdout, sort_keys=True, indent=2)
Returns:
dict: Parsed SAT>IP device name and frontend information.
"""
result = {'name': '', 'frontends': {}}
if data:
root = ET.fromstring(data)
name = root.find('.//*/{urn:schemas-upnp-org:device-1-0}friendlyName')
result['name'] = name.text
satipcap = root.find('.//*/{urn:ses-com:satip}X_SATIPCAP')
caps = {}
for system in satipcap.text.split(","):
cap = system.split("-")
if cap:
count = int(cap[1])
if cap[0] in caps:
count = count + caps[cap[0]]
caps[cap[0]] = count
result['frontends'] = caps
return result
def detect_satip_devices():
""" Detect available SAT>IP devices by sending a broadcast message.
Returns:
list: Found SAT>IP devices.
"""
urls = []
devices = []
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except BaseException:
pass
sock.settimeout(1)
sock.bind((SSDP_BIND, SSDP_PORT))
sock.sendto(SSDP_REQUEST, (SSDP_ADDR, SSDP_PORT))
try:
while 1:
data = sock.recv(1024)
if data:
for row in data.split('\r\n'):
if 'LOCATION:' in row:
url = row.replace('LOCATION:', '').strip()
if url in urls:
continue
urls.append(url)
info = requests.get(url, timeout=2)
devices.append(parse_satip_xml(info.text))
else:
break
except BaseException:
pass
sock.close()
return devices
if __name__ == '__main__':
json.dump(detect_satip_devices(), fp=sys.stdout, sort_keys=True, indent=2)