diff --git a/UdpPipe/eu/liebrand/udppipe/__init__.py b/UdpPipe/eu/liebrand/udppipe/__init__.py index 678c0a6..078eb20 100644 --- a/UdpPipe/eu/liebrand/udppipe/__init__.py +++ b/UdpPipe/eu/liebrand/udppipe/__init__.py @@ -165,6 +165,7 @@ class PipeBase: signal.signal(signal.SIGQUIT, self.dumpstacks) self.startTime=datetime.datetime.now() self.lastPing=None + def getTimeStamp(self): @@ -242,14 +243,13 @@ class PipeBase: else: break return cfg - - + class Head(PipeBase): SECTION="head" - KEY_PIPEPORT="pipePort" - KEY_TAILHOSTNAME="tailHostname" + BYTES_IN, BYTES_OUT, PACKETS_IN, PACKETS_OUT, CONNECTS = range(0,5) + STATUS=['wait', 'idle', 'busy'] def __init__(self): @@ -260,6 +260,9 @@ class Head(PipeBase): signal.signal(signal.SIGINT, self.terminate) self.adminSocket=None self._terminateAdmin=False + self.statisticEvent = threading.Event() + self.statisticThread = None + self.status=0 def terminate(self, sigNo, stackFrame): @@ -269,6 +272,7 @@ class Head(PipeBase): self.log.info("[Head] Terminating upon Signal Term") self._terminate=True self.pipeSocket.close() + self.statisticEvent.set() if self.adminSocket is not None: self._terminateAdmin=True self.adminSocket.close() @@ -391,6 +395,7 @@ class Head(PipeBase): fields=readDict.read(''.join(data)) if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_PING: self.lastPing=datetime.datetime.now() + self.status=1 continue if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_CONFIG: for k in fields.keys(): @@ -432,9 +437,13 @@ class Head(PipeBase): self.adminThread.join() self.log.info("[Head] Admin is enabled on port %d" % (self.adminPort)) self._terminateAdmin=False - self.adminThread=threading.Thread(target=self.adminServer) + self.adminThread=threading.Thread(target=self.adminServer, name="AdminServer-Thread") self.adminThread.daemon=True self.adminThread.start() + if self.statisticThread is None: + self.statisticThread=threading.Thread(target=self.statisticThread, name="Statistic-Thread") + self.statisticThread.daemon=True + self.statisticThread.start() continue for lstCfg in self.listenerConfig: @@ -454,6 +463,7 @@ class Head(PipeBase): found=False for lstCfg in self.listenerConfig: if lstCfg[1]==fields['srvPort']: + self.status=2 lstCfg[2].sendto(fields[PipeBase.FIELD_UDPDATA], (fields['host'], fields['port'])) found=True self.log.debug("[Head] Forwarded response packet of %d bytes to %s:%d for port %d" % \ @@ -468,6 +478,7 @@ class Head(PipeBase): for lstCfg in self.listenerConfig: if r==lstCfg[2]: + self.status=2 # we have an inbound message udpData, address=r.recvfrom(4096) self.log.debug("[Head] Received %d bytes from %s:%d" % (len(udpData), address[0], address[1])) @@ -498,6 +509,7 @@ class Head(PipeBase): else: self.log.warn("[Head] No activity for 180 seconds, assuming Tail is absent") self._terminateThread=True + self.status=0 now=datetime.datetime.now() if (now-lastReport).seconds>=(3600*24) or self._terminateThread or self._terminate: self.logStats(0, None) @@ -508,16 +520,18 @@ class Head(PipeBase): except socket.error, (_errno, _strerror): if _errno == errno.ECONNRESET: self.log.warn("[Head] Tail disconnected") - self._terminateThread=True else: self.log.exception(socket.error) - self._terminateThread=True + self._terminateThread=True + self.status=0 except Exception as e: self.log.exception(e) self._terminateThread=True + self.status=0 if clientSocket in socketArray: socketArray.remove(clientSocket) self.isThreadRunning=False + self.status=0 def adminServer(self): @@ -558,24 +572,42 @@ class Head(PipeBase): hsh = SHA256.new(dta) verifier=PKCS1_v1_5.new(pubKeyObj) signed=verifier.verify(hsh, signature) - if signed: - self.log.info("[Head] Received valid request from >Admin>") - else: + if not(signed): self.log.info("[Head] Received INVALID request from >Admin>") - dct=json.loads(dta) - if dct['op']=="challenge": - retData["challenge"]=str(uuid.uuid4()) - if dct['op']=="status": - retData["tailConnected"]=self.isThreadRunning - retData["lastPing"]=self.lastPing - retData["startTime"]=self.startTime - retData["reconnects"]=self.reconnects - retData["noOfPorts"]=len(self.listenerConfig) - if dct['op']=="statistic": - pass - if dct['op']=="detailStatistic": - pass - retData['status']='ok' + retData['status']='fail' + else: + dct=json.loads(dta) + self.log.info("[Head] Received valid request from >Admin> (op %s)" % (dct['op'])) + + if dct['op']=="challenge": + retData["challenge"]=str(uuid.uuid4()) + if dct['op']=="status": + retData["tailConnected"]=self.isThreadRunning + retData["lastPing"]=self.lastPing + retData["startTime"]=self.startTime + retData["reconnects"]=self.reconnects + retData["noOfPorts"]=len(self.listenerConfig) + retData["connStatus"]=Head.STATUS[self.status] + if dct['op']=="statisticHour": + for mn in self.byMinute: + tmp={} + tmp['bytesIn']=mn[Head.BYTES_IN] + tmp['bytesOut']=mn[Head.BYTES_OUT] + tmp['packetsIn']=mn[Head.PACKETS_IN] + tmp['packetsOut']=mn[Head.PACKETS_OUT] + tmp['reconnects']=mn[Head.CONNECTS] + retData[str(mn)]=tmp + if dct['op']=="statisticDay": + for hr in self.byHour: + tmp={} + tmp['bytesIn']=hr[Head.BYTES_IN] + tmp['bytesOut']=hr[Head.BYTES_OUT] + tmp['packetsIn']=hr[Head.PACKETS_IN] + tmp['packetsOut']=hr[Head.PACKETS_OUT] + tmp['reconnects']=hr[Head.CONNECTS] + retData[str(hr)]=tmp + pass + retData['status']='ok' else: retData['status']='fail' sockWt=eu.liebrand.udppipe.Utility.SockWrite() @@ -600,6 +632,32 @@ class Head(PipeBase): else: self.log.exception(e) + + + def statisticTracker(self): + self.byHour={} + self.byMinute={} + now=datetime.datetime.now() + waitTime=59-now.second + if waitTime>0: + self.statisticEvent.wait(waitTime) + baseline=[self.UDPBytesIn, self.UDPBytesOut, self.packetsIn, self.packetsOut, self.reconnects] + lastHour=25 + while not(self._terminate): + self.statisticEvent.wait(60) + now=datetime.datetime.now() + minute=now.minute + self.byMinute[now.minute]=[self.UDPBytesIn-baseline[0], self.UDPBytesOut-baseline[1], + self.packetsIn-baseline[2], self.packetsOut-baseline[3], self.reconnects-baseline[4]] + baseline=[self.UDPBytesIn, self.UDPBytesOut, self.packetsIn, self.packetsOut, self.reconnects] + hour=now.hour + if hour!=lastHour: + lastHour=hour + self.byHour[hour]=[self.byMinute[minute][0], self.byMinute[minute][1], self.byMinute[minute][2], self.byMinute[minute][3], self.byMinute[minute][4] ] + else: + self.byHour[hour]=[self.byHour[hour][0]+self.byMinute[minute][0], self.byHour[hour][1]+self.byMinute[minute][1], + self.byHour[hour][2]+self.byMinute[minute][2], self.byHour[hour][3]+self.byMinute[minute][3], self.byHour[hour][4]+self.byMinute[minute][4] ] + class Tail(PipeBase): @@ -820,7 +878,7 @@ class Tail(PipeBase): self.connected=False if e.errno == errno.EINTR and self._terminate: pass - elif e.errno==errno.ECONNREFUSED or e.errno==errno.EBADF or e.errno==errno.ECONNRESET: + elif e.errno==errno.ECONNREFUSED or e.errno==errno.EBADF or e.errno==errno.ECONNRESET or e.errno==errno.ENETUNREACH: self.log.warn("[Tail] Unable to connect to host %s:%d. Will try again in %d seconds.(Reason %s)" % (self.headHost,self.headPort, Tail.WAIT4RETRY, str(e))) servSocket.close() if servSocket in self.fds: