import socket import threading from ConfigParser import NoSectionError, NoOptionError, RawConfigParser from os.path import isfile, join, exists, basename import time import logging from logging.handlers import RotatingFileHandler import exceptions import sys import select import eu.liebrand.udppipe.Utility import cStringIO import signal import os import Queue import datetime from optparse import OptionParser import errno from daemon import runner class PipeBase: CONFIG_DIR="./" CONFIG_FILE="udppipe.ini" KEY_ENABLELOGGING="EnableLogging" KEY_LOGFILENAME="logFileName" KEY_MAXFILESIZE="MaxFilesize" KEY_MSGFORMAT="MsgFormat" KEY_LOGLEVEL="logLevel" KEY_PORTCONFIG="portConfig_%d" KEY_LISTENPORT="listenPort" KEY_CFGID="id" KEY_FORWARDHOST="forwardHost" KEY_FORWARDPORT="forwardPort" FIELD_HOST='host' FIELD_PORT='port' FIELD_OP='OP' FIELD_SRVPORT='srvPort' FIELD_UDPDATA='udpData' VALUE_UDP='udp' VALUE_PING='ping' VALUE_CONFIG='__cfg__' IDX_FORWARDHOST=3 IDX_FORWARDPORT=4 DEFAULTS={KEY_ENABLELOGGING :"yes", KEY_LOGFILENAME : "/tmp/udppipe.log", KEY_MAXFILESIZE : 1000000, KEY_MSGFORMAT : "%(asctime)s, %(levelname)s, %(module)s, %(lineno)d, %(message)s", KEY_LOGLEVEL : 20 } def __init__(self, section): self.stdin_path = '/dev/null' self.stdout_path = '/dev/tty' self.stderr_path = '/dev/tty' self.pidfile_path = '/tmp/udppipe.pid' self.pidfile_timeout = 5 self.section=section path=join(PipeBase.CONFIG_DIR, PipeBase.CONFIG_FILE) if not(exists(path)): self.printLogLine(sys.stderr,"[UDPPIPE] No config file %s found at %s" % (PipeBase.CONFIG_FILE, PipeBase.CONFIG_DIR)) self.setupOk=False return self.cfg=self.readConfig(path) #self.setupLogger(self.cfg) self.setupOk=True #self.log.info("[%s] init done" % (section)) self.packetsIn=0 self.packetsOut=0 self.UDPBytesIn=0 self.UDPBytesOut=0 self.TCPBytesIn=0 self.TCPBytesOut=0 self.reconnects=0 signal.signal(signal.SIGUSR1, self.toggleLogLevel) signal.signal(signal.SIGUSR2, self.logStats) self.startTime=datetime.datetime.now() def getTimeStamp(self): return time.strftime('%d.%m.%Y %H:%M:%S', time.localtime(time.time())) def printLogLine(self, fl, message): fl.write('%s %s\n' % (self.getTimeStamp(), message)) fl.flush() def setupLogger(self, cfg): try: maxFileSize=cfg.getint(self.section, PipeBase.KEY_MAXFILESIZE) msgFormat=cfg.get(self.section, PipeBase.KEY_MSGFORMAT) self.logFileName=cfg.get(self.section, PipeBase.KEY_LOGFILENAME) self.log=logging.Logger(self.section) loghdl=RotatingFileHandler(self.logFileName, 'a', maxFileSize, 4) loghdl.setFormatter(logging.Formatter(msgFormat)) loghdl.setLevel(cfg.getint(self.section, PipeBase.KEY_LOGLEVEL)) self.log.addHandler(loghdl) self.log.disabled=False return True except exceptions.Exception, e: self.printLogLine(sys.stderr, "[UDPPIPE] Unable to initialize logging. Reason: %s" % e) return False def toggleLogLevel(self, sigNo, stackFrame): if self.log.getEffectiveLevel()==10: newLevel=20 else: newLevel=10 self.log.setLevel(newLevel) def logStats(self, sigNo, stackFrame): now=datetime.datetime.now() uptime=now-self.startTime self.log.info("[%s] %d Packets in, %d Packets out" % (self.section, self.packetsIn, self.packetsOut)) self.log.info("[%s] UDP Traffic %d bytes in, %d bytes out, TCP Traffic %d bytes in, %d bytes out" % (self.section, self.UDPBytesIn, self.UDPBytesOut, self.TCPBytesIn, self.TCPBytesOut)) self.log.info("[%s] Uptime %s, Reconnects %d" % (self.section, str(uptime), self.reconnects)) def readConfig(self, cfgFile): cfg=RawConfigParser(PipeBase.DEFAULTS) _=cfg.read(cfgFile) i=0 self.listenerConfig=[] while True: i+=1 section=PipeBase.KEY_PORTCONFIG % (i) if cfg.has_section(section): listenPort=cfg.getint(section, PipeBase.KEY_LISTENPORT) cfgId=cfg.get(section, PipeBase.KEY_CFGID) if cfgId==PipeBase.VALUE_CONFIG: self.printLogLine(sys.stderr, "WARN: Don't use ID %s for a port configuration" % (cfgId)) if self.section==Head.SECTION: forwardHost=None forwardPort=None else: forwardHost=cfg.get(section, PipeBase.KEY_FORWARDHOST) forwardPort=cfg.getint(section, PipeBase.KEY_FORWARDPORT) self.listenerConfig.append([ cfgId, listenPort, None, forwardHost, forwardPort ]) else: break return cfg class Head(PipeBase): SECTION="head" KEY_PIPEPORT="pipePort" KEY_TAILHOSTNAME="tailHostname" def __init__(self): PipeBase.__init__(self, Head.SECTION) self.readHeadConfig(self.cfg) self._terminate=False signal.signal(signal.SIGTERM, self.terminate) signal.signal(signal.SIGINT, self.terminate) def terminate(self, sigNo, stackFrame): if sigNo==signal.SIGINT: self.log.info("[Head] Terminating upon Keyboard Interrupt") if sigNo==signal.SIGTERM: self.log.info("[Head] Terminating upon Signal Term") self._terminate=True self.pipeSocket.close() def readHeadConfig(self, cfg): self.pipePort=cfg.getint(Head.SECTION, Head.KEY_PIPEPORT) self.enableHostNameCheck=cfg.has_option(Head.SECTION, Head.KEY_TAILHOSTNAME) if self.enableHostNameCheck: self.tailHostname=cfg.get(Head.SECTION, Head.KEY_TAILHOSTNAME) def run(self): # Important: logging cannot be setup any earlier as the RotatingFileHandler produces errors # in combination with the daemon function self.setupLogger(self.cfg) socketArray=[] # Step #1: Setup all the sockets self.pipeSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.pipeSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.pipeSocket.bind(('', self.pipePort)) self.pipeSocket.listen(5) while not(self._terminate): try: self.log.info("[Head] Waiting for >Tail< to connect on port %d" % (self.pipePort)) (clientSocket, address) = self.pipeSocket.accept() if self.enableHostNameCheck: data = socket.gethostbyname(self.tailHostname) if address[0]!=data: self.log.warn("[Head] Connection attempt from wrong IP (%s but expected %s)" % (address[0], data)) clientSocket.close() continue self.log.info("[Head] Connection from tail at %s:%d" % (address[0], address[1])) self.reconnects+=1 socketArray.append(clientSocket) tailConnection=True # now we are ready for incoming udp messages try: if len(socketArray)==1: for lstCfg in self.listenerConfig: lstCfg[2]=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) lstCfg[2].bind(('', lstCfg[1])) socketArray.append(lstCfg[2]) self.log.info("[Head] UDP Listener <%s> on port <%d>" % (lstCfg[0], lstCfg[1])) except socket.error as e: if e.errno==errno.EADDRINUSE: self.log.error("[Head] Unable to listen on port %d - already in use" % (lstCfg[1])) else: self.log.error("[Head] Unable to listen on port %d - Error %d %s" % (lstCfg[1], e.errno, errno.errorcode[e.errno])) self._terminate=True clientSocket.close() continue # Step #2: listen on all the sockets lastReport=datetime.datetime.now() while not(self._terminate) and tailConnection: try: ready=select.select(socketArray, [], [], 3600) except select.error, (_errno, _strerror): if _errno == errno.EINTR: continue else: raise if ready[0]: for r in ready[0]: if r==clientSocket: # we received something from the tail dta=r.recv(5) if len(dta)==0: #connection reset self.log.warn("[Head] >Tail< disconnected") socketArray.remove(clientSocket) clientSocket.close() tailConnection=False continue while(len(dta)<5): dta+=r.recv(5-len(dta)) sockRd=eu.liebrand.udppipe.Utility.SockRead() buf=cStringIO.StringIO(dta) _,_,length=sockRd.read(buf) data=[] tmp=length while length>0: chunk=r.recv(length) data.append(chunk) length-=len(chunk) self.log.debug("[Head] Received %d bytes from >Tail<" % (tmp)) self.TCPBytesIn+=tmp self.packetsIn+=1 readDict=eu.liebrand.udppipe.Utility.ReadDictionary() fields=readDict.read(''.join(data)) if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_PING: continue if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_CONFIG: for k in fields.keys(): if k!=PipeBase.FIELD_OP: found=False for lstCfg in self.listenerConfig: if k==lstCfg[0]: found=True # existing ID if fields[k]!=lstCfg[1]: # listening port changed self.log.info("[Head] Received new port for service id %s (old %d -> new %d)" % (k, lstCfg[1], fields[k])) lstCfg[2].close() socketArray.remove(lstCfg[2]) lstCfg[1]=fields[k] lstCfg[2]=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) lstCfg[2].bind(('', lstCfg[1])) socketArray.append(lstCfg[2]) if not(found): s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.bind(('', fields[k])) self.listenerConfig.append([ k, fields[k], s, None, None ]) socketArray.append(s) self.log.info("[Head] New UDP Listener <%s> on port <%d>" % (k, fields[k])) for lstCfg in self.listenerConfig: found=False for k in fields.keys(): if k==lstCfg[0]: found=True break if not(found): lstCfg[2].close() self.listenerConfig.remove(lstCfg) self.log.info("[Head] Deleted UDP Listener <%s> on port <%d>" % (lstCfg[0], lstCfg[1])) continue # find the outbound socket found=False for lstCfg in self.listenerConfig: if lstCfg[1]==fields['srvPort']: lstCfg[2].sendto(fields[PipeBase.FIELD_UDPDATA], (fields['host'], fields['port'])) found=True self.log.debug("[Head] Forwarded response packet of %d bytes to %s:%d for port %d" % \ (len(fields[PipeBase.FIELD_UDPDATA]), fields['host'], fields['port'], \ fields['srvPort'])) self.UDPBytesOut+=len(fields[PipeBase.FIELD_UDPDATA]) self.packetsOut+=1 break if not(found): self.log.warn("[Head] Received a response for an unknown client on port %d" % (fields['srvPort'])) for lstCfg in self.listenerConfig: if r==lstCfg[2]: # we have an inbound message udpData, address=r.recvfrom(4096) self.log.debug("[Head] Received %d bytes from %s:%d" % (len(udpData), address[0], address[1])) self.UDPBytesIn+=len(udpData) self.packetsIn+=1 # we need to send udpData, listening Port, address util=eu.liebrand.udppipe.Utility.SockWrite() dataBuffer=cStringIO.StringIO() util.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_UDP, dataBuffer) util.writeString(PipeBase.FIELD_HOST, address[0], dataBuffer) util.writeLong(PipeBase.FIELD_PORT, address[1], dataBuffer) util.writeLong(PipeBase.FIELD_SRVPORT, lstCfg[1], dataBuffer) util.writeBinary(PipeBase.FIELD_UDPDATA, udpData, dataBuffer) dta=dataBuffer.getvalue() ctlBuffer=cStringIO.StringIO() util.writeLongDirect(len(dta), ctlBuffer) util.writeBinaryDirect(dta, ctlBuffer) dta=ctlBuffer.getvalue() bytesSnd=0 while bytesSndTail<" % (bytesSnd)) self.TCPBytesOut+=bytesSnd self.packetsOut+=1 dataBuffer.close() ctlBuffer.close() break now=datetime.datetime.now() if (now-lastReport).seconds>=(3600*24): self.logStats(0, None) lastReport=now except socket.error as e: if e.errno == errno.EINTR and self._terminate: pass elif e.errno == errno.ECONNRESET: self.log.warn("[Head] Tail disconnected") socketArray.remove(clientSocket) else: raise except Exception as e: self.log.exception(e) class Tail(PipeBase): SECTION="tail" KEY_HEADHOST="headHost" KEY_HEADPORT="headPort" KEY_TIMEOUT="timeout" WAIT4RETRY=300 def __init__(self): PipeBase.__init__(self, Tail.SECTION) self._terminate=False signal.signal(signal.SIGTERM, self.terminate) signal.signal(signal.SIGINT, self.terminate) self.readTailConfig(self.cfg) self.sourceIds={} self.sourceIdLock=threading.Lock() self.responseQ=Queue.Queue() self.connected=False def readTailConfig(self, cfg): self.headHost=cfg.get(Tail.SECTION, Tail.KEY_HEADHOST) self.headPort=cfg.getint(Tail.SECTION, Tail.KEY_HEADPORT) self.timeout=cfg.getint(Tail.SECTION, Tail.KEY_TIMEOUT) def terminate(self, sigNo, stackFrame): if sigNo==signal.SIGINT: self.log.info("[Tail] Terminating upon Keyboard Interrupt") if sigNo==signal.SIGTERM: self.log.info("[Tail] Terminating upon Signal Term") self._terminate=True if self.connected: os.write(self.controlPipe[1], 'x') for s in self.sourceIds.keys(): v=self.sourceIds[s] v[0].put({}) os.write(v[1][1], 'x') def run(self): # Important: logging cannot be setup any earlier as the RotatingFileHandler produces errors # in combination with the daemon function self.setupLogger(self.cfg) self.controlPipe=os.pipe() self.fds=[] self.fds.append(self.controlPipe[0]) sockRd=eu.liebrand.udppipe.Utility.SockRead() sockWt=eu.liebrand.udppipe.Utility.SockWrite() # Step #1: Connect to the head servSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) # wait for 2 seconds before trying to connect (avoid missing each other, when cron # starts head & tail at the same time time.sleep(2) lastReport=datetime.datetime.now() while not(self._terminate): try: servSocket.connect((self.headHost,self.headPort)) self.connected=True self.log.info("[Tail] Connected to >Head< at %s:%d" % (self.headHost,self.headPort)) self.fds.append(servSocket) # send config dataBuffer=cStringIO.StringIO() sockWt.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_CONFIG, dataBuffer) for lstCfg in self.listenerConfig: sockWt.writeLong(lstCfg[0], lstCfg[1], dataBuffer) dta=dataBuffer.getvalue() ctlBuffer=cStringIO.StringIO() sockWt.writeLongDirect(len(dta), ctlBuffer) sockWt.writeBinaryDirect(dta, ctlBuffer) dta=ctlBuffer.getvalue() bytesSnd=0 while bytesSnd=(3600*24): self.logStats(0, None) lastReport=now continue for r in ready[0]: if r==servSocket: dta=servSocket.recv(5) if len(dta)==0: # Head has gone self.connected=False servSocket.close() continue while(len(dta)<5): dta+=r.recv(5-len(dta)) buf=cStringIO.StringIO(dta) _,_,length=sockRd.read(buf) self.log.debug("[Tail] Received %ld bytes from >Head<" % (length)) data=[] while length>0: chunk=r.recv(length) data.append(chunk) length-=len(chunk) self.TCPBytesIn+=len(chunk) self.packetsIn+=1 readDict=eu.liebrand.udppipe.Utility.ReadDictionary() fields=readDict.read(''.join(data)) # received the data as dict - now we need to find out whether we already have thread # for host:port running if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_UDP: sourceId=str(fields[PipeBase.FIELD_SRVPORT]) + "@" + fields[PipeBase.FIELD_HOST] + ":" + str(fields[PipeBase.FIELD_PORT]) self.sourceIdLock.acquire() if self.sourceIds.has_key(sourceId): self.log.debug("[Tail] Adding packet to existing handler") self.sourceIds[sourceId][0].put(fields) #wake up thread os.write(self.sourceIds[sourceId][1][1],'x') else: self.log.debug("[Tail] Creating new handler for source id %s" % (sourceId)) found=False for lstCfg in self.listenerConfig: if lstCfg[1]==fields[PipeBase.FIELD_SRVPORT]: found=True break if found: q=Queue.Queue() q.put(fields) self.sourceIds[sourceId]=[q, os.pipe()] t=threading.Thread(target=self.handleUdpPacket, args=(lstCfg, sourceId, fields)) t.daemon=True t.start() else: self.log.error("[TAIL] Received UDP Packet for port %d without having a forward configured" % fields[PipeBase.FIELD_SRVPORT]) self.sourceIdLock.release() if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_CONFIG: #todo send a dict / json with the head configuration over pass if r==self.controlPipe[0]: os.read(self.controlPipe[0],1) data=self.responseQ.get() dataBuffer=cStringIO.StringIO() sockWt.writeString(PipeBase.FIELD_OP, PipeBase.VALUE_UDP, dataBuffer) sockWt.writeString(PipeBase.FIELD_HOST, data[PipeBase.FIELD_HOST], dataBuffer) sockWt.writeLong(PipeBase.FIELD_PORT, data[PipeBase.FIELD_PORT], dataBuffer) sockWt.writeLong(PipeBase.FIELD_SRVPORT, data[PipeBase.FIELD_SRVPORT], dataBuffer) sockWt.writeBinary(PipeBase.FIELD_UDPDATA, data[PipeBase.FIELD_UDPDATA], dataBuffer) dta=dataBuffer.getvalue() ctlBuffer=cStringIO.StringIO() sockWt.writeLongDirect(len(dta), ctlBuffer) sockWt.writeBinaryDirect(dta, ctlBuffer) dta=ctlBuffer.getvalue() bytesSnd=0 while bytesSndHead<" % \ (len(data[PipeBase.FIELD_UDPDATA]), data[PipeBase.FIELD_SRVPORT], data[PipeBase.FIELD_HOST], data[PipeBase.FIELD_PORT] ) ) except socket.error as e: self.connected=False if e.errno == errno.EINTR and self._terminate: pass elif e.errno==errno.ECONNREFUSED or e.errno==errno.EBADF or e.errno==errno.ECONNRESET: self.log.warn("[Tail] Unable to connect to host %s:%d. Will try again in %d seconds.(Reason %s)" % (self.headHost,self.headPort, Tail.WAIT4RETRY, str(e))) servSocket.close() self.fds.remove(servSocket) self.connected=False time.sleep(Tail.WAIT4RETRY) servSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) else: raise self.logStats(0, None) self.log.info("[Tail] Terminating") def handleUdpPacket(self, listenerCfg, sourceId, fields): queue=self.sourceIds[sourceId][0] localfds=self.sourceIds[sourceId][1] udpSocket=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) lastAction=datetime.datetime.now() initial=True while not(self._terminate) and (datetime.datetime.now()-lastAction).seconds