1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Clear MQTT Connection watchdog on error

This commit is contained in:
Nick O'Leary 2014-04-19 22:17:14 +01:00
parent 4ae5f34d2e
commit eee8f89146
2 changed files with 12 additions and 0 deletions

View File

@ -16,6 +16,7 @@
var util = require("util"); var util = require("util");
var mqtt = require("mqtt"); var mqtt = require("mqtt");
var events = require("events"); var events = require("events");
//var inspect = require("sys").inspect;
//var Client = module.exports.Client = function( //var Client = module.exports.Client = function(
@ -55,11 +56,14 @@ MQTTClient.prototype.connect = function(options) {
self.client = mqtt.createConnection(this.port,this.host,function(err,client) { self.client = mqtt.createConnection(this.port,this.host,function(err,client) {
if (err) { if (err) {
self.connected = false; self.connected = false;
clearInterval(self.watchdog);
self.connectionError = true; self.connectionError = true;
//util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err));
self.emit('connectionlost',err); self.emit('connectionlost',err);
return; return;
} }
client.on('close',function(e) { client.on('close',function(e) {
//util.log('[mqtt] ['+self.uid+'] on close');
clearInterval(self.watchdog); clearInterval(self.watchdog);
if (!self.connectionError) { if (!self.connectionError) {
if (self.connected) { if (self.connected) {
@ -71,6 +75,7 @@ MQTTClient.prototype.connect = function(options) {
} }
}); });
client.on('error',function(e) { client.on('error',function(e) {
//util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e));
clearInterval(self.watchdog); clearInterval(self.watchdog);
if (self.connected) { if (self.connected) {
self.connected = false; self.connected = false;
@ -81,13 +86,18 @@ MQTTClient.prototype.connect = function(options) {
if (packet.returnCode == 0) { if (packet.returnCode == 0) {
self.watchdog = setInterval(function(self) { self.watchdog = setInterval(function(self) {
var now = (new Date()).getTime(); var now = (new Date()).getTime();
//util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound}));
if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) { if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) {
if (self.pingOutstanding) { if (self.pingOutstanding) {
//util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect');
try { try {
self.client.disconnect(); self.client.disconnect();
} catch (err) { } catch (err) {
} }
} else { } else {
//util.log('[mqtt] ['+self.uid+'] watchdog pinging');
self.lastOutbound = (new Date()).getTime(); self.lastOutbound = (new Date()).getTime();
self.lastInbound = (new Date()).getTime(); self.lastInbound = (new Date()).getTime();
self.pingOutstanding = true; self.pingOutstanding = true;
@ -161,6 +171,7 @@ MQTTClient.prototype.connect = function(options) {
// outbound qos-2 complete // outbound qos-2 complete
}); });
client.on('pingresp',function(packet) { client.on('pingresp',function(packet) {
//util.log('[mqtt] ['+self.uid+'] received pingresp');
self.lastInbound = (new Date()).getTime() self.lastInbound = (new Date()).getTime()
self.pingOutstanding = false; self.pingOutstanding = false;
}); });

View File

@ -34,6 +34,7 @@ module.exports = {
connections[id] = function() { connections[id] = function() {
var uid = (1+Math.random()*4294967295).toString(16); var uid = (1+Math.random()*4294967295).toString(16);
var client = mqtt.createClient(port,broker); var client = mqtt.createClient(port,broker);
client.uid = uid;
client.setMaxListeners(0); client.setMaxListeners(0);
var options = {keepalive:15}; var options = {keepalive:15};
options.clientId = clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16); options.clientId = clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16);