1
0
mirror of https://github.com/FrodoVDR/UdpPipe.git synced 2023-10-10 13:36:54 +02:00

enhancements on adminserver

This commit is contained in:
Mark Liebrand 2017-02-06 21:02:13 +01:00
parent e641813c88
commit 0766ad9df1

View File

@ -165,6 +165,7 @@ class PipeBase:
signal.signal(signal.SIGQUIT, self.dumpstacks) signal.signal(signal.SIGQUIT, self.dumpstacks)
self.startTime=datetime.datetime.now() self.startTime=datetime.datetime.now()
self.lastPing=None self.lastPing=None
def getTimeStamp(self): def getTimeStamp(self):
@ -242,14 +243,13 @@ class PipeBase:
else: else:
break break
return cfg return cfg
class Head(PipeBase): class Head(PipeBase):
SECTION="head" SECTION="head"
KEY_PIPEPORT="pipePort" BYTES_IN, BYTES_OUT, PACKETS_IN, PACKETS_OUT, CONNECTS = range(0,5)
KEY_TAILHOSTNAME="tailHostname" STATUS=['wait', 'idle', 'busy']
def __init__(self): def __init__(self):
@ -260,6 +260,9 @@ class Head(PipeBase):
signal.signal(signal.SIGINT, self.terminate) signal.signal(signal.SIGINT, self.terminate)
self.adminSocket=None self.adminSocket=None
self._terminateAdmin=False self._terminateAdmin=False
self.statisticEvent = threading.Event()
self.statisticThread = None
self.status=0
def terminate(self, sigNo, stackFrame): def terminate(self, sigNo, stackFrame):
@ -269,6 +272,7 @@ class Head(PipeBase):
self.log.info("[Head] Terminating upon Signal Term") self.log.info("[Head] Terminating upon Signal Term")
self._terminate=True self._terminate=True
self.pipeSocket.close() self.pipeSocket.close()
self.statisticEvent.set()
if self.adminSocket is not None: if self.adminSocket is not None:
self._terminateAdmin=True self._terminateAdmin=True
self.adminSocket.close() self.adminSocket.close()
@ -391,6 +395,7 @@ class Head(PipeBase):
fields=readDict.read(''.join(data)) fields=readDict.read(''.join(data))
if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_PING: if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_PING:
self.lastPing=datetime.datetime.now() self.lastPing=datetime.datetime.now()
self.status=1
continue continue
if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_CONFIG: if fields[PipeBase.FIELD_OP]==PipeBase.VALUE_CONFIG:
for k in fields.keys(): for k in fields.keys():
@ -432,9 +437,13 @@ class Head(PipeBase):
self.adminThread.join() self.adminThread.join()
self.log.info("[Head] Admin is enabled on port %d" % (self.adminPort)) self.log.info("[Head] Admin is enabled on port %d" % (self.adminPort))
self._terminateAdmin=False self._terminateAdmin=False
self.adminThread=threading.Thread(target=self.adminServer) self.adminThread=threading.Thread(target=self.adminServer, name="AdminServer-Thread")
self.adminThread.daemon=True self.adminThread.daemon=True
self.adminThread.start() self.adminThread.start()
if self.statisticThread is None:
self.statisticThread=threading.Thread(target=self.statisticThread, name="Statistic-Thread")
self.statisticThread.daemon=True
self.statisticThread.start()
continue continue
for lstCfg in self.listenerConfig: for lstCfg in self.listenerConfig:
@ -454,6 +463,7 @@ class Head(PipeBase):
found=False found=False
for lstCfg in self.listenerConfig: for lstCfg in self.listenerConfig:
if lstCfg[1]==fields['srvPort']: if lstCfg[1]==fields['srvPort']:
self.status=2
lstCfg[2].sendto(fields[PipeBase.FIELD_UDPDATA], (fields['host'], fields['port'])) lstCfg[2].sendto(fields[PipeBase.FIELD_UDPDATA], (fields['host'], fields['port']))
found=True found=True
self.log.debug("[Head] Forwarded response packet of %d bytes to %s:%d for port %d" % \ self.log.debug("[Head] Forwarded response packet of %d bytes to %s:%d for port %d" % \
@ -468,6 +478,7 @@ class Head(PipeBase):
for lstCfg in self.listenerConfig: for lstCfg in self.listenerConfig:
if r==lstCfg[2]: if r==lstCfg[2]:
self.status=2
# we have an inbound message # we have an inbound message
udpData, address=r.recvfrom(4096) udpData, address=r.recvfrom(4096)
self.log.debug("[Head] Received %d bytes from %s:%d" % (len(udpData), address[0], address[1])) self.log.debug("[Head] Received %d bytes from %s:%d" % (len(udpData), address[0], address[1]))
@ -498,6 +509,7 @@ class Head(PipeBase):
else: else:
self.log.warn("[Head] No activity for 180 seconds, assuming Tail is absent") self.log.warn("[Head] No activity for 180 seconds, assuming Tail is absent")
self._terminateThread=True self._terminateThread=True
self.status=0
now=datetime.datetime.now() now=datetime.datetime.now()
if (now-lastReport).seconds>=(3600*24) or self._terminateThread or self._terminate: if (now-lastReport).seconds>=(3600*24) or self._terminateThread or self._terminate:
self.logStats(0, None) self.logStats(0, None)
@ -508,16 +520,18 @@ class Head(PipeBase):
except socket.error, (_errno, _strerror): except socket.error, (_errno, _strerror):
if _errno == errno.ECONNRESET: if _errno == errno.ECONNRESET:
self.log.warn("[Head] Tail disconnected") self.log.warn("[Head] Tail disconnected")
self._terminateThread=True
else: else:
self.log.exception(socket.error) self.log.exception(socket.error)
self._terminateThread=True self._terminateThread=True
self.status=0
except Exception as e: except Exception as e:
self.log.exception(e) self.log.exception(e)
self._terminateThread=True self._terminateThread=True
self.status=0
if clientSocket in socketArray: if clientSocket in socketArray:
socketArray.remove(clientSocket) socketArray.remove(clientSocket)
self.isThreadRunning=False self.isThreadRunning=False
self.status=0
def adminServer(self): def adminServer(self):
@ -558,24 +572,42 @@ class Head(PipeBase):
hsh = SHA256.new(dta) hsh = SHA256.new(dta)
verifier=PKCS1_v1_5.new(pubKeyObj) verifier=PKCS1_v1_5.new(pubKeyObj)
signed=verifier.verify(hsh, signature) signed=verifier.verify(hsh, signature)
if signed: if not(signed):
self.log.info("[Head] Received valid request from >Admin>")
else:
self.log.info("[Head] Received INVALID request from >Admin>") self.log.info("[Head] Received INVALID request from >Admin>")
dct=json.loads(dta) retData['status']='fail'
if dct['op']=="challenge": else:
retData["challenge"]=str(uuid.uuid4()) dct=json.loads(dta)
if dct['op']=="status": self.log.info("[Head] Received valid request from >Admin> (op %s)" % (dct['op']))
retData["tailConnected"]=self.isThreadRunning
retData["lastPing"]=self.lastPing if dct['op']=="challenge":
retData["startTime"]=self.startTime retData["challenge"]=str(uuid.uuid4())
retData["reconnects"]=self.reconnects if dct['op']=="status":
retData["noOfPorts"]=len(self.listenerConfig) retData["tailConnected"]=self.isThreadRunning
if dct['op']=="statistic": retData["lastPing"]=self.lastPing
pass retData["startTime"]=self.startTime
if dct['op']=="detailStatistic": retData["reconnects"]=self.reconnects
pass retData["noOfPorts"]=len(self.listenerConfig)
retData['status']='ok' retData["connStatus"]=Head.STATUS[self.status]
if dct['op']=="statisticHour":
for mn in self.byMinute:
tmp={}
tmp['bytesIn']=mn[Head.BYTES_IN]
tmp['bytesOut']=mn[Head.BYTES_OUT]
tmp['packetsIn']=mn[Head.PACKETS_IN]
tmp['packetsOut']=mn[Head.PACKETS_OUT]
tmp['reconnects']=mn[Head.CONNECTS]
retData[str(mn)]=tmp
if dct['op']=="statisticDay":
for hr in self.byHour:
tmp={}
tmp['bytesIn']=hr[Head.BYTES_IN]
tmp['bytesOut']=hr[Head.BYTES_OUT]
tmp['packetsIn']=hr[Head.PACKETS_IN]
tmp['packetsOut']=hr[Head.PACKETS_OUT]
tmp['reconnects']=hr[Head.CONNECTS]
retData[str(hr)]=tmp
pass
retData['status']='ok'
else: else:
retData['status']='fail' retData['status']='fail'
sockWt=eu.liebrand.udppipe.Utility.SockWrite() sockWt=eu.liebrand.udppipe.Utility.SockWrite()
@ -600,6 +632,32 @@ class Head(PipeBase):
else: else:
self.log.exception(e) self.log.exception(e)
def statisticTracker(self):
self.byHour={}
self.byMinute={}
now=datetime.datetime.now()
waitTime=59-now.second
if waitTime>0:
self.statisticEvent.wait(waitTime)
baseline=[self.UDPBytesIn, self.UDPBytesOut, self.packetsIn, self.packetsOut, self.reconnects]
lastHour=25
while not(self._terminate):
self.statisticEvent.wait(60)
now=datetime.datetime.now()
minute=now.minute
self.byMinute[now.minute]=[self.UDPBytesIn-baseline[0], self.UDPBytesOut-baseline[1],
self.packetsIn-baseline[2], self.packetsOut-baseline[3], self.reconnects-baseline[4]]
baseline=[self.UDPBytesIn, self.UDPBytesOut, self.packetsIn, self.packetsOut, self.reconnects]
hour=now.hour
if hour!=lastHour:
lastHour=hour
self.byHour[hour]=[self.byMinute[minute][0], self.byMinute[minute][1], self.byMinute[minute][2], self.byMinute[minute][3], self.byMinute[minute][4] ]
else:
self.byHour[hour]=[self.byHour[hour][0]+self.byMinute[minute][0], self.byHour[hour][1]+self.byMinute[minute][1],
self.byHour[hour][2]+self.byMinute[minute][2], self.byHour[hour][3]+self.byMinute[minute][3], self.byHour[hour][4]+self.byMinute[minute][4] ]
class Tail(PipeBase): class Tail(PipeBase):
@ -820,7 +878,7 @@ class Tail(PipeBase):
self.connected=False self.connected=False
if e.errno == errno.EINTR and self._terminate: if e.errno == errno.EINTR and self._terminate:
pass pass
elif e.errno==errno.ECONNREFUSED or e.errno==errno.EBADF or e.errno==errno.ECONNRESET: elif e.errno==errno.ECONNREFUSED or e.errno==errno.EBADF or e.errno==errno.ECONNRESET or e.errno==errno.ENETUNREACH:
self.log.warn("[Tail] Unable to connect to host %s:%d. Will try again in %d seconds.(Reason %s)" % (self.headHost,self.headPort, Tail.WAIT4RETRY, str(e))) self.log.warn("[Tail] Unable to connect to host %s:%d. Will try again in %d seconds.(Reason %s)" % (self.headHost,self.headPort, Tail.WAIT4RETRY, str(e)))
servSocket.close() servSocket.close()
if servSocket in self.fds: if servSocket in self.fds: