From 0fa9df5f7a204e2b5081fc47eda82012000c6762 Mon Sep 17 00:00:00 2001 From: Mark Liebrand Date: Fri, 27 Jan 2017 21:16:13 +0100 Subject: [PATCH] ... --- .DS_Store | Bin 0 -> 6148 bytes README.md | 5 - UdpPipe/.DS_Store | Bin 0 -> 6148 bytes UdpPipe/.project | 17 + UdpPipe/.pydevproject | 8 + UdpPipe/eu/.DS_Store | Bin 0 -> 6148 bytes UdpPipe/eu/.gitignore | 1 + UdpPipe/eu/liebrand/udppipe/.gitignore | 2 + UdpPipe/eu/liebrand/udppipe/__init__.py | 694 +++++++++++++++++------- 9 files changed, 536 insertions(+), 191 deletions(-) create mode 100644 .DS_Store delete mode 100644 README.md create mode 100644 UdpPipe/.DS_Store create mode 100644 UdpPipe/.project create mode 100644 UdpPipe/.pydevproject create mode 100644 UdpPipe/eu/.DS_Store create mode 100644 UdpPipe/eu/.gitignore create mode 100644 UdpPipe/eu/liebrand/udppipe/.gitignore diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..450682deb6e7a041d0f3fe13a1139f8921d1228e GIT binary patch literal 6148 zcmeHKyJ`b55ZrYPIdF4jO8tWWK$vqMFzG{l2^a!41ov0@yZmXHeH1ZDI=gU~h1E!_ z9SKcwdn+QkI*v<`UPL;$p`2`J&GyY3n`K0SaGbG|gS>x?U)#5Rll^pr@)|#X7izWvu1~)eml-Do-SGgIZ^>CaH+sU zEPJc}Yxp1i|0RhlDnJGPl>#~+Hp2=}%G%mI&T4Iezrro&3^&8vDHyyQ1HBw$VdePA clOnI!9Q!qK3UoT+P6zU5z;vNefmGA`upj4yIrl2Hc46u-pSzlYadAkr~nn90#twsj7Nby z%IAymJQAOT3Q&QOC}7`*0yoxV3;L%6!AAgahO!&hK1+bb3SdpPASy78Rxn!C#}KP~ zJ6LjGO}1dPi{|j5d1tjL2By(2T9ClBIvA(`6&NTmjl8q-e+U0I{|{Q2QUNOPXA0 + + UdpPipe + + + + + + org.python.pydev.PyDevBuilder + + + + + + org.python.pydev.pythonNature + + diff --git a/UdpPipe/.pydevproject b/UdpPipe/.pydevproject new file mode 100644 index 0000000..037bd25 --- /dev/null +++ b/UdpPipe/.pydevproject @@ -0,0 +1,8 @@ + + + +/${PROJECT_DIR_NAME} + +python 2.7 +Default + diff --git a/UdpPipe/eu/.DS_Store b/UdpPipe/eu/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..bd4d1fed15f256890b81b5baaf477a71f4b5e56f GIT binary patch literal 6148 zcmeHKJ5EC}5S)b+Pohal=__ypD+(vz0uWx1B1MS=qJI_V;%Ll%3Ze&H(nPb;dhGR% zEl=_GEdX16jt{^Rz(9Azmk)FEefOE&RK$pMp7D+s95B9(AFs3QzXQ%4u*VKtY<}}M z$K&2NWl}&2NC7Dz1*E`P6{zw$+@1AQ9VP{&z%?k~--kwb?1fWed^$Kp3qV{j9L9O{ z62#^KVlSK$8KGHHiAl9;F)Znfx612XIIp#h{!I6rZ@L@jLE#YPm>A`l3opm_ ck(7DO=iKjwQ)18=4?0ml1Fnlq3S3%&A7!f*X#fBK literal 0 HcmV?d00001 diff --git a/UdpPipe/eu/.gitignore b/UdpPipe/eu/.gitignore new file mode 100644 index 0000000..0479089 --- /dev/null +++ b/UdpPipe/eu/.gitignore @@ -0,0 +1 @@ +/__init__.pyc diff --git a/UdpPipe/eu/liebrand/udppipe/.gitignore b/UdpPipe/eu/liebrand/udppipe/.gitignore new file mode 100644 index 0000000..a822e02 --- /dev/null +++ b/UdpPipe/eu/liebrand/udppipe/.gitignore @@ -0,0 +1,2 @@ +/Utility.pyc +/__init__.pyc diff --git a/UdpPipe/eu/liebrand/udppipe/__init__.py b/UdpPipe/eu/liebrand/udppipe/__init__.py index ab8ab9d..0a520ae 100644 --- a/UdpPipe/eu/liebrand/udppipe/__init__.py +++ b/UdpPipe/eu/liebrand/udppipe/__init__.py @@ -17,56 +17,137 @@ import datetime from optparse import OptionParser import errno from daemon import runner +from threading import Thread +import uuid +from datetime import date +from Crypto.PublicKey import RSA +from Crypto.Hash import SHA256 +from Crypto.Signature import PKCS1_v1_5 +import json + + + + +class Config: + + STRING_KEYS=["msgFormat", "logFileName", "secretKey", "id", "forwardHost", \ + "headHost", "privateKey", "publicKey", "certificate"] + INT_KEYS=["maxFilesize", "listenPort", "adminPort", "forwardPort", "logLevel", "headPort", "timeout", "pipePort"] + BOOLEAN_KEYS=["enableLogging", "enableAdmin"] + + DEFAULTS={"enableLogging" :"yes", + "logFileName" : "/tmp/udppipe.log", + "maxFilesize" : 1000000, + "msgFormat" : "%(asctime)s, %(levelname)s, %(module)s, %(lineno)d, %(message)s", + "logLevel" :20, + "enableAdmin" :"no" + } + + + def __init__(self, cfgFile, section): + self.section=section + self.cfg=RawConfigParser(Config.DEFAULTS) + _=self.cfg.read(cfgFile) + + def hasKey(self, dct, key): + k=key.upper() + for d in dct: + if d.upper() == k: + return d + return None + + def hasSection(self, section): + return self.cfg.has_section(section) + + def hasOption(self, option): + return self.cfg.has_option(self.section, option) + + def __getattr__(self, name): + key=self.hasKey(Config.STRING_KEYS, name) + if not key is None: + return self.cfg.get(self.section, key) + key=self.hasKey(Config.INT_KEYS, name) + if not key is None: + return self.cfg.getint(self.section, key) + key=self.hasKey(Config.BOOLEAN_KEYS, name) + if not key is None: + return self.cfg.getboolean(self.section, key) + return None + + def setSection(self, newSection): + tmp=self.section + self.section=newSection + return tmp + +class DateTimeEncoder(json.JSONEncoder): + + + def default(self, obj): + if isinstance(obj, datetime.datetime): + return { + '__type__' : 'seconds', + 'seconds' : time.mktime(obj.timetuple()), + } + else: + return json.JSONEncoder.default(self, obj) + +class DateTimeDecoder(json.JSONDecoder): + + def __init__(self, *args, **kargs): + json.JSONDecoder.__init__(self, object_hook=self.dict_to_object, + *args, **kargs) + + def dict_to_object(self, d): + if '__type__' not in d: + return d + + return datetime.datetime.fromtimestamp(d['seconds']) + class PipeBase: CONFIG_DIR="./" CONFIG_FILE="udppipe.ini" - KEY_ENABLELOGGING="EnableLogging" - KEY_LOGFILENAME="logFileName" - KEY_MAXFILESIZE="MaxFilesize" - KEY_MSGFORMAT="MsgFormat" - KEY_LOGLEVEL="logLevel" - KEY_PORTCONFIG="portConfig_%d" - KEY_LISTENPORT="listenPort" - KEY_CFGID="id" - KEY_FORWARDHOST="forwardHost" - KEY_FORWARDPORT="forwardPort" + SECTION_PORTCONFIG="portConfig_%d" + + KEY_CFGFILE="__cfgFile" FIELD_HOST='host' FIELD_PORT='port' FIELD_OP='OP' FIELD_SRVPORT='srvPort' FIELD_UDPDATA='udpData' + FIELD_PRIVKEY="privateKey" + FIELD_PUBKEY="publicKey" + FIELD_CERTIFICATE="certificate" + FIELD_ADMINPORT="adminPort" + FIELD_ADMINSTATUS="adminStatus" VALUE_UDP='udp' VALUE_PING='ping' VALUE_CONFIG='__cfg__' + VALUE_KEY='__key__' IDX_FORWARDHOST=3 IDX_FORWARDPORT=4 - DEFAULTS={KEY_ENABLELOGGING :"yes", - KEY_LOGFILENAME : "/tmp/udppipe.log", - KEY_MAXFILESIZE : 1000000, - KEY_MSGFORMAT : "%(asctime)s, %(levelname)s, %(module)s, %(lineno)d, %(message)s", - KEY_LOGLEVEL : 20 - } def __init__(self, section): - self.stdin_path = '/dev/null' - self.stdout_path = '/dev/tty' - self.stderr_path = '/dev/tty' - self.pidfile_path = '/tmp/udppipe.pid' - self.pidfile_timeout = 5 + self.section=section path=join(PipeBase.CONFIG_DIR, PipeBase.CONFIG_FILE) if not(exists(path)): self.printLogLine(sys.stderr,"[UDPPIPE] No config file %s found at %s" % (PipeBase.CONFIG_FILE, PipeBase.CONFIG_DIR)) self.setupOk=False return - self.cfg=self.readConfig(path) + self.cfg=Config(path, section) + self.readConfig(self.cfg) + self.stdin_path = '/dev/null' + self.stdout_path = self.cfg.logFileName + self.stderr_path = self.cfg.logFileName + self.pidfile_path = '/tmp/udppipe.pid' + self.pidfile_timeout = 5 #self.setupLogger(self.cfg) self.setupOk=True #self.log.info("[%s] init done" % (section)) @@ -80,6 +161,7 @@ class PipeBase: signal.signal(signal.SIGUSR1, self.toggleLogLevel) signal.signal(signal.SIGUSR2, self.logStats) self.startTime=datetime.datetime.now() + self.lastPing=None def getTimeStamp(self): @@ -91,13 +173,10 @@ class PipeBase: def setupLogger(self, cfg): try: - maxFileSize=cfg.getint(self.section, PipeBase.KEY_MAXFILESIZE) - msgFormat=cfg.get(self.section, PipeBase.KEY_MSGFORMAT) - self.logFileName=cfg.get(self.section, PipeBase.KEY_LOGFILENAME) self.log=logging.Logger(self.section) - loghdl=RotatingFileHandler(self.logFileName, 'a', maxFileSize, 4) - loghdl.setFormatter(logging.Formatter(msgFormat)) - loghdl.setLevel(cfg.getint(self.section, PipeBase.KEY_LOGLEVEL)) + loghdl=RotatingFileHandler(cfg.logFileName, 'a', cfg.maxFilesize, 4) + loghdl.setFormatter(logging.Formatter(cfg.msgFormat)) + loghdl.setLevel(cfg.logLevel) self.log.addHandler(loghdl) self.log.disabled=False return True @@ -118,27 +197,33 @@ class PipeBase: self.log.info("[%s] %d Packets in, %d Packets out" % (self.section, self.packetsIn, self.packetsOut)) self.log.info("[%s] UDP Traffic %d bytes in, %d bytes out, TCP Traffic %d bytes in, %d bytes out" % (self.section, self.UDPBytesIn, self.UDPBytesOut, self.TCPBytesIn, self.TCPBytesOut)) self.log.info("[%s] Uptime %s, Reconnects %d" % (self.section, str(uptime), self.reconnects)) + if self.lastPing is None: + self.log.info("[%s] Last Ping: never") + else: + ago=now-self.lastPing + self.log.info("[%s] Last Ping: %s (%s ago)" % (self.section, str(self.lastPing), str(ago))) - def readConfig(self, cfgFile): - cfg=RawConfigParser(PipeBase.DEFAULTS) - _=cfg.read(cfgFile) + def readConfig(self, cfg): + #cfg.set(self.section, PipeBase.KEY_CFGFILE) i=0 self.listenerConfig=[] while True: i+=1 - section=PipeBase.KEY_PORTCONFIG % (i) - if cfg.has_section(section): - listenPort=cfg.getint(section, PipeBase.KEY_LISTENPORT) - cfgId=cfg.get(section, PipeBase.KEY_CFGID) + section=PipeBase.SECTION_PORTCONFIG % (i) + if cfg.hasSection(section): + tmpSection=cfg.setSection(section) + listenPort=cfg.listenPort + cfgId=cfg.id if cfgId==PipeBase.VALUE_CONFIG: self.printLogLine(sys.stderr, "WARN: Don't use ID %s for a port configuration" % (cfgId)) if self.section==Head.SECTION: forwardHost=None forwardPort=None else: - forwardHost=cfg.get(section, PipeBase.KEY_FORWARDHOST) - forwardPort=cfg.getint(section, PipeBase.KEY_FORWARDPORT) + forwardHost=cfg.forwardHost + forwardPort=cfg.forwardPort self.listenerConfig.append([ cfgId, listenPort, None, forwardHost, forwardPort ]) + cfg.setSection(tmpSection) else: break return cfg @@ -158,6 +243,8 @@ class Head(PipeBase): self._terminate=False signal.signal(signal.SIGTERM, self.terminate) signal.signal(signal.SIGINT, self.terminate) + self.adminSocket=None + self._terminateAdmin=False def terminate(self, sigNo, stackFrame): @@ -167,24 +254,38 @@ class Head(PipeBase): self.log.info("[Head] Terminating upon Signal Term") self._terminate=True self.pipeSocket.close() + if self.adminSocket is not None: + self._terminateAdmin=True + self.adminSocket.close() + self.adminSocket=None + def readHeadConfig(self, cfg): - self.pipePort=cfg.getint(Head.SECTION, Head.KEY_PIPEPORT) - self.enableHostNameCheck=cfg.has_option(Head.SECTION, Head.KEY_TAILHOSTNAME) + self.pipePort=cfg.pipePort + self.enableHostNameCheck=cfg.hasOption("tailHostname") if self.enableHostNameCheck: - self.tailHostname=cfg.get(Head.SECTION, Head.KEY_TAILHOSTNAME) + self.tailHostname=cfg.tailHostname + self.enableAdmin=cfg.enableAdmin + if self.enableAdmin: + self.adminPort=cfg.adminPort + self.publicKey=cfg.publicKey def run(self): # Important: logging cannot be setup any earlier as the RotatingFileHandler produces errors # in combination with the daemon function self.setupLogger(self.cfg) socketArray=[] + # Step #1: Setup all the sockets self.pipeSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.pipeSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.pipeSocket.bind(('', self.pipePort)) self.pipeSocket.listen(5) + thrd=None + self.isThreadRunning=False + self.controlPipe=os.pipe() + socketArray.append(self.controlPipe[0]) while not(self._terminate): try: self.log.info("[Head] Waiting for >Tail< to connect on port %d" % (self.pipePort)) @@ -196,9 +297,6 @@ class Head(PipeBase): clientSocket.close() continue self.log.info("[Head] Connection from tail at %s:%d" % (address[0], address[1])) - self.reconnects+=1 - socketArray.append(clientSocket) - tailConnection=True # now we are ready for incoming udp messages try: if len(socketArray)==1: @@ -212,158 +310,285 @@ class Head(PipeBase): self.log.error("[Head] Unable to listen on port %d - already in use" % (lstCfg[1])) else: self.log.error("[Head] Unable to listen on port %d - Error %d %s" % (lstCfg[1], e.errno, errno.errorcode[e.errno])) - self._terminate=True - clientSocket.close() - continue - # Step #2: listen on all the sockets - lastReport=datetime.datetime.now() - while not(self._terminate) and tailConnection: - try: - ready=select.select(socketArray, [], [], 180) - except select.error, (_errno, _strerror): - if _errno == errno.EINTR: - continue - else: - raise - if ready[0]: - for r in ready[0]: - if r==clientSocket: - # we received something from the tail - dta=r.recv(5) - if len(dta)==0: - #connection reset - self.log.warn("[Head] >Tail< disconnected") - socketArray.remove(clientSocket) - clientSocket.close() - tailConnection=False - continue - while(len(dta)<5): - dta+=r.recv(5-len(dta)) - sockRd=eu.liebrand.udppipe.Utility.SockRead() - buf=cStringIO.StringIO(dta) - _,_,length=sockRd.read(buf) - data=[] - tmp=length - while length>0: - chunk=r.recv(length) - data.append(chunk) - length-=len(chunk) - self.log.debug("[Head] Received %d bytes from >Tail<" % (tmp)) - self.TCPBytesIn+=tmp - self.packetsIn+=1 - readDict=eu.liebrand.udppipe.Utility.ReadDictionary() - fields=readDict.read(''.join(data)) - if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_PING: - continue - if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_CONFIG: - for k in fields.keys(): - if k!=PipeBase.FIELD_OP: - found=False - for lstCfg in self.listenerConfig: - if k==lstCfg[0]: - found=True - # existing ID - if fields[k]!=lstCfg[1]: - # listening port changed - self.log.info("[Head] Received new port for service id %s (old %d -> new %d)" % (k, lstCfg[1], fields[k])) - lstCfg[2].close() - socketArray.remove(lstCfg[2]) - lstCfg[1]=fields[k] - lstCfg[2]=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - lstCfg[2].bind(('', lstCfg[1])) - socketArray.append(lstCfg[2]) - if not(found): - s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.bind(('', fields[k])) - self.listenerConfig.append([ k, fields[k], s, None, None ]) - socketArray.append(s) - self.log.info("[Head] New UDP Listener <%s> on port <%d>" % (k, fields[k])) - for lstCfg in self.listenerConfig: - found=False - for k in fields.keys(): - if k==lstCfg[0]: - found=True - break - if not(found): - lstCfg[2].close() - self.listenerConfig.remove(lstCfg) - socketArray.remove(lstCfg[2]) - self.log.info("[Head] Deleted UDP Listener <%s> on port <%d>" % (lstCfg[0], lstCfg[1])) - continue - # find the outbound socket - found=False - for lstCfg in self.listenerConfig: - if lstCfg[1]==fields['srvPort']: - lstCfg[2].sendto(fields[PipeBase.FIELD_UDPDATA], (fields['host'], fields['port'])) - found=True - self.log.debug("[Head] Forwarded response packet of %d bytes to %s:%d for port %d" % \ - (len(fields[PipeBase.FIELD_UDPDATA]), fields['host'], fields['port'], \ - fields['srvPort'])) - self.UDPBytesOut+=len(fields[PipeBase.FIELD_UDPDATA]) - self.packetsOut+=1 - break - if not(found): - self.log.warn("[Head] Received a response for an unknown client on port %d" % (fields['srvPort'])) - continue - - for lstCfg in self.listenerConfig: - if r==lstCfg[2]: - # we have an inbound message - udpData, address=r.recvfrom(4096) - self.log.debug("[Head] Received %d bytes from %s:%d" % (len(udpData), address[0], address[1])) - self.UDPBytesIn+=len(udpData) - self.packetsIn+=1 - # we need to send udpData, listening Port, address - util=eu.liebrand.udppipe.Utility.SockWrite() - dataBuffer=cStringIO.StringIO() - util.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_UDP, dataBuffer) - util.writeString(PipeBase.FIELD_HOST, address[0], dataBuffer) - util.writeLong(PipeBase.FIELD_PORT, address[1], dataBuffer) - util.writeLong(PipeBase.FIELD_SRVPORT, lstCfg[1], dataBuffer) - util.writeBinary(PipeBase.FIELD_UDPDATA, udpData, dataBuffer) - dta=dataBuffer.getvalue() - ctlBuffer=cStringIO.StringIO() - util.writeLongDirect(len(dta), ctlBuffer) - util.writeBinaryDirect(dta, ctlBuffer) - dta=ctlBuffer.getvalue() - bytesSnd=0 - while bytesSndTail<" % (bytesSnd)) - self.TCPBytesOut+=bytesSnd - self.packetsOut+=1 - dataBuffer.close() - ctlBuffer.close() - break - else: - self.log.warn("[Head] No activity for 180 seconds, assumung Tail is absent") - socketArray.remove(clientSocket) - tailConnection=False - now=datetime.datetime.now() - if (now-lastReport).seconds>=(3600*24): - self.logStats(0, None) - lastReport=now + self._terminate=True + clientSocket.close() + continue + self.reconnects+=1 + socketArray.append(clientSocket) + if self.isThreadRunning: + self._terminateThread=True + os.write(self.controlPipe[1], 'x') + thrd.join() + thrd=Thread(target=self.handleMessages, args=(socketArray, clientSocket)) + thrd.start() + except socket.error as e: - if e.errno == errno.EINTR and self._terminate: + if e.errno == errno.EINTR: pass elif e.errno == errno.ECONNRESET: self.log.warn("[Head] Tail disconnected") - socketArray.remove(clientSocket) - tailConnection=False + if clientSocket in socketArray: + socketArray.remove(clientSocket) else: self.log.exception(e) raise except Exception as e: self.log.exception(e) raise - + + + + def handleMessages(self, socketArray, clientSocket): + self.isThreadRunning=True + # Step #2: listen on all the sockets + lastReport=datetime.datetime.now() + self._terminateThread=False + while not(self._terminate) and not(self._terminateThread): + try: + ready=select.select(socketArray, [], [], 180) + if ready[0]: + for r in ready[0]: + if r==self.controlPipe: + os.read(self.controlPipe[0],1) + continue + if r==clientSocket: + # we received something from the tail + dta=r.recv(5) + if len(dta)==0: + #connection reset + self.log.warn("[Head] >Tail< disconnected") + clientSocket.close() + self._terminateThread=True + continue + while(len(dta)<5): + dta+=r.recv(5-len(dta)) + sockRd=eu.liebrand.udppipe.Utility.SockRead() + buf=cStringIO.StringIO(dta) + _,_,length=sockRd.read(buf) + data=[] + tmp=length + while length>0: + chunk=r.recv(length) + data.append(chunk) + length-=len(chunk) + self.log.debug("[Head] Received %d bytes from >Tail<" % (tmp)) + self.TCPBytesIn+=tmp + self.packetsIn+=1 + readDict=eu.liebrand.udppipe.Utility.ReadDictionary() + fields=readDict.read(''.join(data)) + if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_PING: + self.lastPing=datetime.datetime.now() + continue + if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_CONFIG: + for k in fields.keys(): + if k.startswith('++'): + k=k[2:] + found=False + for lstCfg in self.listenerConfig: + if k==lstCfg[0]: + found=True + # existing ID + if fields[k]!=lstCfg[1]: + # listening port changed + self.log.info("[Head] Received new port for service id %s (old %d -> new %d)" % (k, lstCfg[1], fields[k])) + lstCfg[2].close() + socketArray.remove(lstCfg[2]) + lstCfg[1]=fields['++'+k] + lstCfg[2]=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + lstCfg[2].bind(('', lstCfg[1])) + socketArray.append(lstCfg[2]) + if not(found): + s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.bind(('', fields['++' + k])) + self.listenerConfig.append([ k, fields['++' + k], s, None, None ]) + socketArray.append(s) + self.log.info("[Head] New UDP Listener <%s> on port <%d>" % (k, fields['++' + k])) + continue + if k==PipeBase.FIELD_PUBKEY: + self.publicKey=fields[k] + continue + if k==PipeBase.FIELD_ADMINPORT: + self.adminPort=fields[k] + self.enableAdmin=(self.adminPort!=0) + if self.enableAdmin: + if self.adminSocket is not None: + self.log.info("[Head] Disabling current admin") + self._terminateAdmin=True + self.adminSocket.close() + self.adminSocket=None + self.adminThread.join() + self.log.info("[Head] Admin is enabled on port %d" % (self.adminPort)) + self._terminateAdmin=False + self.adminThread=threading.Thread(target=self.adminServer) + self.adminThread.daemon=True + self.adminThread.start() + continue + + for lstCfg in self.listenerConfig: + found=False + for k in fields.keys(): + k=k[2:] + if k==lstCfg[0]: + found=True + break + if not(found): + lstCfg[2].close() + self.listenerConfig.remove(lstCfg) + socketArray.remove(lstCfg[2]) + self.log.info("[Head] Deleted UDP Listener <%s> on port <%d>" % (lstCfg[0], lstCfg[1])) + continue + # find the outbound socket + found=False + for lstCfg in self.listenerConfig: + if lstCfg[1]==fields['srvPort']: + lstCfg[2].sendto(fields[PipeBase.FIELD_UDPDATA], (fields['host'], fields['port'])) + found=True + self.log.debug("[Head] Forwarded response packet of %d bytes to %s:%d for port %d" % \ + (len(fields[PipeBase.FIELD_UDPDATA]), fields['host'], fields['port'], \ + fields['srvPort'])) + self.UDPBytesOut+=len(fields[PipeBase.FIELD_UDPDATA]) + self.packetsOut+=1 + break + if not(found): + self.log.warn("[Head] Received a response for an unknown client on port %d" % (fields['srvPort'])) + continue + + for lstCfg in self.listenerConfig: + if r==lstCfg[2]: + # we have an inbound message + udpData, address=r.recvfrom(4096) + self.log.debug("[Head] Received %d bytes from %s:%d" % (len(udpData), address[0], address[1])) + self.UDPBytesIn+=len(udpData) + self.packetsIn+=1 + # we need to send udpData, listening Port, address + util=eu.liebrand.udppipe.Utility.SockWrite() + dataBuffer=cStringIO.StringIO() + util.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_UDP, dataBuffer) + util.writeString(PipeBase.FIELD_HOST, address[0], dataBuffer) + util.writeLong(PipeBase.FIELD_PORT, address[1], dataBuffer) + util.writeLong(PipeBase.FIELD_SRVPORT, lstCfg[1], dataBuffer) + util.writeBinary(PipeBase.FIELD_UDPDATA, udpData, dataBuffer) + dta=dataBuffer.getvalue() + ctlBuffer=cStringIO.StringIO() + util.writeLongDirect(len(dta), ctlBuffer) + util.writeBinaryDirect(dta, ctlBuffer) + dta=ctlBuffer.getvalue() + bytesSnd=0 + while bytesSndTail<" % (bytesSnd)) + self.TCPBytesOut+=bytesSnd + self.packetsOut+=1 + dataBuffer.close() + ctlBuffer.close() + break + else: + self.log.warn("[Head] No activity for 180 seconds, assuming Tail is absent") + self._terminateThread=True + now=datetime.datetime.now() + if (now-lastReport).seconds>=(3600*24) or self._terminateThread or self._terminate: + self.logStats(0, None) + lastReport=now + except select.error, (_errno, _strerror): + if _errno == errno.EINTR: + continue + except socket.error, (_errno, _strerror): + if _errno == errno.ECONNRESET: + self.log.warn("[Head] Tail disconnected") + self._terminateThread=True + else: + self.log.exception(socket.error) + self._terminateThread=True + if clientSocket in socketArray: + socketArray.remove(clientSocket) + self.isThreadRunning=False + + + def adminServer(self): + pubKeyObj=RSA.importKey(self.publicKey) + adminSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + adminSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + adminSocket.bind(('', self.adminPort)) + adminSocket.listen(5) + while not(self._terminate) and not(self._terminateAdmin): + self.log.info("[Head] Waiting for >Admin< to connect on port %d" % (self.adminPort)) + try: + (clientSocket, address)=adminSocket.accept() + self.log.info("[Head] Connection from >Admin< at %s:%d" % (address[0], address[1])) + dta=clientSocket.recv(5) + if len(dta)==0: + #connection reset + self.log.warn("[Head] >Admin< disconnected") + clientSocket.close() + continue + while(len(dta)<5): + dta+=clientSocket.recv(5-len(dta)) + sockRd=eu.liebrand.udppipe.Utility.SockRead() + buf=cStringIO.StringIO(dta) + _,_,length=sockRd.read(buf) + data=[] + tmp=length + while length>0: + chunk=clientSocket.recv(length) + data.append(chunk) + length-=len(chunk) + self.log.debug("[Head] Received %d bytes from >Admin<" % (tmp)) + readDict=eu.liebrand.udppipe.Utility.ReadDictionary() + fields=readDict.read(''.join(data)) + retData={} + if fields.has_key('payload') and fields.has_key('signature'): + dta=fields['payload'] + signature=fields['signature'] + hsh = SHA256.new(dta) + verifier=PKCS1_v1_5.new(pubKeyObj) + signed=verifier.verify(hsh, signature) + if signed: + self.log.info("[Head] Received valid request from >Admin>") + else: + self.log.info("[Head] Received INVALID request from >Admin>") + dct=json.loads(dta) + if dct['op']=="challenge": + retData["challenge"]=str(uuid.uuid4()) + if dct['op']=="status": + retData["tailConnected"]=self.isThreadRunning + retData["lastPing"]=self.lastPing + retData["startTime"]=self.startTime + retData["reconnects"]=self.reconnects + retData["noOfPorts"]=len(self.listenerConfig) + if dct['op']=="statistic": + pass + if dct['op']=="detailStatistic": + pass + retData['status']='ok' + else: + retData['status']='fail' + sockWt=eu.liebrand.udppipe.Utility.SockWrite() + buf=cStringIO.StringIO() + retStrg=json.dumps(retData, cls=DateTimeEncoder) + sockWt.writeString('result', retStrg, buf) + dta=buf.getvalue() + ctlBuffer=cStringIO.StringIO() + sockWt.writeLongDirect(len(dta), ctlBuffer) + sockWt.writeBinaryDirect(dta, ctlBuffer) + dta=ctlBuffer.getvalue() + bytesSnd=0 + while bytesSndAdmin<" % (bytesSnd)) + clientSocket.close() + except socket.error as e: + if e.errno == errno.EINTR: + pass + else: + self.log.exception(e) + + class Tail(PipeBase): SECTION="tail" - KEY_HEADHOST="headHost" - KEY_HEADPORT="headPort" - KEY_TIMEOUT="timeout" WAIT4RETRY=300 @@ -380,9 +605,16 @@ class Tail(PipeBase): def readTailConfig(self, cfg): - self.headHost=cfg.get(Tail.SECTION, Tail.KEY_HEADHOST) - self.headPort=cfg.getint(Tail.SECTION, Tail.KEY_HEADPORT) - self.timeout=cfg.getint(Tail.SECTION, Tail.KEY_TIMEOUT) + self.headHost=cfg.headHost + self.headPort=cfg.headPort + self.timeout=cfg.timeout + self.adminSocket=None + self.enableAdmin=cfg.enableAdmin + if self.enableAdmin: + self.adminPort=cfg.adminPort + self.publicKey=cfg.publicKey + self.privateKey=cfg.privateKey + self.certificate=cfg.certificate def terminate(self, sigNo, stackFrame): if sigNo==signal.SIGINT: @@ -396,6 +628,10 @@ class Tail(PipeBase): v=self.sourceIds[s] v[0].put({}) os.write(v[1][1], 'x') + if self.adminSocket is not None: + self.adminSocket.close() + self.adminSocket=None + def run(self): # Important: logging cannot be setup any earlier as the RotatingFileHandler produces errors @@ -407,6 +643,11 @@ class Tail(PipeBase): sockRd=eu.liebrand.udppipe.Utility.SockRead() sockWt=eu.liebrand.udppipe.Utility.SockWrite() + if self.enableAdmin: + t=threading.Thread(target=self.adminServer) + t.daemon=True + t.start() + # Step #1: Connect to the head servSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) # wait for 2 seconds before trying to connect (avoid missing each other, when cron @@ -415,16 +656,33 @@ class Tail(PipeBase): lastReport=datetime.datetime.now() while not(self._terminate): try: + self.log.info("[Tail] Trying to connect to >Head< at %s:%d" % (self.headHost,self.headPort)) servSocket.connect((self.headHost,self.headPort)) self.connected=True - self.log.info("[Tail] Connected to >Head< at %s:%d" % (self.headHost,self.headPort)) + self.log.info("[Tail] Connected to >Head< at %s" % (str(servSocket.getpeername()))) self.fds.append(servSocket) # send config dataBuffer=cStringIO.StringIO() sockWt.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_CONFIG, dataBuffer) for lstCfg in self.listenerConfig: - sockWt.writeLong(lstCfg[0], lstCfg[1], dataBuffer) + sockWt.writeLong("++" + lstCfg[0], lstCfg[1], dataBuffer) + if (self.enableAdmin): + keyPath=self.publicKey + if not(os.path.exists(keyPath)): + pwd=os.environ['PWD'] + keyPath=os.path.join(pwd,keyPath) + if not(os.path.exists(keyPath)): + self.log.warn("[Tail] Could not enable admin functionality, public key not found at %s nor %s" % (self.publicKey, keyPath)) + self.enableAdmin=False + if not(self.enableAdmin): + sockWt.writeLong(PipeBase.FIELD_ADMINPORT, 0, dataBuffer) + else: + sockWt.writeLong(PipeBase.FIELD_ADMINPORT, self.adminPort, dataBuffer) + fl=open(keyPath, 'r') + pubKey=fl.read() + fl.close() + sockWt.writeString(PipeBase.FIELD_PUBKEY, pubKey, dataBuffer) dta=dataBuffer.getvalue() ctlBuffer=cStringIO.StringIO() sockWt.writeLongDirect(len(dta), ctlBuffer) @@ -461,6 +719,7 @@ class Tail(PipeBase): dataBuffer.close() ctlBuffer.close() now=datetime.datetime.now() + self.lastPing=now if (now-lastReport).seconds>=(3600*24): self.logStats(0, None) lastReport=now @@ -610,6 +869,69 @@ class Tail(PipeBase): del self.sourceIds[sourceId] self.sourceIdLock.release() self.log.debug("[Tail] Removed handler for source id %s" % (sourceId)) + + def adminServer(self): + sockWt=eu.liebrand.udppipe.Utility.SockWrite() + adminSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + adminSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + adminSocket.bind(('', self.adminPort)) + adminSocket.listen(5) + while not(self._terminate): + self.log.info("[Tail] Waiting for 'Admin' to connect on port %d" % (self.adminPort)) + try: + (clientSocket, address)=adminSocket.accept() + self.log.info("[Tail] Connection from 'Admin' at %s:%d" % (address[0], address[1])) + dataBuffer=cStringIO.StringIO() + privateKeyPath=self.privateKey + if not(os.path.exists(privateKeyPath)): + pwd=os.environ['PWD'] + privateKeyPath=os.path.join(pwd,privateKeyPath) + if not(os.path.exists(privateKeyPath)): + self.log.warn("[Tail] Could not enable admin functionality, **private** key not found at %s nor %s" % (self.privateKey, privateKeyPath)) + self.enableAdmin=False + certficatePath=self.certificate + if not(os.path.exists(certficatePath)): + pwd=os.environ['PWD'] + certficatePath=os.path.join(pwd, certficatePath) + if not(os.path.exists(certficatePath)): + self.log.warn("[Tail] Could not enable admin functionality, **public** key not found at %s nor %s" % (self.publicKey, certficatePath)) + self.enableAdmin=False + if not(self.enableAdmin): + sockWt.writeString(PipeBase.FIELD_ADMINSTATUS, "FAIL", dataBuffer) + else: + # read keys + fl=open(privateKeyPath, 'r') + privKey=fl.read() + fl.close() + privKey="".join(privKey.split("-----")[2].split()) + fl=open(certficatePath, 'r') + cert=fl.read() + fl.close() + cert="".join(cert.split("-----")[2].split()) + #privKey=privKey[2] + #privKey="".join(privKey.split()) + sockWt.writeString(PipeBase.FIELD_ADMINSTATUS, "ok", dataBuffer) + sockWt.writeString(PipeBase.FIELD_PRIVKEY, privKey, dataBuffer) + sockWt.writeString(PipeBase.FIELD_PUBKEY, cert, dataBuffer) + sockWt.writeString(PipeBase.FIELD_HOST, self.headHost, dataBuffer) + sockWt.writeLong(PipeBase.FIELD_PORT, self.adminPort, dataBuffer) + dta=dataBuffer.getvalue() + ctlBuffer=cStringIO.StringIO() + sockWt.writeLongDirect(len(dta), ctlBuffer) + sockWt.writeBinaryDirect(dta, ctlBuffer) + dta=ctlBuffer.getvalue() + bytesSnd=0 + while bytesSnd