1
0
mirror of https://github.com/FrodoVDR/UdpPipe.git synced 2023-10-10 13:36:54 +02:00

Enhanced statistics, enhanced log messages, made head shut down faster

This commit is contained in:
Mark Liebrand 2017-02-11 18:40:11 +01:00
parent 0766ad9df1
commit 4ebf5e39b1

View File

@ -248,7 +248,7 @@ class PipeBase:
class Head(PipeBase): class Head(PipeBase):
SECTION="head" SECTION="head"
BYTES_IN, BYTES_OUT, PACKETS_IN, PACKETS_OUT, CONNECTS = range(0,5) BYTES_IN, BYTES_OUT, PACKETS_IN, PACKETS_OUT, CONNECTS, SORTKEY = range(0,6)
STATUS=['wait', 'idle', 'busy'] STATUS=['wait', 'idle', 'busy']
@ -263,6 +263,8 @@ class Head(PipeBase):
self.statisticEvent = threading.Event() self.statisticEvent = threading.Event()
self.statisticThread = None self.statisticThread = None
self.status=0 self.status=0
self.byHour={}
self.byMinute={}
def terminate(self, sigNo, stackFrame): def terminate(self, sigNo, stackFrame):
@ -271,6 +273,8 @@ class Head(PipeBase):
if sigNo==signal.SIGTERM: if sigNo==signal.SIGTERM:
self.log.info("[Head] Terminating upon Signal Term") self.log.info("[Head] Terminating upon Signal Term")
self._terminate=True self._terminate=True
self._terminateThread=True
os.write(self.controlPipe[1], 'x')
self.pipeSocket.close() self.pipeSocket.close()
self.statisticEvent.set() self.statisticEvent.set()
if self.adminSocket is not None: if self.adminSocket is not None:
@ -291,7 +295,13 @@ class Head(PipeBase):
def run(self): def run(self):
# Important: logging cannot be setup any earlier as the RotatingFileHandler produces errors # Important: logging cannot be setup any earlier as the RotatingFileHandler produces errors
# in combination with the daemon function # in combination with the daemon function
signal.signal(signal.SIGTERM, self.terminate)
signal.signal(signal.SIGINT, self.terminate)
signal.signal(signal.SIGUSR1, self.toggleLogLevel)
signal.signal(signal.SIGUSR2, self.logStats)
signal.signal(signal.SIGQUIT, self.dumpstacks)
self.setupLogger(self.cfg) self.setupLogger(self.cfg)
self.log.info("[Head] Starting Listener")
socketArray=[] socketArray=[]
# Step #1: Setup all the sockets # Step #1: Setup all the sockets
@ -352,10 +362,11 @@ class Head(PipeBase):
except Exception as e: except Exception as e:
self.log.exception(e) self.log.exception(e)
raise raise
self.log.info("[Head] Terminating Listener")
def handleMessages(self, socketArray, clientSocket): def handleMessages(self, socketArray, clientSocket):
self.log.info("[Head] Starting Client Handler")
self.isThreadRunning=True self.isThreadRunning=True
# Step #2: listen on all the sockets # Step #2: listen on all the sockets
lastReport=datetime.datetime.now() lastReport=datetime.datetime.now()
@ -430,7 +441,7 @@ class Head(PipeBase):
self.enableAdmin=(self.adminPort!=0) self.enableAdmin=(self.adminPort!=0)
if self.enableAdmin: if self.enableAdmin:
if self.adminSocket is not None: if self.adminSocket is not None:
self.log.info("[Head] Disabling current admin") self.log.info("[Head] Disabling current admin (Going to restart admin)")
self._terminateAdmin=True self._terminateAdmin=True
self.adminSocket.close() self.adminSocket.close()
self.adminSocket=None self.adminSocket=None
@ -441,9 +452,11 @@ class Head(PipeBase):
self.adminThread.daemon=True self.adminThread.daemon=True
self.adminThread.start() self.adminThread.start()
if self.statisticThread is None: if self.statisticThread is None:
self.statisticThread=threading.Thread(target=self.statisticThread, name="Statistic-Thread") self.statisticThread=threading.Thread(target=self.statisticTracker, name="Statistic-Thread")
self.statisticThread.daemon=True self.statisticThread.daemon=True
self.statisticThread.start() self.statisticThread.start()
else:
self.log.info("[Head] Statistic Tracker is already running")
continue continue
for lstCfg in self.listenerConfig: for lstCfg in self.listenerConfig:
@ -532,9 +545,11 @@ class Head(PipeBase):
socketArray.remove(clientSocket) socketArray.remove(clientSocket)
self.isThreadRunning=False self.isThreadRunning=False
self.status=0 self.status=0
self.log.info("[Head] Terminating Client Handler")
def adminServer(self): def adminServer(self):
self.log.info("[Head] Starting Admin Server")
pubKeyObj=RSA.importKey(self.publicKey) pubKeyObj=RSA.importKey(self.publicKey)
adminSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) adminSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
adminSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) adminSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -589,30 +604,38 @@ class Head(PipeBase):
retData["noOfPorts"]=len(self.listenerConfig) retData["noOfPorts"]=len(self.listenerConfig)
retData["connStatus"]=Head.STATUS[self.status] retData["connStatus"]=Head.STATUS[self.status]
if dct['op']=="statisticHour": if dct['op']=="statisticHour":
for mn in self.byMinute: stat=[]
for mn in self.byMinute.keys():
tmp={} tmp={}
tmp['bytesIn']=mn[Head.BYTES_IN] tmp['bytesIn']=self.byMinute[mn][Head.BYTES_IN]
tmp['bytesOut']=mn[Head.BYTES_OUT] tmp['bytesOut']=self.byMinute[mn][Head.BYTES_OUT]
tmp['packetsIn']=mn[Head.PACKETS_IN] tmp['packetsIn']=self.byMinute[mn][Head.PACKETS_IN]
tmp['packetsOut']=mn[Head.PACKETS_OUT] tmp['packetsOut']=self.byMinute[mn][Head.PACKETS_OUT]
tmp['reconnects']=mn[Head.CONNECTS] tmp['reconnects']=self.byMinute[mn][Head.CONNECTS]
tmp['timestamp']=self.byMinute[mn][Head.SORTKEY]
retData[str(mn)]=tmp retData[str(mn)]=tmp
stat.append(str(mn))
retData['keys']=stat
if dct['op']=="statisticDay": if dct['op']=="statisticDay":
for hr in self.byHour: stat=[]
for hr in self.byHour.keys():
tmp={} tmp={}
tmp['bytesIn']=hr[Head.BYTES_IN] tmp['bytesIn']=self.byHour[hr][Head.BYTES_IN]
tmp['bytesOut']=hr[Head.BYTES_OUT] tmp['bytesOut']=self.byHour[hr][Head.BYTES_OUT]
tmp['packetsIn']=hr[Head.PACKETS_IN] tmp['packetsIn']=self.byHour[hr][Head.PACKETS_IN]
tmp['packetsOut']=hr[Head.PACKETS_OUT] tmp['packetsOut']=self.byHour[hr][Head.PACKETS_OUT]
tmp['reconnects']=hr[Head.CONNECTS] tmp['reconnects']=self.byHour[hr][Head.CONNECTS]
tmp['timestamp']=self.byMinute[hr][Head.SORTKEY]
retData[str(hr)]=tmp retData[str(hr)]=tmp
pass stat.append(str(hr))
retData['keys']=stat
retData['status']='ok' retData['status']='ok'
else: else:
retData['status']='fail' retData['status']='fail'
sockWt=eu.liebrand.udppipe.Utility.SockWrite() sockWt=eu.liebrand.udppipe.Utility.SockWrite()
buf=cStringIO.StringIO() buf=cStringIO.StringIO()
retStrg=json.dumps(retData, cls=DateTimeEncoder) retStrg=json.dumps(retData, cls=DateTimeEncoder)
print retStrg
sockWt.writeString('result', retStrg, buf) sockWt.writeString('result', retStrg, buf)
dta=buf.getvalue() dta=buf.getvalue()
ctlBuffer=cStringIO.StringIO() ctlBuffer=cStringIO.StringIO()
@ -627,16 +650,17 @@ class Head(PipeBase):
self.log.info("[Head] Send %d bytes to >Admin<" % (bytesSnd)) self.log.info("[Head] Send %d bytes to >Admin<" % (bytesSnd))
clientSocket.close() clientSocket.close()
except socket.error as e: except socket.error as e:
if self._terminate or self._terminateAdmin:
pass
if e.errno == errno.EINTR: if e.errno == errno.EINTR:
pass pass
else: else:
self.log.exception(e) self.log.exception(e)
self.log.info("[Head] Terminating Admin Server")
def statisticTracker(self): def statisticTracker(self):
self.byHour={} self.log.info("[Head] Starting Statistic Tracker")
self.byMinute={}
now=datetime.datetime.now() now=datetime.datetime.now()
waitTime=59-now.second waitTime=59-now.second
if waitTime>0: if waitTime>0:
@ -648,16 +672,16 @@ class Head(PipeBase):
now=datetime.datetime.now() now=datetime.datetime.now()
minute=now.minute minute=now.minute
self.byMinute[now.minute]=[self.UDPBytesIn-baseline[0], self.UDPBytesOut-baseline[1], self.byMinute[now.minute]=[self.UDPBytesIn-baseline[0], self.UDPBytesOut-baseline[1],
self.packetsIn-baseline[2], self.packetsOut-baseline[3], self.reconnects-baseline[4]] self.packetsIn-baseline[2], self.packetsOut-baseline[3], self.reconnects-baseline[4], int(now.strftime('%s'))]
baseline=[self.UDPBytesIn, self.UDPBytesOut, self.packetsIn, self.packetsOut, self.reconnects] baseline=[self.UDPBytesIn, self.UDPBytesOut, self.packetsIn, self.packetsOut, self.reconnects]
hour=now.hour hour=now.hour
if hour!=lastHour: if hour!=lastHour:
lastHour=hour lastHour=hour
self.byHour[hour]=[self.byMinute[minute][0], self.byMinute[minute][1], self.byMinute[minute][2], self.byMinute[minute][3], self.byMinute[minute][4] ] self.byHour[hour]=[self.byMinute[minute][0], self.byMinute[minute][1], self.byMinute[minute][2], self.byMinute[minute][3], self.byMinute[minute][4], int(now.strftime('%s')) ]
else: else:
self.byHour[hour]=[self.byHour[hour][0]+self.byMinute[minute][0], self.byHour[hour][1]+self.byMinute[minute][1], self.byHour[hour]=[self.byHour[hour][0]+self.byMinute[minute][0], self.byHour[hour][1]+self.byMinute[minute][1],
self.byHour[hour][2]+self.byMinute[minute][2], self.byHour[hour][3]+self.byMinute[minute][3], self.byHour[hour][4]+self.byMinute[minute][4] ] self.byHour[hour][2]+self.byMinute[minute][2], self.byHour[hour][3]+self.byMinute[minute][3], self.byHour[hour][4]+self.byMinute[minute][4], self.byHour[hour][5] ]
self.log.info("[Head] Terminating Statistic Tracker")
class Tail(PipeBase): class Tail(PipeBase):
@ -726,13 +750,14 @@ class Tail(PipeBase):
servSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) servSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# wait for 2 seconds before trying to connect (avoid missing each other, when cron # wait for 2 seconds before trying to connect (avoid missing each other, when cron
# starts head & tail at the same time # starts head & tail at the same time
time.sleep(2) retry=6
lastReport=datetime.datetime.now() lastReport=datetime.datetime.now()
while not(self._terminate): while not(self._terminate):
try: try:
self.log.info("[Tail] Trying to connect to >Head< at %s:%d" % (self.headHost,self.headPort)) self.log.info("[Tail] Trying to connect to >Head< at %s:%d" % (self.headHost,self.headPort))
servSocket.connect((self.headHost,self.headPort)) servSocket.connect((self.headHost,self.headPort))
self.connected=True self.connected=True
retry=6
self.log.info("[Tail] Connected to >Head< at %s" % (str(servSocket.getpeername()))) self.log.info("[Tail] Connected to >Head< at %s" % (str(servSocket.getpeername())))
self.fds.append(servSocket) self.fds.append(servSocket)
@ -875,18 +900,23 @@ class Tail(PipeBase):
self.log.debug("[Tail] Forwarded response packet of %d bytes for listening port %d from client %s:%d to >Head<" % \ self.log.debug("[Tail] Forwarded response packet of %d bytes for listening port %d from client %s:%d to >Head<" % \
(len(data[PipeBase.FIELD_UDPDATA]), data[PipeBase.FIELD_SRVPORT], data[PipeBase.FIELD_HOST], data[PipeBase.FIELD_PORT] ) ) (len(data[PipeBase.FIELD_UDPDATA]), data[PipeBase.FIELD_SRVPORT], data[PipeBase.FIELD_HOST], data[PipeBase.FIELD_PORT] ) )
except socket.error as e: except socket.error as e:
self.connected=False
if e.errno == errno.EINTR and self._terminate: if e.errno == errno.EINTR and self._terminate:
pass pass
elif e.errno==errno.ECONNREFUSED or e.errno==errno.EBADF or e.errno==errno.ECONNRESET or e.errno==errno.ENETUNREACH: elif e.errno==errno.ECONNREFUSED or e.errno==errno.EBADF or e.errno==errno.ECONNRESET or e.errno==errno.ENETUNREACH:
self.log.warn("[Tail] Unable to connect to host %s:%d. Will try again in %d seconds.(Reason %s)" % (self.headHost,self.headPort, Tail.WAIT4RETRY, str(e))) retry-=1
if retry>0:
waitTime=Tail.WAIT4RETRY / 10
else:
waitTime=Tail.WAIT4RETRY
self.log.warn("[Tail] Unable to connect to host %s:%d. Will try again in %d seconds. Reason %s" % (self.headHost,self.headPort, waitTime, str(e)))
servSocket.close() servSocket.close()
if servSocket in self.fds: if servSocket in self.fds:
self.fds.remove(servSocket) self.fds.remove(servSocket)
self.connected=False self.connected=False
time.sleep(Tail.WAIT4RETRY) time.sleep(waitTime)
servSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) servSocket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
else: else:
self.connected=False
raise raise
self.logStats(0, None) self.logStats(0, None)
self.log.info("[Tail] Terminating") self.log.info("[Tail] Terminating")
@ -1016,14 +1046,20 @@ if __name__ == '__main__':
help="run as tail end of the pipe") help="run as tail end of the pipe")
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
if(len(args)==0): if(len(args)==0):
print "specify start|stop|restart" print "specify start|stop|restart|nodaemon"
sys.exit() sys.exit()
if options.mode=="head": if options.mode=="head":
head=Head() head=Head()
daemon_runner = runner.DaemonRunner(head) if(args[0]=='nodaemon'):
daemon_runner.do_action() head.run()
else:
daemon_runner = runner.DaemonRunner(head)
daemon_runner.do_action()
if options.mode=="tail": if options.mode=="tail":
tail=Tail() tail=Tail()
daemon_runner = runner.DaemonRunner(tail) if(args[0]=='nodaemon'):
daemon_runner.do_action() tail.run()
else:
daemon_runner = runner.DaemonRunner(tail)
daemon_runner.do_action()