mirror of
https://github.com/FrodoVDR/UdpPipe.git
synced 2023-10-10 13:36:54 +02:00
Add files via upload
This commit is contained in:
parent
92597b5534
commit
366b96118a
252
Utility.py
Normal file
252
Utility.py
Normal file
@ -0,0 +1,252 @@
|
||||
'''
|
||||
Created on 29.12.2010
|
||||
|
||||
@author: mark
|
||||
'''
|
||||
import cStringIO
|
||||
import exceptions
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
class SockIOException(exceptions.Exception):
|
||||
|
||||
def __init__(self):
|
||||
return
|
||||
|
||||
class SockIOData:
|
||||
|
||||
typeString=1
|
||||
typeNumber=2
|
||||
typeCommand=3
|
||||
typeBinary=4
|
||||
typeLongDirect=64
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class SockWrite(SockIOData):
|
||||
'''
|
||||
classdocs
|
||||
'''
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
|
||||
def writeString(self, key, value, strgIO):
|
||||
strgIO.write(chr(SockIOData.typeString))
|
||||
self.__writeRawString(key, strgIO)
|
||||
self.__writeRawString(value, strgIO)
|
||||
|
||||
def __writeRawString(self, strg, strgIO):
|
||||
length=len(strg)
|
||||
hiByte=abs(length / 256)
|
||||
loByte=length % 256
|
||||
strgIO.write(chr(hiByte))
|
||||
strgIO.write(chr(loByte))
|
||||
strgIO.write(strg)
|
||||
|
||||
def writeLongDirect(self, value, strgIO):
|
||||
strgIO.write(chr(SockIOData.typeLongDirect))
|
||||
Byte0=abs(value / 16777216)
|
||||
value=value % 16777216
|
||||
Byte1=abs(value / 65536)
|
||||
value=value % 65536
|
||||
Byte2=abs(value / 256)
|
||||
Byte3=value % 256
|
||||
strgIO.write(chr(Byte0))
|
||||
strgIO.write(chr(Byte1))
|
||||
strgIO.write(chr(Byte2))
|
||||
strgIO.write(chr(Byte3))
|
||||
|
||||
|
||||
def writeBinaryDirect(self, value, strgIO):
|
||||
strgIO.write(value)
|
||||
|
||||
def writeBinary(self, key, value, strgIO):
|
||||
strgIO.write(chr(SockIOData.typeBinary))
|
||||
self.__writeRawString(key, strgIO)
|
||||
ln=len(value)
|
||||
Byte0=abs(ln / 16777216)
|
||||
ln=ln % 16777216
|
||||
Byte1=abs(ln / 65536)
|
||||
ln=ln % 65536
|
||||
Byte2=abs(ln / 256)
|
||||
Byte3=ln % 256
|
||||
strgIO.write(chr(Byte0))
|
||||
strgIO.write(chr(Byte1))
|
||||
strgIO.write(chr(Byte2))
|
||||
strgIO.write(chr(Byte3))
|
||||
strgIO.write(value)
|
||||
|
||||
def writeLong(self, key, value, strgIO):
|
||||
strgIO.write(chr(SockIOData.typeNumber))
|
||||
self.__writeRawString(key, strgIO)
|
||||
Byte0=abs(value / 16777216)
|
||||
value=value % 16777216
|
||||
Byte1=abs(value / 65536)
|
||||
value=value % 65536
|
||||
Byte2=abs(value / 256)
|
||||
Byte3=value % 256
|
||||
strgIO.write(chr(Byte0))
|
||||
strgIO.write(chr(Byte1))
|
||||
strgIO.write(chr(Byte2))
|
||||
strgIO.write(chr(Byte3))
|
||||
|
||||
|
||||
class SockRead(SockIOData):
|
||||
|
||||
|
||||
###
|
||||
# Returns a tuple
|
||||
# dataType, key, value
|
||||
def read(self, strgIO):
|
||||
tmp=strgIO.read(1)
|
||||
if len(tmp)==0:
|
||||
raise SockIOException()
|
||||
typ=ord(tmp)
|
||||
key, value = { SockIOData.typeString : lambda : (self.__readRawString(strgIO), self.__readRawString(strgIO)),
|
||||
SockIOData.typeNumber : lambda : (self.__readRawString(strgIO), self.__readRawLong(strgIO)),
|
||||
SockIOData.typeBinary : lambda : (self.__readRawString(strgIO), self.__readRawBinary(strgIO)),
|
||||
SockIOData.typeLongDirect : lambda : ( "", self.__readRawLong(strgIO))
|
||||
} [typ]()
|
||||
return (typ, key, value)
|
||||
|
||||
|
||||
def __readRawString(self, strgIO):
|
||||
hiByte=ord(strgIO.read(1))
|
||||
loByte=ord(strgIO.read(1))
|
||||
length=(hiByte<<8)+loByte
|
||||
strg=strgIO.read(length)
|
||||
return (strg)
|
||||
|
||||
def __readRawLong(self, strgIO):
|
||||
byte0=ord(strgIO.read(1))
|
||||
byte1=ord(strgIO.read(1))
|
||||
byte2=ord(strgIO.read(1))
|
||||
byte3=ord(strgIO.read(1))
|
||||
value=(byte0 * 16777216) + (byte1*65536) + (byte2*256) + byte3
|
||||
return value
|
||||
|
||||
def __readRawBinary(self, strgIO):
|
||||
length=self.__readRawLong(strgIO)
|
||||
binary=strgIO.read(length)
|
||||
return binary
|
||||
|
||||
|
||||
class ReadDictionary:
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def read(self, data):
|
||||
d={}
|
||||
sockRd=SockRead()
|
||||
buf=cStringIO.StringIO(data)
|
||||
try:
|
||||
while True:
|
||||
_, key, value=sockRd.read(buf)
|
||||
d[key]=value
|
||||
except SockIOException:
|
||||
pass
|
||||
buf.close()
|
||||
return d
|
||||
|
||||
class WriteDictionary:
|
||||
|
||||
def write(self, data):
|
||||
sockWt=SockWrite()
|
||||
buf=cStringIO.StringIO(data)
|
||||
for k in data.keys:
|
||||
if (type(data[k]) is int) or (type(data[k]) is long):
|
||||
sockWt.writeLong(k, data[k], buf)
|
||||
if type(data[k]) is str:
|
||||
sockWt.writeString(k, data[k], buf)
|
||||
if type(data[k] is dict):
|
||||
sockWt.writeBinary(k, WriteDictionary.write(data[k]), buf)
|
||||
|
||||
|
||||
|
||||
import binascii
|
||||
import StringIO
|
||||
|
||||
class PKCS7Encoder(object):
|
||||
'''
|
||||
RFC 2315: PKCS#7 page 21
|
||||
Some content-encryption algorithms assume the
|
||||
input length is a multiple of k octets, where k > 1, and
|
||||
let the application define a method for handling inputs
|
||||
whose lengths are not a multiple of k octets. For such
|
||||
algorithms, the method shall be to pad the input at the
|
||||
trailing end with k - (l mod k) octets all having value k -
|
||||
(l mod k), where l is the length of the input. In other
|
||||
words, the input is padded at the trailing end with one of
|
||||
the following strings:
|
||||
|
||||
01 -- if l mod k = k-1
|
||||
02 02 -- if l mod k = k-2
|
||||
.
|
||||
.
|
||||
.
|
||||
k k ... k k -- if l mod k = 0
|
||||
|
||||
The padding can be removed unambiguously since all input is
|
||||
padded and no padding string is a suffix of another. This
|
||||
padding method is well-defined if and only if k < 256;
|
||||
methods for larger k are an open issue for further study.
|
||||
'''
|
||||
def __init__(self, k=16):
|
||||
self.k = k
|
||||
|
||||
## @param text The padded text for which the padding is to be removed.
|
||||
# @exception ValueError Raised when the input padding is missing or corrupt.
|
||||
def decode(self, text):
|
||||
'''
|
||||
Remove the PKCS#7 padding from a text string
|
||||
'''
|
||||
nl = len(text)
|
||||
val = int(binascii.hexlify(text[-1]), 16)
|
||||
if val > self.k:
|
||||
raise ValueError('Input is not padded or padding is corrupt')
|
||||
|
||||
l = nl - val
|
||||
return text[:l]
|
||||
|
||||
## @param text The text to encode.
|
||||
def encode(self, text):
|
||||
'''
|
||||
Pad an input string according to PKCS#7
|
||||
'''
|
||||
l = len(text)
|
||||
output = StringIO.StringIO()
|
||||
val = self.k - (l % self.k)
|
||||
for _ in xrange(val):
|
||||
output.write('%02x' % val)
|
||||
return text + binascii.unhexlify(output.getvalue())
|
||||
|
||||
def formatExceptionInfo(log, maxTBlevel=5):
|
||||
cla, exc, trbk = sys.exc_info()
|
||||
excName = cla.__name__
|
||||
try:
|
||||
excArgs = exc.__dict__["args"]
|
||||
except KeyError:
|
||||
excArgs = "<no args>"
|
||||
excTb = traceback.format_tb(trbk, maxTBlevel)
|
||||
log.debug(excName)
|
||||
log.debug(excArgs)
|
||||
log.debug(excTb)
|
||||
|
||||
|
||||
FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
|
||||
|
||||
def dump(src, length=8):
|
||||
N=0; result=''
|
||||
while src:
|
||||
s,src = src[:length],src[length:]
|
||||
hexa = ' '.join(["%02X"%ord(x) for x in s])
|
||||
s = s.translate(FILTER)
|
||||
result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
|
||||
N+=length
|
||||
return result
|
||||
|
||||
|
620
__init__.py
Normal file
620
__init__.py
Normal file
@ -0,0 +1,620 @@
|
||||
import socket
|
||||
import threading
|
||||
from ConfigParser import NoSectionError, NoOptionError, RawConfigParser
|
||||
from os.path import isfile, join, exists, basename
|
||||
import time
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
import exceptions
|
||||
import sys
|
||||
import select
|
||||
import eu.liebrand.udppipe.Utility
|
||||
import cStringIO
|
||||
import signal
|
||||
import os
|
||||
import Queue
|
||||
import datetime
|
||||
from optparse import OptionParser
|
||||
import errno
|
||||
from daemon import runner
|
||||
|
||||
class PipeBase:
|
||||
|
||||
CONFIG_DIR="./"
|
||||
CONFIG_FILE="udppipe.ini"
|
||||
|
||||
KEY_ENABLELOGGING="EnableLogging"
|
||||
KEY_LOGFILENAME="logFileName"
|
||||
KEY_MAXFILESIZE="MaxFilesize"
|
||||
KEY_MSGFORMAT="MsgFormat"
|
||||
KEY_LOGLEVEL="logLevel"
|
||||
KEY_PORTCONFIG="portConfig_%d"
|
||||
KEY_LISTENPORT="listenPort"
|
||||
KEY_CFGID="id"
|
||||
KEY_FORWARDHOST="forwardHost"
|
||||
KEY_FORWARDPORT="forwardPort"
|
||||
|
||||
FIELD_HOST='host'
|
||||
FIELD_PORT='port'
|
||||
FIELD_OP='OP'
|
||||
FIELD_SRVPORT='srvPort'
|
||||
FIELD_UDPDATA='udpData'
|
||||
|
||||
VALUE_UDP='udp'
|
||||
VALUE_PING='ping'
|
||||
VALUE_CONFIG='__cfg__'
|
||||
|
||||
IDX_FORWARDHOST=3
|
||||
IDX_FORWARDPORT=4
|
||||
|
||||
DEFAULTS={KEY_ENABLELOGGING :"yes",
|
||||
KEY_LOGFILENAME : "/tmp/udppipe.log",
|
||||
KEY_MAXFILESIZE : 1000000,
|
||||
KEY_MSGFORMAT : "%(asctime)s, %(levelname)s, %(module)s, %(lineno)d, %(message)s",
|
||||
KEY_LOGLEVEL : 20
|
||||
}
|
||||
|
||||
def __init__(self, section):
|
||||
self.stdin_path = '/dev/null'
|
||||
self.stdout_path = '/dev/tty'
|
||||
self.stderr_path = '/dev/tty'
|
||||
self.pidfile_path = '/tmp/udppipe.pid'
|
||||
self.pidfile_timeout = 5
|
||||
self.section=section
|
||||
path=join(PipeBase.CONFIG_DIR, PipeBase.CONFIG_FILE)
|
||||
if not(exists(path)):
|
||||
self.printLogLine(sys.stderr,"[UDPPIPE] No config file %s found at %s" % (PipeBase.CONFIG_FILE, PipeBase.CONFIG_DIR))
|
||||
self.setupOk=False
|
||||
return
|
||||
self.cfg=self.readConfig(path)
|
||||
#self.setupLogger(self.cfg)
|
||||
self.setupOk=True
|
||||
#self.log.info("[%s] init done" % (section))
|
||||
self.packetsIn=0
|
||||
self.packetsOut=0
|
||||
self.UDPBytesIn=0
|
||||
self.UDPBytesOut=0
|
||||
self.TCPBytesIn=0
|
||||
self.TCPBytesOut=0
|
||||
self.reconnects=0
|
||||
signal.signal(signal.SIGUSR1, self.toggleLogLevel)
|
||||
signal.signal(signal.SIGUSR2, self.logStats)
|
||||
self.startTime=datetime.datetime.now()
|
||||
|
||||
|
||||
def getTimeStamp(self):
|
||||
return time.strftime('%d.%m.%Y %H:%M:%S', time.localtime(time.time()))
|
||||
|
||||
def printLogLine(self, fl, message):
|
||||
fl.write('%s %s\n' % (self.getTimeStamp(), message))
|
||||
fl.flush()
|
||||
|
||||
def setupLogger(self, cfg):
|
||||
try:
|
||||
maxFileSize=cfg.getint(self.section, PipeBase.KEY_MAXFILESIZE)
|
||||
msgFormat=cfg.get(self.section, PipeBase.KEY_MSGFORMAT)
|
||||
self.logFileName=cfg.get(self.section, PipeBase.KEY_LOGFILENAME)
|
||||
self.log=logging.Logger(self.section)
|
||||
loghdl=RotatingFileHandler(self.logFileName, 'a', maxFileSize, 4)
|
||||
loghdl.setFormatter(logging.Formatter(msgFormat))
|
||||
loghdl.setLevel(cfg.getint(self.section, PipeBase.KEY_LOGLEVEL))
|
||||
self.log.addHandler(loghdl)
|
||||
self.log.disabled=False
|
||||
return True
|
||||
except exceptions.Exception, e:
|
||||
self.printLogLine(sys.stderr, "[UDPPIPE] Unable to initialize logging. Reason: %s" % e)
|
||||
return False
|
||||
|
||||
def toggleLogLevel(self, sigNo, stackFrame):
|
||||
if self.log.getEffectiveLevel()==10:
|
||||
newLevel=20
|
||||
else:
|
||||
newLevel=10
|
||||
self.log.setLevel(newLevel)
|
||||
|
||||
def logStats(self, sigNo, stackFrame):
|
||||
now=datetime.datetime.now()
|
||||
uptime=now-self.startTime
|
||||
self.log.info("[%s] %d Packets in, %d Packets out" % (self.section, self.packetsIn, self.packetsOut))
|
||||
self.log.info("[%s] UDP Traffic %d bytes in, %d bytes out, TCP Traffic %d bytes in, %d bytes out" % (self.section, self.UDPBytesIn, self.UDPBytesOut, self.TCPBytesIn, self.TCPBytesOut))
|
||||
self.log.info("[%s] Uptime %s, Reconnects %d" % (self.section, str(uptime), self.reconnects))
|
||||
|
||||
def readConfig(self, cfgFile):
|
||||
cfg=RawConfigParser(PipeBase.DEFAULTS)
|
||||
_=cfg.read(cfgFile)
|
||||
i=0
|
||||
self.listenerConfig=[]
|
||||
while True:
|
||||
i+=1
|
||||
section=PipeBase.KEY_PORTCONFIG % (i)
|
||||
if cfg.has_section(section):
|
||||
listenPort=cfg.getint(section, PipeBase.KEY_LISTENPORT)
|
||||
cfgId=cfg.get(section, PipeBase.KEY_CFGID)
|
||||
if cfgId==PipeBase.VALUE_CONFIG:
|
||||
self.printLogLine(sys.stderr, "WARN: Don't use ID %s for a port configuration" % (cfgId))
|
||||
if self.section==Head.SECTION:
|
||||
forwardHost=None
|
||||
forwardPort=None
|
||||
else:
|
||||
forwardHost=cfg.get(section, PipeBase.KEY_FORWARDHOST)
|
||||
forwardPort=cfg.getint(section, PipeBase.KEY_FORWARDPORT)
|
||||
self.listenerConfig.append([ cfgId, listenPort, None, forwardHost, forwardPort ])
|
||||
else:
|
||||
break
|
||||
return cfg
|
||||
|
||||
|
||||
|
||||
class Head(PipeBase):
|
||||
|
||||
SECTION="head"
|
||||
KEY_PIPEPORT="pipePort"
|
||||
KEY_TAILHOSTNAME="tailHostname"
|
||||
|
||||
|
||||
def __init__(self):
|
||||
PipeBase.__init__(self, Head.SECTION)
|
||||
self.readHeadConfig(self.cfg)
|
||||
self._terminate=False
|
||||
signal.signal(signal.SIGTERM, self.terminate)
|
||||
signal.signal(signal.SIGINT, self.terminate)
|
||||
|
||||
|
||||
def terminate(self, sigNo, stackFrame):
|
||||
if sigNo==signal.SIGINT:
|
||||
self.log.info("[Head] Terminating upon Keyboard Interrupt")
|
||||
if sigNo==signal.SIGTERM:
|
||||
self.log.info("[Head] Terminating upon Signal Term")
|
||||
self._terminate=True
|
||||
self.pipeSocket.close()
|
||||
|
||||
|
||||
def readHeadConfig(self, cfg):
|
||||
self.pipePort=cfg.getint(Head.SECTION, Head.KEY_PIPEPORT)
|
||||
self.enableHostNameCheck=cfg.has_option(Head.SECTION, Head.KEY_TAILHOSTNAME)
|
||||
if self.enableHostNameCheck:
|
||||
self.tailHostname=cfg.get(Head.SECTION, Head.KEY_TAILHOSTNAME)
|
||||
|
||||
def run(self):
|
||||
# Important: logging cannot be setup any earlier as the RotatingFileHandler produces errors
|
||||
# in combination with the daemon function
|
||||
self.setupLogger(self.cfg)
|
||||
socketArray=[]
|
||||
# Step #1: Setup all the sockets
|
||||
self.pipeSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.pipeSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.pipeSocket.bind(('', self.pipePort))
|
||||
self.pipeSocket.listen(5)
|
||||
while not(self._terminate):
|
||||
try:
|
||||
self.log.info("[Head] Waiting for >Tail< to connect on port %d" % (self.pipePort))
|
||||
(clientSocket, address) = self.pipeSocket.accept()
|
||||
if self.enableHostNameCheck:
|
||||
data = socket.gethostbyname(self.tailHostname)
|
||||
ip = repr(data)
|
||||
if address[0]!=ip:
|
||||
self.log.warn("[Head] Connection attempt from wrong IP (%s but expected %s)" % (address[0], ip))
|
||||
clientSocket.close()
|
||||
continue
|
||||
self.log.info("[Head] Connection from tail at %s:%d" % (address[0], address[1]))
|
||||
self.reconnects+=1
|
||||
socketArray.append(clientSocket)
|
||||
tailConnection=True
|
||||
# now we are ready for incoming udp messages
|
||||
try:
|
||||
if len(socketArray)==1:
|
||||
for lstCfg in self.listenerConfig:
|
||||
lstCfg[2]=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
lstCfg[2].bind(('', lstCfg[1]))
|
||||
socketArray.append(lstCfg[2])
|
||||
self.log.info("[Head] UDP Listener <%s> on port <%d>" % (lstCfg[0], lstCfg[1]))
|
||||
except socket.error as e:
|
||||
if e.errno==errno.EADDRINUSE:
|
||||
self.log.error("[Head] Unable to listen on port %d - already in use" % (lstCfg[1]))
|
||||
else:
|
||||
self.log.error("[Head] Unable to listen on port %d - Error %d %s" % (lstCfg[1], e.errno, errno.errorcode[e.errno]))
|
||||
self._terminate=True
|
||||
clientSocket.close()
|
||||
continue
|
||||
# Step #2: listen on all the sockets
|
||||
lastReport=datetime.datetime.now()
|
||||
while not(self._terminate) and tailConnection:
|
||||
try:
|
||||
ready=select.select(socketArray, [], [], 3600)
|
||||
except select.error, (_errno, _strerror):
|
||||
if _errno == errno.EINTR:
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
if ready[0]:
|
||||
for r in ready[0]:
|
||||
if r==clientSocket:
|
||||
# we received something from the tail
|
||||
dta=r.recv(5)
|
||||
if len(dta)==0:
|
||||
#connection reset
|
||||
self.log.warn("[Head] >Tail< disconnected")
|
||||
socketArray.remove(clientSocket)
|
||||
clientSocket.close()
|
||||
tailConnection=False
|
||||
continue
|
||||
while(len(dta)<5):
|
||||
dta+=r.recv(5-len(dta))
|
||||
sockRd=eu.liebrand.udppipe.Utility.SockRead()
|
||||
buf=cStringIO.StringIO(dta)
|
||||
_,_,length=sockRd.read(buf)
|
||||
data=[]
|
||||
tmp=length
|
||||
while length>0:
|
||||
chunk=r.recv(length)
|
||||
data.append(chunk)
|
||||
length-=len(chunk)
|
||||
self.log.debug("[Head] Received %d bytes from >Tail<" % (tmp))
|
||||
self.TCPBytesIn+=tmp
|
||||
self.packetsIn+=1
|
||||
readDict=eu.liebrand.udppipe.Utility.ReadDictionary()
|
||||
fields=readDict.read(''.join(data))
|
||||
if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_PING:
|
||||
continue
|
||||
if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_CONFIG:
|
||||
for k in fields.keys():
|
||||
if k!=PipeBase.FIELD_OP:
|
||||
found=False
|
||||
for lstCfg in self.listenerConfig:
|
||||
if k==lstCfg[0]:
|
||||
found=True
|
||||
# existing ID
|
||||
if fields[k]!=lstCfg[1]:
|
||||
# listening port changed
|
||||
self.log.info("[Head] Received new port for service id %s (old %d -> new %d)" % (k, lstCfg[1], fields[k]))
|
||||
lstCfg[2].close()
|
||||
socketArray.remove(lstCfg[2])
|
||||
lstCfg[1]=fields[k]
|
||||
lstCfg[2]=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
lstCfg[2].bind(('', lstCfg[1]))
|
||||
socketArray.append(lstCfg[2])
|
||||
if not(found):
|
||||
s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.bind(('', fields[k]))
|
||||
self.listenerConfig.append([ k, fields[k], s, None, None ])
|
||||
socketArray.append(s)
|
||||
self.log.info("[Head] New UDP Listener <%s> on port <%d>" % (k, fields[k]))
|
||||
for lstCfg in self.listenerConfig:
|
||||
found=False
|
||||
for k in fields.keys():
|
||||
if k==lstCfg[0]:
|
||||
found=True
|
||||
break
|
||||
if not(found):
|
||||
lstCfg[2].close()
|
||||
self.listenerConfig.remove(lstCfg)
|
||||
self.log.info("[Head] Deleted UDP Listener <%s> on port <%d>" % (lstCfg[0], lstCfg[1]))
|
||||
continue
|
||||
# find the outbound socket
|
||||
found=False
|
||||
for lstCfg in self.listenerConfig:
|
||||
if lstCfg[1]==fields['srvPort']:
|
||||
lstCfg[2].sendto(fields[PipeBase.FIELD_UDPDATA], (fields['host'], fields['port']))
|
||||
found=True
|
||||
self.log.debug("[Head] Forwarded response packet of %d bytes to %s:%d for port %d" % \
|
||||
(len(fields[PipeBase.FIELD_UDPDATA]), fields['host'], fields['port'], \
|
||||
fields['srvPort']))
|
||||
self.UDPBytesOut+=len(fields[PipeBase.FIELD_UDPDATA])
|
||||
self.packetsOut+=1
|
||||
break
|
||||
if not(found):
|
||||
self.log.warn("[Head] Received a response for an unknown client on port %d" % (fields['srvPort']))
|
||||
|
||||
for lstCfg in self.listenerConfig:
|
||||
if r==lstCfg[2]:
|
||||
# we have an inbound message
|
||||
udpData, address=r.recvfrom(4096)
|
||||
self.log.debug("[Head] Received %d bytes from %s:%d" % (len(udpData), address[0], address[1]))
|
||||
self.UDPBytesIn+=len(udpData)
|
||||
self.packetsIn+=1
|
||||
# we need to send udpData, listening Port, address
|
||||
util=eu.liebrand.udppipe.Utility.SockWrite()
|
||||
dataBuffer=cStringIO.StringIO()
|
||||
util.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_UDP, dataBuffer)
|
||||
util.writeString(PipeBase.FIELD_HOST, address[0], dataBuffer)
|
||||
util.writeLong(PipeBase.FIELD_PORT, address[1], dataBuffer)
|
||||
util.writeLong(PipeBase.FIELD_SRVPORT, lstCfg[1], dataBuffer)
|
||||
util.writeBinary(PipeBase.FIELD_UDPDATA, udpData, dataBuffer)
|
||||
dta=dataBuffer.getvalue()
|
||||
ctlBuffer=cStringIO.StringIO()
|
||||
util.writeLongDirect(len(dta), ctlBuffer)
|
||||
util.writeBinaryDirect(dta, ctlBuffer)
|
||||
dta=ctlBuffer.getvalue()
|
||||
bytesSnd=0
|
||||
while bytesSnd<len(dta):
|
||||
bytesSnd=bytesSnd+clientSocket.send(dta[bytesSnd:])
|
||||
self.log.debug("[Head] Send %d bytes to >Tail<" % (bytesSnd))
|
||||
self.TCPBytesOut+=bytesSnd
|
||||
self.packetsOut+=1
|
||||
dataBuffer.close()
|
||||
ctlBuffer.close()
|
||||
break
|
||||
now=datetime.datetime.now()
|
||||
if (now-lastReport).seconds>=3600:
|
||||
self.logStats(0, None)
|
||||
lastReport=now
|
||||
except socket.error as e:
|
||||
if e.errno == errno.EINTR and self._terminate:
|
||||
pass
|
||||
elif e.errno == errno.ECONNRESET:
|
||||
self.log.warn("[Head] Tail disconnected")
|
||||
socketArray.remove(clientSocket)
|
||||
else:
|
||||
raise
|
||||
except Exception as e:
|
||||
self.log.exception(e)
|
||||
|
||||
|
||||
class Tail(PipeBase):
|
||||
|
||||
SECTION="tail"
|
||||
KEY_HEADHOST="headHost"
|
||||
KEY_HEADPORT="headPort"
|
||||
KEY_TIMEOUT="timeout"
|
||||
WAIT4RETRY=300
|
||||
|
||||
|
||||
def __init__(self):
|
||||
PipeBase.__init__(self, Tail.SECTION)
|
||||
self._terminate=False
|
||||
signal.signal(signal.SIGTERM, self.terminate)
|
||||
signal.signal(signal.SIGINT, self.terminate)
|
||||
self.readTailConfig(self.cfg)
|
||||
self.sourceIds={}
|
||||
self.sourceIdLock=threading.Lock()
|
||||
self.responseQ=Queue.Queue()
|
||||
self.connected=False
|
||||
|
||||
|
||||
def readTailConfig(self, cfg):
|
||||
self.headHost=cfg.get(Tail.SECTION, Tail.KEY_HEADHOST)
|
||||
self.headPort=cfg.getint(Tail.SECTION, Tail.KEY_HEADPORT)
|
||||
self.timeout=cfg.getint(Tail.SECTION, Tail.KEY_TIMEOUT)
|
||||
|
||||
def terminate(self, sigNo, stackFrame):
|
||||
if sigNo==signal.SIGINT:
|
||||
self.log.info("[Tail] Terminating upon Keyboard Interrupt")
|
||||
if sigNo==signal.SIGTERM:
|
||||
self.log.info("[Tail] Terminating upon Signal Term")
|
||||
self._terminate=True
|
||||
if self.connected:
|
||||
os.write(self.controlPipe[1], 'x')
|
||||
for s in self.sourceIds.keys():
|
||||
v=self.sourceIds[s]
|
||||
v[0].put({})
|
||||
os.write(v[1][1], 'x')
|
||||
|
||||
def run(self):
|
||||
# Important: logging cannot be setup any earlier as the RotatingFileHandler produces errors
|
||||
# in combination with the daemon function
|
||||
self.setupLogger(self.cfg)
|
||||
self.controlPipe=os.pipe()
|
||||
self.fds=[]
|
||||
self.fds.append(self.controlPipe[0])
|
||||
sockRd=eu.liebrand.udppipe.Utility.SockRead()
|
||||
sockWt=eu.liebrand.udppipe.Utility.SockWrite()
|
||||
|
||||
# Step #1: Connect to the head
|
||||
servSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
# wait for 2 seconds before trying to connect (avoid missing each other, when cron
|
||||
# starts head & tail at the same time
|
||||
time.sleep(2)
|
||||
lastReport=datetime.datetime.now()
|
||||
while not(self._terminate):
|
||||
try:
|
||||
servSocket.connect((self.headHost,self.headPort))
|
||||
self.connected=True
|
||||
self.log.info("[Tail] Connected to >Head< at %s:%d" % (self.headHost,self.headPort))
|
||||
self.fds.append(servSocket)
|
||||
|
||||
# send config
|
||||
dataBuffer=cStringIO.StringIO()
|
||||
sockWt.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_CONFIG, dataBuffer)
|
||||
for lstCfg in self.listenerConfig:
|
||||
sockWt.writeLong(lstCfg[0], lstCfg[1], dataBuffer)
|
||||
dta=dataBuffer.getvalue()
|
||||
ctlBuffer=cStringIO.StringIO()
|
||||
sockWt.writeLongDirect(len(dta), ctlBuffer)
|
||||
sockWt.writeBinaryDirect(dta, ctlBuffer)
|
||||
dta=ctlBuffer.getvalue()
|
||||
bytesSnd=0
|
||||
while bytesSnd<len(dta):
|
||||
bytesSnd=bytesSnd+servSocket.send(dta[bytesSnd:])
|
||||
dataBuffer.close()
|
||||
ctlBuffer.close()
|
||||
self.log.info("[Tail] Send %d UDP port configs to head" % (len(self.listenerConfig)))
|
||||
|
||||
while not(self._terminate) and self.connected:
|
||||
try:
|
||||
ready=select.select(self.fds, [], [], 60)
|
||||
except select.error, (_errno, _strerror):
|
||||
if _errno == errno.EINTR:
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
if len(ready[0])==0:
|
||||
# send something every 60 seconds to avoid a timeout on the connection
|
||||
self.log.debug("[Tail] Sending ping to head")
|
||||
dataBuffer=cStringIO.StringIO()
|
||||
sockWt.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_PING, dataBuffer)
|
||||
dta=dataBuffer.getvalue()
|
||||
ctlBuffer=cStringIO.StringIO()
|
||||
sockWt.writeLongDirect(len(dta), ctlBuffer)
|
||||
sockWt.writeBinaryDirect(dta, ctlBuffer)
|
||||
dta=ctlBuffer.getvalue()
|
||||
bytesSnd=0
|
||||
while bytesSnd<len(dta):
|
||||
bytesSnd=bytesSnd+servSocket.send(dta[bytesSnd:])
|
||||
dataBuffer.close()
|
||||
ctlBuffer.close()
|
||||
now=datetime.datetime.now()
|
||||
if (now-lastReport).seconds>=3600:
|
||||
self.logStats(0, None)
|
||||
lastReport=now
|
||||
continue
|
||||
for r in ready[0]:
|
||||
if r==servSocket:
|
||||
dta=servSocket.recv(5)
|
||||
if len(dta)==0:
|
||||
# Head has gone
|
||||
self.connected=False
|
||||
servSocket.close()
|
||||
continue
|
||||
while(len(dta)<5):
|
||||
dta+=r.recv(5-len(dta))
|
||||
buf=cStringIO.StringIO(dta)
|
||||
_,_,length=sockRd.read(buf)
|
||||
self.log.debug("[Tail] Received %ld bytes from >Head<" % (length))
|
||||
data=[]
|
||||
while length>0:
|
||||
chunk=r.recv(length)
|
||||
data.append(chunk)
|
||||
length-=len(chunk)
|
||||
self.TCPBytesIn+=len(chunk)
|
||||
self.packetsIn+=1
|
||||
readDict=eu.liebrand.udppipe.Utility.ReadDictionary()
|
||||
fields=readDict.read(''.join(data))
|
||||
# received the data as dict - now we need to find out whether we already have thread
|
||||
# for host:port running
|
||||
if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_UDP:
|
||||
sourceId=str(fields[PipeBase.FIELD_SRVPORT]) + "@" + fields[PipeBase.FIELD_HOST] + ":" + str(fields[PipeBase.FIELD_PORT])
|
||||
self.sourceIdLock.acquire()
|
||||
if self.sourceIds.has_key(sourceId):
|
||||
self.log.debug("[Tail] Adding packet to existing handler")
|
||||
self.sourceIds[sourceId][0].put(fields)
|
||||
#wake up thread
|
||||
os.write(self.sourceIds[sourceId][1][1],'x')
|
||||
else:
|
||||
self.log.debug("[Tail] Creating new handler for source id %s" % (sourceId))
|
||||
found=False
|
||||
for lstCfg in self.listenerConfig:
|
||||
if lstCfg[1]==fields[PipeBase.FIELD_SRVPORT]:
|
||||
found=True
|
||||
break
|
||||
if found:
|
||||
q=Queue.Queue()
|
||||
q.put(fields)
|
||||
self.sourceIds[sourceId]=[q, os.pipe()]
|
||||
t=threading.Thread(target=self.handleUdpPacket, args=(lstCfg, sourceId, fields))
|
||||
t.daemon=True
|
||||
t.start()
|
||||
else:
|
||||
self.log.error("[TAIL] Received UDP Packet for port %d without having a forward configured" % fields[PipeBase.FIELD_SRVPORT])
|
||||
self.sourceIdLock.release()
|
||||
if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_CONFIG:
|
||||
#todo send a dict / json with the head configuration over
|
||||
pass
|
||||
if r==self.controlPipe[0]:
|
||||
os.read(self.controlPipe[0],1)
|
||||
data=self.responseQ.get()
|
||||
dataBuffer=cStringIO.StringIO()
|
||||
sockWt.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_UDP, dataBuffer)
|
||||
sockWt.writeString(PipeBase.FIELD_HOST, data[PipeBase.FIELD_HOST], dataBuffer)
|
||||
sockWt.writeLong(PipeBase.FIELD_PORT, data[PipeBase.FIELD_PORT], dataBuffer)
|
||||
sockWt.writeLong(PipeBase.FIELD_SRVPORT, data[PipeBase.FIELD_SRVPORT], dataBuffer)
|
||||
sockWt.writeBinary(PipeBase.FIELD_UDPDATA, data[PipeBase.FIELD_UDPDATA], dataBuffer)
|
||||
dta=dataBuffer.getvalue()
|
||||
ctlBuffer=cStringIO.StringIO()
|
||||
sockWt.writeLongDirect(len(dta), ctlBuffer)
|
||||
sockWt.writeBinaryDirect(dta, ctlBuffer)
|
||||
dta=ctlBuffer.getvalue()
|
||||
bytesSnd=0
|
||||
while bytesSnd<len(dta):
|
||||
bytesSnd=bytesSnd+servSocket.send(dta[bytesSnd:])
|
||||
dataBuffer.close()
|
||||
ctlBuffer.close()
|
||||
self.responseQ.task_done()
|
||||
self.TCPBytesOut+=bytesSnd
|
||||
self.packetsOut+=1
|
||||
self.log.debug("[Tail] Forwarded response packet of %d bytes for listening port %d from client %s:%d to >Head<" % \
|
||||
(len(data[PipeBase.FIELD_UDPDATA]), data[PipeBase.FIELD_SRVPORT], data[PipeBase.FIELD_HOST], data[PipeBase.FIELD_PORT] ) )
|
||||
except socket.error as e:
|
||||
self.connected=False
|
||||
if e.errno == errno.EINTR and self._terminate:
|
||||
pass
|
||||
elif e.errno==errno.ECONNREFUSED or e.errno==errno.EBADF or e.errno==errno.ECONNRESET:
|
||||
self.log.warn("[Tail] Unable to connect to host %s:%d. Will try again in %d seconds.(Reason %s)" % (self.headHost,self.headPort, Tail.WAIT4RETRY, str(e)))
|
||||
servSocket.close()
|
||||
self.connected=False
|
||||
time.sleep(Tail.WAIT4RETRY)
|
||||
else:
|
||||
raise
|
||||
self.logStats(0, None)
|
||||
self.log.info("[Tail] Terminating")
|
||||
|
||||
def handleUdpPacket(self, listenerCfg, sourceId, fields):
|
||||
queue=self.sourceIds[sourceId][0]
|
||||
localfds=self.sourceIds[sourceId][1]
|
||||
udpSocket=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
lastAction=datetime.datetime.now()
|
||||
initial=True
|
||||
while not(self._terminate) and (datetime.datetime.now()-lastAction).seconds<self.timeout:
|
||||
if not(initial):
|
||||
ready=select.select([localfds[0], udpSocket], [], [], self.timeout)
|
||||
else:
|
||||
ready=[[localfds[0]],]
|
||||
os.write(localfds[1], 'x')
|
||||
initial=False
|
||||
for r in ready[0]:
|
||||
if r==localfds[0]:
|
||||
os.read(localfds[0],1)
|
||||
while not(queue.empty()):
|
||||
if self._terminate:
|
||||
continue
|
||||
#output udp, we should have sthg in the queue
|
||||
try:
|
||||
data=queue.get(True, 5)
|
||||
if self._terminate or len(data.keys())==0:
|
||||
continue
|
||||
udpSocket.sendto(data[PipeBase.FIELD_UDPDATA], (listenerCfg[PipeBase.IDX_FORWARDHOST], listenerCfg[PipeBase.IDX_FORWARDPORT]))
|
||||
queue.task_done()
|
||||
self.UDPBytesOut+=len(data[PipeBase.FIELD_UDPDATA])
|
||||
self.packetsOut+=1
|
||||
self.log.debug("[Tail] Send %d bytes to local address %s:%d" % (len(data[PipeBase.FIELD_UDPDATA]), listenerCfg[PipeBase.IDX_FORWARDHOST], listenerCfg[PipeBase.IDX_FORWARDPORT]))
|
||||
except Queue.Empty:
|
||||
pass
|
||||
lastAction=datetime.datetime.now()
|
||||
if r==udpSocket:
|
||||
#inbound udp, need to pass it back to head
|
||||
udpData, address=udpSocket.recvfrom(4096)
|
||||
data={}
|
||||
data[PipeBase.FIELD_UDPDATA]=udpData
|
||||
data[PipeBase.FIELD_HOST]=fields[PipeBase.FIELD_HOST]
|
||||
data[PipeBase.FIELD_PORT]=fields[PipeBase.FIELD_PORT]
|
||||
data[PipeBase.FIELD_SRVPORT]=fields[PipeBase.FIELD_SRVPORT]
|
||||
self.responseQ.put(data)
|
||||
os.write(self.controlPipe[1], 'x')
|
||||
self.log.debug("[Tail] Received %d bytes from local address %s:%d" % (len(udpData), address[0], address[1]))
|
||||
lastAction=datetime.datetime.now()
|
||||
|
||||
# upon exit we need to remove the queue object to avoid receiving more requests
|
||||
self.sourceIdLock.acquire()
|
||||
del self.sourceIds[sourceId]
|
||||
self.sourceIdLock.release()
|
||||
self.log.debug("[Tail] Removed handler for source id %s" % (sourceId))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = OptionParser('"usage: %prog start|stop [option]')
|
||||
parser.add_option("-H", "--head", action="store_const", const="head", dest="mode",
|
||||
help="run as head end of the pipe")
|
||||
parser.add_option("-T", "--tail", action="store_const", const="tail", dest="mode",
|
||||
help="run as tail end of the pipe")
|
||||
(options, args) = parser.parse_args()
|
||||
if(len(args)==0):
|
||||
print "specify start|stop|restart"
|
||||
sys.exit()
|
||||
if options.mode=="head":
|
||||
head=Head()
|
||||
daemon_runner = runner.DaemonRunner(head)
|
||||
daemon_runner.do_action()
|
||||
if options.mode=="tail":
|
||||
tail=Tail()
|
||||
daemon_runner = runner.DaemonRunner(tail)
|
||||
daemon_runner.do_action()
|
||||
|
15
udppipe<template head>.ini
Normal file
15
udppipe<template head>.ini
Normal file
@ -0,0 +1,15 @@
|
||||
# rename the template to udppipe.ini in order to use it
|
||||
# once started lock at /tmp/udppipe.log
|
||||
|
||||
[head]
|
||||
|
||||
# Tail connects to this port port via TCP. UDP packets will be relayed through this connection
|
||||
# this is the only mandatory key
|
||||
pipePort=16001
|
||||
|
||||
# optional key - the ip of the connecting client needs to resolve to the hostname provided here
|
||||
# use a dynamic dns provider to assign if you don't have a fixed ip / hostname
|
||||
#tailHostname=<hostname>
|
||||
|
||||
# optional key - 10 = debug, 20 = info
|
||||
#logLevel=20
|
29
udppipe<template tail>.ini
Normal file
29
udppipe<template tail>.ini
Normal file
@ -0,0 +1,29 @@
|
||||
# rename the template to udppipe.ini in order to use it
|
||||
# once started lock at /tmp/udppipe.log
|
||||
|
||||
[tail]
|
||||
# the following two keys tell thead app where to connect to
|
||||
headHost=<hostname>
|
||||
headPort=<port>
|
||||
# timeout in seconds to wait for a response or the next incoming packet giving up / cleaning up
|
||||
timeout=60
|
||||
|
||||
# see head ini file
|
||||
#logLevel=20
|
||||
|
||||
|
||||
# portConfigs need to be numberer sequentially starting with index 1. Reading the config will stop at the first "missing" number.
|
||||
# n - integer
|
||||
# Example:
|
||||
#[portConfig_1]
|
||||
#id=TEST
|
||||
#listenPort=12345
|
||||
#forwardHost=192.168.0.3
|
||||
#forwardPort=12345
|
||||
|
||||
[portConfig_<n>]
|
||||
id=<string identifier, something meaningfull for you>
|
||||
listenPort=<port the head application should listen on for incoming udp packets>
|
||||
forwardHost=<hostname in the local network tail is running>
|
||||
forwardPort=<port number where the target service is running in the LAN>
|
||||
|
63
udptest.py
Normal file
63
udptest.py
Normal file
@ -0,0 +1,63 @@
|
||||
#! /usr/bin/env python
|
||||
|
||||
# Client and server for udp (datagram) echo.
|
||||
#
|
||||
# Usage: udpecho -s [port] (to start a server)
|
||||
# or: udpecho -c host [port] <file (client)
|
||||
|
||||
import sys
|
||||
from socket import *
|
||||
|
||||
ECHO_PORT = 50000 + 7
|
||||
BUFSIZE = 1024
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
usage()
|
||||
if sys.argv[1] == '-s':
|
||||
server()
|
||||
elif sys.argv[1] == '-c':
|
||||
client()
|
||||
else:
|
||||
usage()
|
||||
|
||||
def usage():
|
||||
sys.stdout = sys.stderr
|
||||
print 'Usage: udpecho -s [port] (server)'
|
||||
print 'or: udpecho -c host [port] <file (client)'
|
||||
sys.exit(2)
|
||||
|
||||
def server():
|
||||
if len(sys.argv) > 2:
|
||||
port = eval(sys.argv[2])
|
||||
else:
|
||||
port = ECHO_PORT
|
||||
s = socket(AF_INET, SOCK_DGRAM)
|
||||
s.bind(('', port))
|
||||
print 'udp echo server ready'
|
||||
while 1:
|
||||
data, addr = s.recvfrom(BUFSIZE)
|
||||
print 'server received %r from %r' % (data, addr)
|
||||
s.sendto(data, addr)
|
||||
|
||||
def client():
|
||||
if len(sys.argv) < 3:
|
||||
usage()
|
||||
host = sys.argv[2]
|
||||
if len(sys.argv) > 3:
|
||||
port = eval(sys.argv[3])
|
||||
else:
|
||||
port = ECHO_PORT
|
||||
addr = host, port
|
||||
s = socket(AF_INET, SOCK_DGRAM)
|
||||
s.bind(('', 0))
|
||||
print 'udp echo client ready, reading stdin'
|
||||
while 1:
|
||||
line = sys.stdin.readline()
|
||||
if not line:
|
||||
break
|
||||
s.sendto(line, addr)
|
||||
data, fromaddr = s.recvfrom(BUFSIZE)
|
||||
print 'client received %r from %r' % (data, fromaddr)
|
||||
|
||||
main()
|
Loading…
Reference in New Issue
Block a user