From eee8f891469aa56ef6d397a421f707e0bceae60b Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Sat, 19 Apr 2014 22:17:14 +0100 Subject: [PATCH] Clear MQTT Connection watchdog on error --- nodes/core/io/lib/mqtt.js | 11 +++++++++++ nodes/core/io/lib/mqttConnectionPool.js | 1 + 2 files changed, 12 insertions(+) diff --git a/nodes/core/io/lib/mqtt.js b/nodes/core/io/lib/mqtt.js index 50a8f3b4c..c1576d747 100644 --- a/nodes/core/io/lib/mqtt.js +++ b/nodes/core/io/lib/mqtt.js @@ -16,6 +16,7 @@ var util = require("util"); var mqtt = require("mqtt"); var events = require("events"); +//var inspect = require("sys").inspect; //var Client = module.exports.Client = function( @@ -55,11 +56,14 @@ MQTTClient.prototype.connect = function(options) { self.client = mqtt.createConnection(this.port,this.host,function(err,client) { if (err) { self.connected = false; + clearInterval(self.watchdog); self.connectionError = true; + //util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err)); self.emit('connectionlost',err); return; } client.on('close',function(e) { + //util.log('[mqtt] ['+self.uid+'] on close'); clearInterval(self.watchdog); if (!self.connectionError) { if (self.connected) { @@ -71,6 +75,7 @@ MQTTClient.prototype.connect = function(options) { } }); client.on('error',function(e) { + //util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e)); clearInterval(self.watchdog); if (self.connected) { self.connected = false; @@ -81,13 +86,18 @@ MQTTClient.prototype.connect = function(options) { if (packet.returnCode == 0) { self.watchdog = setInterval(function(self) { var now = (new Date()).getTime(); + + //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound})); + if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) { if (self.pingOutstanding) { + //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect'); try { self.client.disconnect(); } catch (err) { } } else { + //util.log('[mqtt] ['+self.uid+'] watchdog pinging'); self.lastOutbound = (new Date()).getTime(); self.lastInbound = (new Date()).getTime(); self.pingOutstanding = true; @@ -161,6 +171,7 @@ MQTTClient.prototype.connect = function(options) { // outbound qos-2 complete }); client.on('pingresp',function(packet) { + //util.log('[mqtt] ['+self.uid+'] received pingresp'); self.lastInbound = (new Date()).getTime() self.pingOutstanding = false; }); diff --git a/nodes/core/io/lib/mqttConnectionPool.js b/nodes/core/io/lib/mqttConnectionPool.js index e04b41041..ca005c73a 100644 --- a/nodes/core/io/lib/mqttConnectionPool.js +++ b/nodes/core/io/lib/mqttConnectionPool.js @@ -34,6 +34,7 @@ module.exports = { connections[id] = function() { var uid = (1+Math.random()*4294967295).toString(16); var client = mqtt.createClient(port,broker); + client.uid = uid; client.setMaxListeners(0); var options = {keepalive:15}; options.clientId = clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16);