diff --git a/nodes/core/io/lib/mqtt.js b/nodes/core/io/lib/mqtt.js index c1576d747..95af63560 100644 --- a/nodes/core/io/lib/mqtt.js +++ b/nodes/core/io/lib/mqtt.js @@ -45,141 +45,143 @@ function MQTTClient(port,host) { util.inherits(MQTTClient, events.EventEmitter); MQTTClient.prototype.connect = function(options) { - var self = this; - options = options||{}; - self.options = options; - self.options.keepalive = options.keepalive||15; - self.options.clean = self.options.clean||true; - self.options.protocolId = 'MQIsdp'; - self.options.protocolVersion = 3; - - self.client = mqtt.createConnection(this.port,this.host,function(err,client) { - if (err) { - self.connected = false; - clearInterval(self.watchdog); - self.connectionError = true; - //util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err)); - self.emit('connectionlost',err); - return; - } - client.on('close',function(e) { - //util.log('[mqtt] ['+self.uid+'] on close'); - clearInterval(self.watchdog); - if (!self.connectionError) { + if (!this.connected) { + var self = this; + options = options||{}; + self.options = options; + self.options.keepalive = options.keepalive||15; + self.options.clean = self.options.clean||true; + self.options.protocolId = 'MQIsdp'; + self.options.protocolVersion = 3; + + self.client = mqtt.createConnection(this.port,this.host,function(err,client) { + if (err) { + self.connected = false; + clearInterval(self.watchdog); + self.connectionError = true; + //util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err)); + self.emit('connectionlost',err); + return; + } + client.on('close',function(e) { + //util.log('[mqtt] ['+self.uid+'] on close'); + clearInterval(self.watchdog); + if (!self.connectionError) { + if (self.connected) { + self.connected = false; + self.emit('connectionlost',e); + } else { + self.emit('disconnect'); + } + } + }); + client.on('error',function(e) { + //util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e)); + clearInterval(self.watchdog); if (self.connected) { self.connected = false; self.emit('connectionlost',e); - } else { - self.emit('disconnect'); } - } - }); - client.on('error',function(e) { - //util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e)); - clearInterval(self.watchdog); - if (self.connected) { - self.connected = false; - self.emit('connectionlost',e); - } - }); - client.on('connack',function(packet) { - if (packet.returnCode == 0) { - self.watchdog = setInterval(function(self) { - var now = (new Date()).getTime(); - - //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound})); - - if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) { - if (self.pingOutstanding) { - //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect'); - try { - self.client.disconnect(); - } catch (err) { - } - } else { - //util.log('[mqtt] ['+self.uid+'] watchdog pinging'); - self.lastOutbound = (new Date()).getTime(); - self.lastInbound = (new Date()).getTime(); - self.pingOutstanding = true; - self.client.pingreq(); - } - } - - },self.options.keepalive*500,self); - self.pingOutstanding = false; - self.lastInbound = (new Date()).getTime() - self.lastOutbound = (new Date()).getTime() - self.connected = true; - self.connectionError = false; - self.emit('connect'); - } else { - self.connected = false; - self.emit('connectionlost'); - } - }); - client.on('suback',function(packet) { - self.lastInbound = (new Date()).getTime() - var topic = self.pendingSubscriptions[packet.messageId]; - self.emit('subscribe',topic,packet.granted[0]); - delete self.pendingSubscriptions[packet.messageId]; - }); - client.on('unsuback',function(packet) { - self.lastInbound = (new Date()).getTime() - var topic = self.pendingSubscriptions[packet.messageId]; - self.emit('unsubscribe',topic,packet.granted[0]); - delete self.pendingSubscriptions[packet.messageId]; - }); - client.on('publish',function(packet) { - self.lastInbound = (new Date()).getTime() - if (packet.qos < 2) { - var p = packet; - self.emit('message',p.topic,p.payload,p.qos,p.retain); - } else { - self.inboundMessages[packet.messageId] = packet; - this.lastOutbound = (new Date()).getTime() - self.client.pubrec(packet); - } - if (packet.qos == 1) { - this.lastOutbound = (new Date()).getTime() - self.client.puback(packet); - } - }); - - client.on('pubrel',function(packet) { - self.lastInbound = (new Date()).getTime() - var p = self.inboundMessages[packet.messageId]; - if (p) { - self.emit('message',p.topic,p.payload,p.qos,p.retain); - delete self.inboundMessages[packet.messageId]; - } - self.lastOutbound = (new Date()).getTime() - self.client.pubcomp(packet); - }); - - client.on('puback',function(packet) { - self.lastInbound = (new Date()).getTime() - // outbound qos-1 complete - }); - - client.on('pubrec',function(packet) { - self.lastInbound = (new Date()).getTime() - self.lastOutbound = (new Date()).getTime() - self.client.pubrel(packet); - }); - client.on('pubcomp',function(packet) { - self.lastInbound = (new Date()).getTime() - // outbound qos-2 complete - }); - client.on('pingresp',function(packet) { - //util.log('[mqtt] ['+self.uid+'] received pingresp'); - self.lastInbound = (new Date()).getTime() - self.pingOutstanding = false; - }); - - this.lastOutbound = (new Date()).getTime() - this.connectionError = false; - client.connect(self.options); - }); + }); + client.on('connack',function(packet) { + if (packet.returnCode == 0) { + self.watchdog = setInterval(function(self) { + var now = (new Date()).getTime(); + + //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound})); + + if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) { + if (self.pingOutstanding) { + //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect'); + try { + self.client.disconnect(); + } catch (err) { + } + } else { + //util.log('[mqtt] ['+self.uid+'] watchdog pinging'); + self.lastOutbound = (new Date()).getTime(); + self.lastInbound = (new Date()).getTime(); + self.pingOutstanding = true; + self.client.pingreq(); + } + } + + },self.options.keepalive*500,self); + self.pingOutstanding = false; + self.lastInbound = (new Date()).getTime() + self.lastOutbound = (new Date()).getTime() + self.connected = true; + self.connectionError = false; + self.emit('connect'); + } else { + self.connected = false; + self.emit('connectionlost'); + } + }); + client.on('suback',function(packet) { + self.lastInbound = (new Date()).getTime() + var topic = self.pendingSubscriptions[packet.messageId]; + self.emit('subscribe',topic,packet.granted[0]); + delete self.pendingSubscriptions[packet.messageId]; + }); + client.on('unsuback',function(packet) { + self.lastInbound = (new Date()).getTime() + var topic = self.pendingSubscriptions[packet.messageId]; + self.emit('unsubscribe',topic,packet.granted[0]); + delete self.pendingSubscriptions[packet.messageId]; + }); + client.on('publish',function(packet) { + self.lastInbound = (new Date()).getTime() + if (packet.qos < 2) { + var p = packet; + self.emit('message',p.topic,p.payload,p.qos,p.retain); + } else { + self.inboundMessages[packet.messageId] = packet; + this.lastOutbound = (new Date()).getTime() + self.client.pubrec(packet); + } + if (packet.qos == 1) { + this.lastOutbound = (new Date()).getTime() + self.client.puback(packet); + } + }); + + client.on('pubrel',function(packet) { + self.lastInbound = (new Date()).getTime() + var p = self.inboundMessages[packet.messageId]; + if (p) { + self.emit('message',p.topic,p.payload,p.qos,p.retain); + delete self.inboundMessages[packet.messageId]; + } + self.lastOutbound = (new Date()).getTime() + self.client.pubcomp(packet); + }); + + client.on('puback',function(packet) { + self.lastInbound = (new Date()).getTime() + // outbound qos-1 complete + }); + + client.on('pubrec',function(packet) { + self.lastInbound = (new Date()).getTime() + self.lastOutbound = (new Date()).getTime() + self.client.pubrel(packet); + }); + client.on('pubcomp',function(packet) { + self.lastInbound = (new Date()).getTime() + // outbound qos-2 complete + }); + client.on('pingresp',function(packet) { + //util.log('[mqtt] ['+self.uid+'] received pingresp'); + self.lastInbound = (new Date()).getTime() + self.pingOutstanding = false; + }); + + this.lastOutbound = (new Date()).getTime() + this.connectionError = false; + client.connect(self.options); + }); + } } MQTTClient.prototype.subscribe = function(topic,qos) { diff --git a/nodes/core/io/lib/mqttConnectionPool.js b/nodes/core/io/lib/mqttConnectionPool.js index ca005c73a..b43498eed 100644 --- a/nodes/core/io/lib/mqttConnectionPool.js +++ b/nodes/core/io/lib/mqttConnectionPool.js @@ -91,7 +91,6 @@ module.exports = { client.on('connect',function() { if (client) { util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port); - connecting = false; for (var s in subscriptions) { var topic = subscriptions[s].topic; @@ -109,13 +108,13 @@ module.exports = { }); client.on('connectionlost', function(err) { util.log('[mqtt] ['+uid+'] connection lost to broker tcp://'+broker+':'+port); + connecting = false; setTimeout(function() { - if (client) { - client.connect(options); - } + obj.connect(); }, settings.mqttReconnectTime||5000); }); client.on('disconnect', function() { + connecting = false; util.log('[mqtt] ['+uid+'] disconnected from broker tcp://'+broker+':'+port); });