diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000..450682d
Binary files /dev/null and b/.DS_Store differ
diff --git a/README.md b/README.md
deleted file mode 100644
index e61fcce..0000000
--- a/README.md
+++ /dev/null
@@ -1,5 +0,0 @@
-# UdpPipe
-A UDP Traffic Routing Utility
-
-For more details, please have a look at this page: http://www.liebrandapps.com/udppipe-a-udp-traffic-routing-utility/
-
diff --git a/UdpPipe/.DS_Store b/UdpPipe/.DS_Store
new file mode 100644
index 0000000..49a5c9e
Binary files /dev/null and b/UdpPipe/.DS_Store differ
diff --git a/UdpPipe/.project b/UdpPipe/.project
new file mode 100644
index 0000000..b2ede09
--- /dev/null
+++ b/UdpPipe/.project
@@ -0,0 +1,17 @@
+
+
+ UdpPipe
+
+
+
+
+
+ org.python.pydev.PyDevBuilder
+
+
+
+
+
+ org.python.pydev.pythonNature
+
+
diff --git a/UdpPipe/.pydevproject b/UdpPipe/.pydevproject
new file mode 100644
index 0000000..037bd25
--- /dev/null
+++ b/UdpPipe/.pydevproject
@@ -0,0 +1,8 @@
+
+
+
+/${PROJECT_DIR_NAME}
+
+python 2.7
+Default
+
diff --git a/UdpPipe/eu/.DS_Store b/UdpPipe/eu/.DS_Store
new file mode 100644
index 0000000..bd4d1fe
Binary files /dev/null and b/UdpPipe/eu/.DS_Store differ
diff --git a/UdpPipe/eu/.gitignore b/UdpPipe/eu/.gitignore
new file mode 100644
index 0000000..0479089
--- /dev/null
+++ b/UdpPipe/eu/.gitignore
@@ -0,0 +1 @@
+/__init__.pyc
diff --git a/UdpPipe/eu/liebrand/udppipe/.gitignore b/UdpPipe/eu/liebrand/udppipe/.gitignore
new file mode 100644
index 0000000..a822e02
--- /dev/null
+++ b/UdpPipe/eu/liebrand/udppipe/.gitignore
@@ -0,0 +1,2 @@
+/Utility.pyc
+/__init__.pyc
diff --git a/UdpPipe/eu/liebrand/udppipe/__init__.py b/UdpPipe/eu/liebrand/udppipe/__init__.py
index ab8ab9d..0a520ae 100644
--- a/UdpPipe/eu/liebrand/udppipe/__init__.py
+++ b/UdpPipe/eu/liebrand/udppipe/__init__.py
@@ -17,56 +17,137 @@ import datetime
from optparse import OptionParser
import errno
from daemon import runner
+from threading import Thread
+import uuid
+from datetime import date
+from Crypto.PublicKey import RSA
+from Crypto.Hash import SHA256
+from Crypto.Signature import PKCS1_v1_5
+import json
+
+
+
+
+class Config:
+
+ STRING_KEYS=["msgFormat", "logFileName", "secretKey", "id", "forwardHost", \
+ "headHost", "privateKey", "publicKey", "certificate"]
+ INT_KEYS=["maxFilesize", "listenPort", "adminPort", "forwardPort", "logLevel", "headPort", "timeout", "pipePort"]
+ BOOLEAN_KEYS=["enableLogging", "enableAdmin"]
+
+ DEFAULTS={"enableLogging" :"yes",
+ "logFileName" : "/tmp/udppipe.log",
+ "maxFilesize" : 1000000,
+ "msgFormat" : "%(asctime)s, %(levelname)s, %(module)s, %(lineno)d, %(message)s",
+ "logLevel" :20,
+ "enableAdmin" :"no"
+ }
+
+
+ def __init__(self, cfgFile, section):
+ self.section=section
+ self.cfg=RawConfigParser(Config.DEFAULTS)
+ _=self.cfg.read(cfgFile)
+
+ def hasKey(self, dct, key):
+ k=key.upper()
+ for d in dct:
+ if d.upper() == k:
+ return d
+ return None
+
+ def hasSection(self, section):
+ return self.cfg.has_section(section)
+
+ def hasOption(self, option):
+ return self.cfg.has_option(self.section, option)
+
+ def __getattr__(self, name):
+ key=self.hasKey(Config.STRING_KEYS, name)
+ if not key is None:
+ return self.cfg.get(self.section, key)
+ key=self.hasKey(Config.INT_KEYS, name)
+ if not key is None:
+ return self.cfg.getint(self.section, key)
+ key=self.hasKey(Config.BOOLEAN_KEYS, name)
+ if not key is None:
+ return self.cfg.getboolean(self.section, key)
+ return None
+
+ def setSection(self, newSection):
+ tmp=self.section
+ self.section=newSection
+ return tmp
+
+class DateTimeEncoder(json.JSONEncoder):
+
+
+ def default(self, obj):
+ if isinstance(obj, datetime.datetime):
+ return {
+ '__type__' : 'seconds',
+ 'seconds' : time.mktime(obj.timetuple()),
+ }
+ else:
+ return json.JSONEncoder.default(self, obj)
+
+class DateTimeDecoder(json.JSONDecoder):
+
+ def __init__(self, *args, **kargs):
+ json.JSONDecoder.__init__(self, object_hook=self.dict_to_object,
+ *args, **kargs)
+
+ def dict_to_object(self, d):
+ if '__type__' not in d:
+ return d
+
+ return datetime.datetime.fromtimestamp(d['seconds'])
+
class PipeBase:
CONFIG_DIR="./"
CONFIG_FILE="udppipe.ini"
- KEY_ENABLELOGGING="EnableLogging"
- KEY_LOGFILENAME="logFileName"
- KEY_MAXFILESIZE="MaxFilesize"
- KEY_MSGFORMAT="MsgFormat"
- KEY_LOGLEVEL="logLevel"
- KEY_PORTCONFIG="portConfig_%d"
- KEY_LISTENPORT="listenPort"
- KEY_CFGID="id"
- KEY_FORWARDHOST="forwardHost"
- KEY_FORWARDPORT="forwardPort"
+ SECTION_PORTCONFIG="portConfig_%d"
+
+ KEY_CFGFILE="__cfgFile"
FIELD_HOST='host'
FIELD_PORT='port'
FIELD_OP='OP'
FIELD_SRVPORT='srvPort'
FIELD_UDPDATA='udpData'
+ FIELD_PRIVKEY="privateKey"
+ FIELD_PUBKEY="publicKey"
+ FIELD_CERTIFICATE="certificate"
+ FIELD_ADMINPORT="adminPort"
+ FIELD_ADMINSTATUS="adminStatus"
VALUE_UDP='udp'
VALUE_PING='ping'
VALUE_CONFIG='__cfg__'
+ VALUE_KEY='__key__'
IDX_FORWARDHOST=3
IDX_FORWARDPORT=4
- DEFAULTS={KEY_ENABLELOGGING :"yes",
- KEY_LOGFILENAME : "/tmp/udppipe.log",
- KEY_MAXFILESIZE : 1000000,
- KEY_MSGFORMAT : "%(asctime)s, %(levelname)s, %(module)s, %(lineno)d, %(message)s",
- KEY_LOGLEVEL : 20
- }
def __init__(self, section):
- self.stdin_path = '/dev/null'
- self.stdout_path = '/dev/tty'
- self.stderr_path = '/dev/tty'
- self.pidfile_path = '/tmp/udppipe.pid'
- self.pidfile_timeout = 5
+
self.section=section
path=join(PipeBase.CONFIG_DIR, PipeBase.CONFIG_FILE)
if not(exists(path)):
self.printLogLine(sys.stderr,"[UDPPIPE] No config file %s found at %s" % (PipeBase.CONFIG_FILE, PipeBase.CONFIG_DIR))
self.setupOk=False
return
- self.cfg=self.readConfig(path)
+ self.cfg=Config(path, section)
+ self.readConfig(self.cfg)
+ self.stdin_path = '/dev/null'
+ self.stdout_path = self.cfg.logFileName
+ self.stderr_path = self.cfg.logFileName
+ self.pidfile_path = '/tmp/udppipe.pid'
+ self.pidfile_timeout = 5
#self.setupLogger(self.cfg)
self.setupOk=True
#self.log.info("[%s] init done" % (section))
@@ -80,6 +161,7 @@ class PipeBase:
signal.signal(signal.SIGUSR1, self.toggleLogLevel)
signal.signal(signal.SIGUSR2, self.logStats)
self.startTime=datetime.datetime.now()
+ self.lastPing=None
def getTimeStamp(self):
@@ -91,13 +173,10 @@ class PipeBase:
def setupLogger(self, cfg):
try:
- maxFileSize=cfg.getint(self.section, PipeBase.KEY_MAXFILESIZE)
- msgFormat=cfg.get(self.section, PipeBase.KEY_MSGFORMAT)
- self.logFileName=cfg.get(self.section, PipeBase.KEY_LOGFILENAME)
self.log=logging.Logger(self.section)
- loghdl=RotatingFileHandler(self.logFileName, 'a', maxFileSize, 4)
- loghdl.setFormatter(logging.Formatter(msgFormat))
- loghdl.setLevel(cfg.getint(self.section, PipeBase.KEY_LOGLEVEL))
+ loghdl=RotatingFileHandler(cfg.logFileName, 'a', cfg.maxFilesize, 4)
+ loghdl.setFormatter(logging.Formatter(cfg.msgFormat))
+ loghdl.setLevel(cfg.logLevel)
self.log.addHandler(loghdl)
self.log.disabled=False
return True
@@ -118,27 +197,33 @@ class PipeBase:
self.log.info("[%s] %d Packets in, %d Packets out" % (self.section, self.packetsIn, self.packetsOut))
self.log.info("[%s] UDP Traffic %d bytes in, %d bytes out, TCP Traffic %d bytes in, %d bytes out" % (self.section, self.UDPBytesIn, self.UDPBytesOut, self.TCPBytesIn, self.TCPBytesOut))
self.log.info("[%s] Uptime %s, Reconnects %d" % (self.section, str(uptime), self.reconnects))
+ if self.lastPing is None:
+ self.log.info("[%s] Last Ping: never")
+ else:
+ ago=now-self.lastPing
+ self.log.info("[%s] Last Ping: %s (%s ago)" % (self.section, str(self.lastPing), str(ago)))
- def readConfig(self, cfgFile):
- cfg=RawConfigParser(PipeBase.DEFAULTS)
- _=cfg.read(cfgFile)
+ def readConfig(self, cfg):
+ #cfg.set(self.section, PipeBase.KEY_CFGFILE)
i=0
self.listenerConfig=[]
while True:
i+=1
- section=PipeBase.KEY_PORTCONFIG % (i)
- if cfg.has_section(section):
- listenPort=cfg.getint(section, PipeBase.KEY_LISTENPORT)
- cfgId=cfg.get(section, PipeBase.KEY_CFGID)
+ section=PipeBase.SECTION_PORTCONFIG % (i)
+ if cfg.hasSection(section):
+ tmpSection=cfg.setSection(section)
+ listenPort=cfg.listenPort
+ cfgId=cfg.id
if cfgId==PipeBase.VALUE_CONFIG:
self.printLogLine(sys.stderr, "WARN: Don't use ID %s for a port configuration" % (cfgId))
if self.section==Head.SECTION:
forwardHost=None
forwardPort=None
else:
- forwardHost=cfg.get(section, PipeBase.KEY_FORWARDHOST)
- forwardPort=cfg.getint(section, PipeBase.KEY_FORWARDPORT)
+ forwardHost=cfg.forwardHost
+ forwardPort=cfg.forwardPort
self.listenerConfig.append([ cfgId, listenPort, None, forwardHost, forwardPort ])
+ cfg.setSection(tmpSection)
else:
break
return cfg
@@ -158,6 +243,8 @@ class Head(PipeBase):
self._terminate=False
signal.signal(signal.SIGTERM, self.terminate)
signal.signal(signal.SIGINT, self.terminate)
+ self.adminSocket=None
+ self._terminateAdmin=False
def terminate(self, sigNo, stackFrame):
@@ -167,24 +254,38 @@ class Head(PipeBase):
self.log.info("[Head] Terminating upon Signal Term")
self._terminate=True
self.pipeSocket.close()
+ if self.adminSocket is not None:
+ self._terminateAdmin=True
+ self.adminSocket.close()
+ self.adminSocket=None
+
def readHeadConfig(self, cfg):
- self.pipePort=cfg.getint(Head.SECTION, Head.KEY_PIPEPORT)
- self.enableHostNameCheck=cfg.has_option(Head.SECTION, Head.KEY_TAILHOSTNAME)
+ self.pipePort=cfg.pipePort
+ self.enableHostNameCheck=cfg.hasOption("tailHostname")
if self.enableHostNameCheck:
- self.tailHostname=cfg.get(Head.SECTION, Head.KEY_TAILHOSTNAME)
+ self.tailHostname=cfg.tailHostname
+ self.enableAdmin=cfg.enableAdmin
+ if self.enableAdmin:
+ self.adminPort=cfg.adminPort
+ self.publicKey=cfg.publicKey
def run(self):
# Important: logging cannot be setup any earlier as the RotatingFileHandler produces errors
# in combination with the daemon function
self.setupLogger(self.cfg)
socketArray=[]
+
# Step #1: Setup all the sockets
self.pipeSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.pipeSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.pipeSocket.bind(('', self.pipePort))
self.pipeSocket.listen(5)
+ thrd=None
+ self.isThreadRunning=False
+ self.controlPipe=os.pipe()
+ socketArray.append(self.controlPipe[0])
while not(self._terminate):
try:
self.log.info("[Head] Waiting for >Tail< to connect on port %d" % (self.pipePort))
@@ -196,9 +297,6 @@ class Head(PipeBase):
clientSocket.close()
continue
self.log.info("[Head] Connection from tail at %s:%d" % (address[0], address[1]))
- self.reconnects+=1
- socketArray.append(clientSocket)
- tailConnection=True
# now we are ready for incoming udp messages
try:
if len(socketArray)==1:
@@ -212,158 +310,285 @@ class Head(PipeBase):
self.log.error("[Head] Unable to listen on port %d - already in use" % (lstCfg[1]))
else:
self.log.error("[Head] Unable to listen on port %d - Error %d %s" % (lstCfg[1], e.errno, errno.errorcode[e.errno]))
- self._terminate=True
- clientSocket.close()
- continue
- # Step #2: listen on all the sockets
- lastReport=datetime.datetime.now()
- while not(self._terminate) and tailConnection:
- try:
- ready=select.select(socketArray, [], [], 180)
- except select.error, (_errno, _strerror):
- if _errno == errno.EINTR:
- continue
- else:
- raise
- if ready[0]:
- for r in ready[0]:
- if r==clientSocket:
- # we received something from the tail
- dta=r.recv(5)
- if len(dta)==0:
- #connection reset
- self.log.warn("[Head] >Tail< disconnected")
- socketArray.remove(clientSocket)
- clientSocket.close()
- tailConnection=False
- continue
- while(len(dta)<5):
- dta+=r.recv(5-len(dta))
- sockRd=eu.liebrand.udppipe.Utility.SockRead()
- buf=cStringIO.StringIO(dta)
- _,_,length=sockRd.read(buf)
- data=[]
- tmp=length
- while length>0:
- chunk=r.recv(length)
- data.append(chunk)
- length-=len(chunk)
- self.log.debug("[Head] Received %d bytes from >Tail<" % (tmp))
- self.TCPBytesIn+=tmp
- self.packetsIn+=1
- readDict=eu.liebrand.udppipe.Utility.ReadDictionary()
- fields=readDict.read(''.join(data))
- if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_PING:
- continue
- if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_CONFIG:
- for k in fields.keys():
- if k!=PipeBase.FIELD_OP:
- found=False
- for lstCfg in self.listenerConfig:
- if k==lstCfg[0]:
- found=True
- # existing ID
- if fields[k]!=lstCfg[1]:
- # listening port changed
- self.log.info("[Head] Received new port for service id %s (old %d -> new %d)" % (k, lstCfg[1], fields[k]))
- lstCfg[2].close()
- socketArray.remove(lstCfg[2])
- lstCfg[1]=fields[k]
- lstCfg[2]=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- lstCfg[2].bind(('', lstCfg[1]))
- socketArray.append(lstCfg[2])
- if not(found):
- s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.bind(('', fields[k]))
- self.listenerConfig.append([ k, fields[k], s, None, None ])
- socketArray.append(s)
- self.log.info("[Head] New UDP Listener <%s> on port <%d>" % (k, fields[k]))
- for lstCfg in self.listenerConfig:
- found=False
- for k in fields.keys():
- if k==lstCfg[0]:
- found=True
- break
- if not(found):
- lstCfg[2].close()
- self.listenerConfig.remove(lstCfg)
- socketArray.remove(lstCfg[2])
- self.log.info("[Head] Deleted UDP Listener <%s> on port <%d>" % (lstCfg[0], lstCfg[1]))
- continue
- # find the outbound socket
- found=False
- for lstCfg in self.listenerConfig:
- if lstCfg[1]==fields['srvPort']:
- lstCfg[2].sendto(fields[PipeBase.FIELD_UDPDATA], (fields['host'], fields['port']))
- found=True
- self.log.debug("[Head] Forwarded response packet of %d bytes to %s:%d for port %d" % \
- (len(fields[PipeBase.FIELD_UDPDATA]), fields['host'], fields['port'], \
- fields['srvPort']))
- self.UDPBytesOut+=len(fields[PipeBase.FIELD_UDPDATA])
- self.packetsOut+=1
- break
- if not(found):
- self.log.warn("[Head] Received a response for an unknown client on port %d" % (fields['srvPort']))
- continue
-
- for lstCfg in self.listenerConfig:
- if r==lstCfg[2]:
- # we have an inbound message
- udpData, address=r.recvfrom(4096)
- self.log.debug("[Head] Received %d bytes from %s:%d" % (len(udpData), address[0], address[1]))
- self.UDPBytesIn+=len(udpData)
- self.packetsIn+=1
- # we need to send udpData, listening Port, address
- util=eu.liebrand.udppipe.Utility.SockWrite()
- dataBuffer=cStringIO.StringIO()
- util.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_UDP, dataBuffer)
- util.writeString(PipeBase.FIELD_HOST, address[0], dataBuffer)
- util.writeLong(PipeBase.FIELD_PORT, address[1], dataBuffer)
- util.writeLong(PipeBase.FIELD_SRVPORT, lstCfg[1], dataBuffer)
- util.writeBinary(PipeBase.FIELD_UDPDATA, udpData, dataBuffer)
- dta=dataBuffer.getvalue()
- ctlBuffer=cStringIO.StringIO()
- util.writeLongDirect(len(dta), ctlBuffer)
- util.writeBinaryDirect(dta, ctlBuffer)
- dta=ctlBuffer.getvalue()
- bytesSnd=0
- while bytesSndTail<" % (bytesSnd))
- self.TCPBytesOut+=bytesSnd
- self.packetsOut+=1
- dataBuffer.close()
- ctlBuffer.close()
- break
- else:
- self.log.warn("[Head] No activity for 180 seconds, assumung Tail is absent")
- socketArray.remove(clientSocket)
- tailConnection=False
- now=datetime.datetime.now()
- if (now-lastReport).seconds>=(3600*24):
- self.logStats(0, None)
- lastReport=now
+ self._terminate=True
+ clientSocket.close()
+ continue
+ self.reconnects+=1
+ socketArray.append(clientSocket)
+ if self.isThreadRunning:
+ self._terminateThread=True
+ os.write(self.controlPipe[1], 'x')
+ thrd.join()
+ thrd=Thread(target=self.handleMessages, args=(socketArray, clientSocket))
+ thrd.start()
+
except socket.error as e:
- if e.errno == errno.EINTR and self._terminate:
+ if e.errno == errno.EINTR:
pass
elif e.errno == errno.ECONNRESET:
self.log.warn("[Head] Tail disconnected")
- socketArray.remove(clientSocket)
- tailConnection=False
+ if clientSocket in socketArray:
+ socketArray.remove(clientSocket)
else:
self.log.exception(e)
raise
except Exception as e:
self.log.exception(e)
raise
-
+
+
+
+ def handleMessages(self, socketArray, clientSocket):
+ self.isThreadRunning=True
+ # Step #2: listen on all the sockets
+ lastReport=datetime.datetime.now()
+ self._terminateThread=False
+ while not(self._terminate) and not(self._terminateThread):
+ try:
+ ready=select.select(socketArray, [], [], 180)
+ if ready[0]:
+ for r in ready[0]:
+ if r==self.controlPipe:
+ os.read(self.controlPipe[0],1)
+ continue
+ if r==clientSocket:
+ # we received something from the tail
+ dta=r.recv(5)
+ if len(dta)==0:
+ #connection reset
+ self.log.warn("[Head] >Tail< disconnected")
+ clientSocket.close()
+ self._terminateThread=True
+ continue
+ while(len(dta)<5):
+ dta+=r.recv(5-len(dta))
+ sockRd=eu.liebrand.udppipe.Utility.SockRead()
+ buf=cStringIO.StringIO(dta)
+ _,_,length=sockRd.read(buf)
+ data=[]
+ tmp=length
+ while length>0:
+ chunk=r.recv(length)
+ data.append(chunk)
+ length-=len(chunk)
+ self.log.debug("[Head] Received %d bytes from >Tail<" % (tmp))
+ self.TCPBytesIn+=tmp
+ self.packetsIn+=1
+ readDict=eu.liebrand.udppipe.Utility.ReadDictionary()
+ fields=readDict.read(''.join(data))
+ if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_PING:
+ self.lastPing=datetime.datetime.now()
+ continue
+ if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_CONFIG:
+ for k in fields.keys():
+ if k.startswith('++'):
+ k=k[2:]
+ found=False
+ for lstCfg in self.listenerConfig:
+ if k==lstCfg[0]:
+ found=True
+ # existing ID
+ if fields[k]!=lstCfg[1]:
+ # listening port changed
+ self.log.info("[Head] Received new port for service id %s (old %d -> new %d)" % (k, lstCfg[1], fields[k]))
+ lstCfg[2].close()
+ socketArray.remove(lstCfg[2])
+ lstCfg[1]=fields['++'+k]
+ lstCfg[2]=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ lstCfg[2].bind(('', lstCfg[1]))
+ socketArray.append(lstCfg[2])
+ if not(found):
+ s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.bind(('', fields['++' + k]))
+ self.listenerConfig.append([ k, fields['++' + k], s, None, None ])
+ socketArray.append(s)
+ self.log.info("[Head] New UDP Listener <%s> on port <%d>" % (k, fields['++' + k]))
+ continue
+ if k==PipeBase.FIELD_PUBKEY:
+ self.publicKey=fields[k]
+ continue
+ if k==PipeBase.FIELD_ADMINPORT:
+ self.adminPort=fields[k]
+ self.enableAdmin=(self.adminPort!=0)
+ if self.enableAdmin:
+ if self.adminSocket is not None:
+ self.log.info("[Head] Disabling current admin")
+ self._terminateAdmin=True
+ self.adminSocket.close()
+ self.adminSocket=None
+ self.adminThread.join()
+ self.log.info("[Head] Admin is enabled on port %d" % (self.adminPort))
+ self._terminateAdmin=False
+ self.adminThread=threading.Thread(target=self.adminServer)
+ self.adminThread.daemon=True
+ self.adminThread.start()
+ continue
+
+ for lstCfg in self.listenerConfig:
+ found=False
+ for k in fields.keys():
+ k=k[2:]
+ if k==lstCfg[0]:
+ found=True
+ break
+ if not(found):
+ lstCfg[2].close()
+ self.listenerConfig.remove(lstCfg)
+ socketArray.remove(lstCfg[2])
+ self.log.info("[Head] Deleted UDP Listener <%s> on port <%d>" % (lstCfg[0], lstCfg[1]))
+ continue
+ # find the outbound socket
+ found=False
+ for lstCfg in self.listenerConfig:
+ if lstCfg[1]==fields['srvPort']:
+ lstCfg[2].sendto(fields[PipeBase.FIELD_UDPDATA], (fields['host'], fields['port']))
+ found=True
+ self.log.debug("[Head] Forwarded response packet of %d bytes to %s:%d for port %d" % \
+ (len(fields[PipeBase.FIELD_UDPDATA]), fields['host'], fields['port'], \
+ fields['srvPort']))
+ self.UDPBytesOut+=len(fields[PipeBase.FIELD_UDPDATA])
+ self.packetsOut+=1
+ break
+ if not(found):
+ self.log.warn("[Head] Received a response for an unknown client on port %d" % (fields['srvPort']))
+ continue
+
+ for lstCfg in self.listenerConfig:
+ if r==lstCfg[2]:
+ # we have an inbound message
+ udpData, address=r.recvfrom(4096)
+ self.log.debug("[Head] Received %d bytes from %s:%d" % (len(udpData), address[0], address[1]))
+ self.UDPBytesIn+=len(udpData)
+ self.packetsIn+=1
+ # we need to send udpData, listening Port, address
+ util=eu.liebrand.udppipe.Utility.SockWrite()
+ dataBuffer=cStringIO.StringIO()
+ util.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_UDP, dataBuffer)
+ util.writeString(PipeBase.FIELD_HOST, address[0], dataBuffer)
+ util.writeLong(PipeBase.FIELD_PORT, address[1], dataBuffer)
+ util.writeLong(PipeBase.FIELD_SRVPORT, lstCfg[1], dataBuffer)
+ util.writeBinary(PipeBase.FIELD_UDPDATA, udpData, dataBuffer)
+ dta=dataBuffer.getvalue()
+ ctlBuffer=cStringIO.StringIO()
+ util.writeLongDirect(len(dta), ctlBuffer)
+ util.writeBinaryDirect(dta, ctlBuffer)
+ dta=ctlBuffer.getvalue()
+ bytesSnd=0
+ while bytesSndTail<" % (bytesSnd))
+ self.TCPBytesOut+=bytesSnd
+ self.packetsOut+=1
+ dataBuffer.close()
+ ctlBuffer.close()
+ break
+ else:
+ self.log.warn("[Head] No activity for 180 seconds, assuming Tail is absent")
+ self._terminateThread=True
+ now=datetime.datetime.now()
+ if (now-lastReport).seconds>=(3600*24) or self._terminateThread or self._terminate:
+ self.logStats(0, None)
+ lastReport=now
+ except select.error, (_errno, _strerror):
+ if _errno == errno.EINTR:
+ continue
+ except socket.error, (_errno, _strerror):
+ if _errno == errno.ECONNRESET:
+ self.log.warn("[Head] Tail disconnected")
+ self._terminateThread=True
+ else:
+ self.log.exception(socket.error)
+ self._terminateThread=True
+ if clientSocket in socketArray:
+ socketArray.remove(clientSocket)
+ self.isThreadRunning=False
+
+
+ def adminServer(self):
+ pubKeyObj=RSA.importKey(self.publicKey)
+ adminSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ adminSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ adminSocket.bind(('', self.adminPort))
+ adminSocket.listen(5)
+ while not(self._terminate) and not(self._terminateAdmin):
+ self.log.info("[Head] Waiting for >Admin< to connect on port %d" % (self.adminPort))
+ try:
+ (clientSocket, address)=adminSocket.accept()
+ self.log.info("[Head] Connection from >Admin< at %s:%d" % (address[0], address[1]))
+ dta=clientSocket.recv(5)
+ if len(dta)==0:
+ #connection reset
+ self.log.warn("[Head] >Admin< disconnected")
+ clientSocket.close()
+ continue
+ while(len(dta)<5):
+ dta+=clientSocket.recv(5-len(dta))
+ sockRd=eu.liebrand.udppipe.Utility.SockRead()
+ buf=cStringIO.StringIO(dta)
+ _,_,length=sockRd.read(buf)
+ data=[]
+ tmp=length
+ while length>0:
+ chunk=clientSocket.recv(length)
+ data.append(chunk)
+ length-=len(chunk)
+ self.log.debug("[Head] Received %d bytes from >Admin<" % (tmp))
+ readDict=eu.liebrand.udppipe.Utility.ReadDictionary()
+ fields=readDict.read(''.join(data))
+ retData={}
+ if fields.has_key('payload') and fields.has_key('signature'):
+ dta=fields['payload']
+ signature=fields['signature']
+ hsh = SHA256.new(dta)
+ verifier=PKCS1_v1_5.new(pubKeyObj)
+ signed=verifier.verify(hsh, signature)
+ if signed:
+ self.log.info("[Head] Received valid request from >Admin>")
+ else:
+ self.log.info("[Head] Received INVALID request from >Admin>")
+ dct=json.loads(dta)
+ if dct['op']=="challenge":
+ retData["challenge"]=str(uuid.uuid4())
+ if dct['op']=="status":
+ retData["tailConnected"]=self.isThreadRunning
+ retData["lastPing"]=self.lastPing
+ retData["startTime"]=self.startTime
+ retData["reconnects"]=self.reconnects
+ retData["noOfPorts"]=len(self.listenerConfig)
+ if dct['op']=="statistic":
+ pass
+ if dct['op']=="detailStatistic":
+ pass
+ retData['status']='ok'
+ else:
+ retData['status']='fail'
+ sockWt=eu.liebrand.udppipe.Utility.SockWrite()
+ buf=cStringIO.StringIO()
+ retStrg=json.dumps(retData, cls=DateTimeEncoder)
+ sockWt.writeString('result', retStrg, buf)
+ dta=buf.getvalue()
+ ctlBuffer=cStringIO.StringIO()
+ sockWt.writeLongDirect(len(dta), ctlBuffer)
+ sockWt.writeBinaryDirect(dta, ctlBuffer)
+ dta=ctlBuffer.getvalue()
+ bytesSnd=0
+ while bytesSndAdmin<" % (bytesSnd))
+ clientSocket.close()
+ except socket.error as e:
+ if e.errno == errno.EINTR:
+ pass
+ else:
+ self.log.exception(e)
+
+
class Tail(PipeBase):
SECTION="tail"
- KEY_HEADHOST="headHost"
- KEY_HEADPORT="headPort"
- KEY_TIMEOUT="timeout"
WAIT4RETRY=300
@@ -380,9 +605,16 @@ class Tail(PipeBase):
def readTailConfig(self, cfg):
- self.headHost=cfg.get(Tail.SECTION, Tail.KEY_HEADHOST)
- self.headPort=cfg.getint(Tail.SECTION, Tail.KEY_HEADPORT)
- self.timeout=cfg.getint(Tail.SECTION, Tail.KEY_TIMEOUT)
+ self.headHost=cfg.headHost
+ self.headPort=cfg.headPort
+ self.timeout=cfg.timeout
+ self.adminSocket=None
+ self.enableAdmin=cfg.enableAdmin
+ if self.enableAdmin:
+ self.adminPort=cfg.adminPort
+ self.publicKey=cfg.publicKey
+ self.privateKey=cfg.privateKey
+ self.certificate=cfg.certificate
def terminate(self, sigNo, stackFrame):
if sigNo==signal.SIGINT:
@@ -396,6 +628,10 @@ class Tail(PipeBase):
v=self.sourceIds[s]
v[0].put({})
os.write(v[1][1], 'x')
+ if self.adminSocket is not None:
+ self.adminSocket.close()
+ self.adminSocket=None
+
def run(self):
# Important: logging cannot be setup any earlier as the RotatingFileHandler produces errors
@@ -407,6 +643,11 @@ class Tail(PipeBase):
sockRd=eu.liebrand.udppipe.Utility.SockRead()
sockWt=eu.liebrand.udppipe.Utility.SockWrite()
+ if self.enableAdmin:
+ t=threading.Thread(target=self.adminServer)
+ t.daemon=True
+ t.start()
+
# Step #1: Connect to the head
servSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# wait for 2 seconds before trying to connect (avoid missing each other, when cron
@@ -415,16 +656,33 @@ class Tail(PipeBase):
lastReport=datetime.datetime.now()
while not(self._terminate):
try:
+ self.log.info("[Tail] Trying to connect to >Head< at %s:%d" % (self.headHost,self.headPort))
servSocket.connect((self.headHost,self.headPort))
self.connected=True
- self.log.info("[Tail] Connected to >Head< at %s:%d" % (self.headHost,self.headPort))
+ self.log.info("[Tail] Connected to >Head< at %s" % (str(servSocket.getpeername())))
self.fds.append(servSocket)
# send config
dataBuffer=cStringIO.StringIO()
sockWt.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_CONFIG, dataBuffer)
for lstCfg in self.listenerConfig:
- sockWt.writeLong(lstCfg[0], lstCfg[1], dataBuffer)
+ sockWt.writeLong("++" + lstCfg[0], lstCfg[1], dataBuffer)
+ if (self.enableAdmin):
+ keyPath=self.publicKey
+ if not(os.path.exists(keyPath)):
+ pwd=os.environ['PWD']
+ keyPath=os.path.join(pwd,keyPath)
+ if not(os.path.exists(keyPath)):
+ self.log.warn("[Tail] Could not enable admin functionality, public key not found at %s nor %s" % (self.publicKey, keyPath))
+ self.enableAdmin=False
+ if not(self.enableAdmin):
+ sockWt.writeLong(PipeBase.FIELD_ADMINPORT, 0, dataBuffer)
+ else:
+ sockWt.writeLong(PipeBase.FIELD_ADMINPORT, self.adminPort, dataBuffer)
+ fl=open(keyPath, 'r')
+ pubKey=fl.read()
+ fl.close()
+ sockWt.writeString(PipeBase.FIELD_PUBKEY, pubKey, dataBuffer)
dta=dataBuffer.getvalue()
ctlBuffer=cStringIO.StringIO()
sockWt.writeLongDirect(len(dta), ctlBuffer)
@@ -461,6 +719,7 @@ class Tail(PipeBase):
dataBuffer.close()
ctlBuffer.close()
now=datetime.datetime.now()
+ self.lastPing=now
if (now-lastReport).seconds>=(3600*24):
self.logStats(0, None)
lastReport=now
@@ -610,6 +869,69 @@ class Tail(PipeBase):
del self.sourceIds[sourceId]
self.sourceIdLock.release()
self.log.debug("[Tail] Removed handler for source id %s" % (sourceId))
+
+ def adminServer(self):
+ sockWt=eu.liebrand.udppipe.Utility.SockWrite()
+ adminSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ adminSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ adminSocket.bind(('', self.adminPort))
+ adminSocket.listen(5)
+ while not(self._terminate):
+ self.log.info("[Tail] Waiting for 'Admin' to connect on port %d" % (self.adminPort))
+ try:
+ (clientSocket, address)=adminSocket.accept()
+ self.log.info("[Tail] Connection from 'Admin' at %s:%d" % (address[0], address[1]))
+ dataBuffer=cStringIO.StringIO()
+ privateKeyPath=self.privateKey
+ if not(os.path.exists(privateKeyPath)):
+ pwd=os.environ['PWD']
+ privateKeyPath=os.path.join(pwd,privateKeyPath)
+ if not(os.path.exists(privateKeyPath)):
+ self.log.warn("[Tail] Could not enable admin functionality, **private** key not found at %s nor %s" % (self.privateKey, privateKeyPath))
+ self.enableAdmin=False
+ certficatePath=self.certificate
+ if not(os.path.exists(certficatePath)):
+ pwd=os.environ['PWD']
+ certficatePath=os.path.join(pwd, certficatePath)
+ if not(os.path.exists(certficatePath)):
+ self.log.warn("[Tail] Could not enable admin functionality, **public** key not found at %s nor %s" % (self.publicKey, certficatePath))
+ self.enableAdmin=False
+ if not(self.enableAdmin):
+ sockWt.writeString(PipeBase.FIELD_ADMINSTATUS, "FAIL", dataBuffer)
+ else:
+ # read keys
+ fl=open(privateKeyPath, 'r')
+ privKey=fl.read()
+ fl.close()
+ privKey="".join(privKey.split("-----")[2].split())
+ fl=open(certficatePath, 'r')
+ cert=fl.read()
+ fl.close()
+ cert="".join(cert.split("-----")[2].split())
+ #privKey=privKey[2]
+ #privKey="".join(privKey.split())
+ sockWt.writeString(PipeBase.FIELD_ADMINSTATUS, "ok", dataBuffer)
+ sockWt.writeString(PipeBase.FIELD_PRIVKEY, privKey, dataBuffer)
+ sockWt.writeString(PipeBase.FIELD_PUBKEY, cert, dataBuffer)
+ sockWt.writeString(PipeBase.FIELD_HOST, self.headHost, dataBuffer)
+ sockWt.writeLong(PipeBase.FIELD_PORT, self.adminPort, dataBuffer)
+ dta=dataBuffer.getvalue()
+ ctlBuffer=cStringIO.StringIO()
+ sockWt.writeLongDirect(len(dta), ctlBuffer)
+ sockWt.writeBinaryDirect(dta, ctlBuffer)
+ dta=ctlBuffer.getvalue()
+ bytesSnd=0
+ while bytesSnd