diff --git a/UdpPipe/eu/liebrand/udppipe/__init__.py b/UdpPipe/eu/liebrand/udppipe/__init__.py index 078eb20..1d7bc01 100644 --- a/UdpPipe/eu/liebrand/udppipe/__init__.py +++ b/UdpPipe/eu/liebrand/udppipe/__init__.py @@ -248,7 +248,7 @@ class PipeBase: class Head(PipeBase): SECTION="head" - BYTES_IN, BYTES_OUT, PACKETS_IN, PACKETS_OUT, CONNECTS = range(0,5) + BYTES_IN, BYTES_OUT, PACKETS_IN, PACKETS_OUT, CONNECTS, SORTKEY = range(0,6) STATUS=['wait', 'idle', 'busy'] @@ -263,6 +263,8 @@ class Head(PipeBase): self.statisticEvent = threading.Event() self.statisticThread = None self.status=0 + self.byHour={} + self.byMinute={} def terminate(self, sigNo, stackFrame): @@ -271,6 +273,8 @@ class Head(PipeBase): if sigNo==signal.SIGTERM: self.log.info("[Head] Terminating upon Signal Term") self._terminate=True + self._terminateThread=True + os.write(self.controlPipe[1], 'x') self.pipeSocket.close() self.statisticEvent.set() if self.adminSocket is not None: @@ -291,7 +295,13 @@ class Head(PipeBase): def run(self): # Important: logging cannot be setup any earlier as the RotatingFileHandler produces errors # in combination with the daemon function + signal.signal(signal.SIGTERM, self.terminate) + signal.signal(signal.SIGINT, self.terminate) + signal.signal(signal.SIGUSR1, self.toggleLogLevel) + signal.signal(signal.SIGUSR2, self.logStats) + signal.signal(signal.SIGQUIT, self.dumpstacks) self.setupLogger(self.cfg) + self.log.info("[Head] Starting Listener") socketArray=[] # Step #1: Setup all the sockets @@ -352,10 +362,11 @@ class Head(PipeBase): except Exception as e: self.log.exception(e) raise - + self.log.info("[Head] Terminating Listener") def handleMessages(self, socketArray, clientSocket): + self.log.info("[Head] Starting Client Handler") self.isThreadRunning=True # Step #2: listen on all the sockets lastReport=datetime.datetime.now() @@ -430,7 +441,7 @@ class Head(PipeBase): self.enableAdmin=(self.adminPort!=0) if self.enableAdmin: if self.adminSocket is not None: - self.log.info("[Head] Disabling current admin") + self.log.info("[Head] Disabling current admin (Going to restart admin)") self._terminateAdmin=True self.adminSocket.close() self.adminSocket=None @@ -441,9 +452,11 @@ class Head(PipeBase): self.adminThread.daemon=True self.adminThread.start() if self.statisticThread is None: - self.statisticThread=threading.Thread(target=self.statisticThread, name="Statistic-Thread") + self.statisticThread=threading.Thread(target=self.statisticTracker, name="Statistic-Thread") self.statisticThread.daemon=True self.statisticThread.start() + else: + self.log.info("[Head] Statistic Tracker is already running") continue for lstCfg in self.listenerConfig: @@ -532,9 +545,11 @@ class Head(PipeBase): socketArray.remove(clientSocket) self.isThreadRunning=False self.status=0 + self.log.info("[Head] Terminating Client Handler") def adminServer(self): + self.log.info("[Head] Starting Admin Server") pubKeyObj=RSA.importKey(self.publicKey) adminSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) adminSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -589,30 +604,38 @@ class Head(PipeBase): retData["noOfPorts"]=len(self.listenerConfig) retData["connStatus"]=Head.STATUS[self.status] if dct['op']=="statisticHour": - for mn in self.byMinute: + stat=[] + for mn in self.byMinute.keys(): tmp={} - tmp['bytesIn']=mn[Head.BYTES_IN] - tmp['bytesOut']=mn[Head.BYTES_OUT] - tmp['packetsIn']=mn[Head.PACKETS_IN] - tmp['packetsOut']=mn[Head.PACKETS_OUT] - tmp['reconnects']=mn[Head.CONNECTS] + tmp['bytesIn']=self.byMinute[mn][Head.BYTES_IN] + tmp['bytesOut']=self.byMinute[mn][Head.BYTES_OUT] + tmp['packetsIn']=self.byMinute[mn][Head.PACKETS_IN] + tmp['packetsOut']=self.byMinute[mn][Head.PACKETS_OUT] + tmp['reconnects']=self.byMinute[mn][Head.CONNECTS] + tmp['timestamp']=self.byMinute[mn][Head.SORTKEY] retData[str(mn)]=tmp + stat.append(str(mn)) + retData['keys']=stat if dct['op']=="statisticDay": - for hr in self.byHour: + stat=[] + for hr in self.byHour.keys(): tmp={} - tmp['bytesIn']=hr[Head.BYTES_IN] - tmp['bytesOut']=hr[Head.BYTES_OUT] - tmp['packetsIn']=hr[Head.PACKETS_IN] - tmp['packetsOut']=hr[Head.PACKETS_OUT] - tmp['reconnects']=hr[Head.CONNECTS] + tmp['bytesIn']=self.byHour[hr][Head.BYTES_IN] + tmp['bytesOut']=self.byHour[hr][Head.BYTES_OUT] + tmp['packetsIn']=self.byHour[hr][Head.PACKETS_IN] + tmp['packetsOut']=self.byHour[hr][Head.PACKETS_OUT] + tmp['reconnects']=self.byHour[hr][Head.CONNECTS] + tmp['timestamp']=self.byMinute[hr][Head.SORTKEY] retData[str(hr)]=tmp - pass + stat.append(str(hr)) + retData['keys']=stat retData['status']='ok' else: retData['status']='fail' sockWt=eu.liebrand.udppipe.Utility.SockWrite() buf=cStringIO.StringIO() retStrg=json.dumps(retData, cls=DateTimeEncoder) + print retStrg sockWt.writeString('result', retStrg, buf) dta=buf.getvalue() ctlBuffer=cStringIO.StringIO() @@ -627,16 +650,17 @@ class Head(PipeBase): self.log.info("[Head] Send %d bytes to >Admin<" % (bytesSnd)) clientSocket.close() except socket.error as e: + if self._terminate or self._terminateAdmin: + pass if e.errno == errno.EINTR: pass else: self.log.exception(e) - + self.log.info("[Head] Terminating Admin Server") def statisticTracker(self): - self.byHour={} - self.byMinute={} + self.log.info("[Head] Starting Statistic Tracker") now=datetime.datetime.now() waitTime=59-now.second if waitTime>0: @@ -648,16 +672,16 @@ class Head(PipeBase): now=datetime.datetime.now() minute=now.minute self.byMinute[now.minute]=[self.UDPBytesIn-baseline[0], self.UDPBytesOut-baseline[1], - self.packetsIn-baseline[2], self.packetsOut-baseline[3], self.reconnects-baseline[4]] + self.packetsIn-baseline[2], self.packetsOut-baseline[3], self.reconnects-baseline[4], int(now.strftime('%s'))] baseline=[self.UDPBytesIn, self.UDPBytesOut, self.packetsIn, self.packetsOut, self.reconnects] hour=now.hour if hour!=lastHour: lastHour=hour - self.byHour[hour]=[self.byMinute[minute][0], self.byMinute[minute][1], self.byMinute[minute][2], self.byMinute[minute][3], self.byMinute[minute][4] ] + self.byHour[hour]=[self.byMinute[minute][0], self.byMinute[minute][1], self.byMinute[minute][2], self.byMinute[minute][3], self.byMinute[minute][4], int(now.strftime('%s')) ] else: self.byHour[hour]=[self.byHour[hour][0]+self.byMinute[minute][0], self.byHour[hour][1]+self.byMinute[minute][1], - self.byHour[hour][2]+self.byMinute[minute][2], self.byHour[hour][3]+self.byMinute[minute][3], self.byHour[hour][4]+self.byMinute[minute][4] ] - + self.byHour[hour][2]+self.byMinute[minute][2], self.byHour[hour][3]+self.byMinute[minute][3], self.byHour[hour][4]+self.byMinute[minute][4], self.byHour[hour][5] ] + self.log.info("[Head] Terminating Statistic Tracker") class Tail(PipeBase): @@ -726,13 +750,14 @@ class Tail(PipeBase): servSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) # wait for 2 seconds before trying to connect (avoid missing each other, when cron # starts head & tail at the same time - time.sleep(2) + retry=6 lastReport=datetime.datetime.now() while not(self._terminate): try: self.log.info("[Tail] Trying to connect to >Head< at %s:%d" % (self.headHost,self.headPort)) servSocket.connect((self.headHost,self.headPort)) self.connected=True + retry=6 self.log.info("[Tail] Connected to >Head< at %s" % (str(servSocket.getpeername()))) self.fds.append(servSocket) @@ -875,18 +900,23 @@ class Tail(PipeBase): self.log.debug("[Tail] Forwarded response packet of %d bytes for listening port %d from client %s:%d to >Head<" % \ (len(data[PipeBase.FIELD_UDPDATA]), data[PipeBase.FIELD_SRVPORT], data[PipeBase.FIELD_HOST], data[PipeBase.FIELD_PORT] ) ) except socket.error as e: - self.connected=False if e.errno == errno.EINTR and self._terminate: pass elif e.errno==errno.ECONNREFUSED or e.errno==errno.EBADF or e.errno==errno.ECONNRESET or e.errno==errno.ENETUNREACH: - self.log.warn("[Tail] Unable to connect to host %s:%d. Will try again in %d seconds.(Reason %s)" % (self.headHost,self.headPort, Tail.WAIT4RETRY, str(e))) + retry-=1 + if retry>0: + waitTime=Tail.WAIT4RETRY / 10 + else: + waitTime=Tail.WAIT4RETRY + self.log.warn("[Tail] Unable to connect to host %s:%d. Will try again in %d seconds. Reason %s" % (self.headHost,self.headPort, waitTime, str(e))) servSocket.close() if servSocket in self.fds: self.fds.remove(servSocket) self.connected=False - time.sleep(Tail.WAIT4RETRY) + time.sleep(waitTime) servSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) else: + self.connected=False raise self.logStats(0, None) self.log.info("[Tail] Terminating") @@ -1016,14 +1046,20 @@ if __name__ == '__main__': help="run as tail end of the pipe") (options, args) = parser.parse_args() if(len(args)==0): - print "specify start|stop|restart" + print "specify start|stop|restart|nodaemon" sys.exit() if options.mode=="head": head=Head() - daemon_runner = runner.DaemonRunner(head) - daemon_runner.do_action() + if(args[0]=='nodaemon'): + head.run() + else: + daemon_runner = runner.DaemonRunner(head) + daemon_runner.do_action() if options.mode=="tail": tail=Tail() - daemon_runner = runner.DaemonRunner(tail) - daemon_runner.do_action() - \ No newline at end of file + if(args[0]=='nodaemon'): + tail.run() + else: + daemon_runner = runner.DaemonRunner(tail) + daemon_runner.do_action() + \ No newline at end of file