From d840d0b67d508c2ae0a0aa37abf1af9220e671ae Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Tue, 17 Nov 2015 22:19:56 +0000 Subject: [PATCH] Fix mqtt node lifecycle with partial deployments --- nodes/core/io/10-mqtt.js | 86 ++++++++++++++++++---------------------- 1 file changed, 39 insertions(+), 47 deletions(-) diff --git a/nodes/core/io/10-mqtt.js b/nodes/core/io/10-mqtt.js index ca94bfcd1..bf56e9095 100644 --- a/nodes/core/io/10-mqtt.js +++ b/nodes/core/io/10-mqtt.js @@ -18,7 +18,6 @@ module.exports = function(RED) { "use strict"; var mqtt = require("mqtt"); var util = require("util"); - var events = require("events"); var isUtf8 = require('is-utf8'); function matchTopic(ts,t) { @@ -46,7 +45,6 @@ module.exports = function(RED) { this.brokerurl = ""; this.connected = false; this.connecting = false; - this.usecount = 0; this.options = {}; this.queue = []; this.subscriptions = {}; @@ -59,8 +57,6 @@ module.exports = function(RED) { retain: n.birthRetain=="true"|| n.birthRetain===true }; } - events.EventEmitter.call(this); - this.setMaxListeners(0); if (this.credentials) { this.username = this.credentials.user; @@ -131,13 +127,18 @@ module.exports = function(RED) { // Define functions called by MQTT in and out nodes var node = this; - this.register = function(){ - node.usecount += 1; + this.users = {}; + + this.register = function(mqttNode){ + node.users[mqttNode.id] = mqttNode; + if (Object.keys(node.users).length === 1) { + node.connect(); + } }; - this.deregister = function(){ - node.usecount -= 1; - if (node.usecount == 0) { + this.deregister = function(mqttNode){ + delete node.users[mqttNode.id]; + if (Object.keys(node.users).length === 0) { node.client.end(); } }; @@ -146,12 +147,17 @@ module.exports = function(RED) { if (!node.connected && !node.connecting) { node.connecting = true; node.client = mqtt.connect(node.brokerurl ,node.options); + node.client.setMaxListeners(0); // Register successful connect or reconnect handler node.client.on('connect', function () { + node.connecting = false; node.connected = true; node.log(RED._("mqtt.state.connected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); - node.emit('connected'); - + for (var id in node.users) { + if (node.users.hasOwnProperty(id)) { + node.users[id].status({fill:"green",shape:"dot",text:"common.status.connected"}); + } + } // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection node.client.removeAllListeners('message'); @@ -185,7 +191,11 @@ module.exports = function(RED) { if (node.connected) { node.connected = false; node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); - node.emit('disconnected'); + for (var id in node.users) { + if (node.users.hasOwnProperty(id)) { + node.users[id].status({fill:"red",shape:"ring",text:"common.status.disconnected"}); + } + } } else if (node.connecting) { node.log(RED._("mqtt.state.connect-failed",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); } @@ -193,7 +203,6 @@ module.exports = function(RED) { // Register connect error handler node.client.on('error', function (error) { - console.log("ERROR",error); if (node.connecting) { node.client.end(); node.connecting = false; @@ -264,19 +273,18 @@ module.exports = function(RED) { } }; - this.on('close', function(closecomplete) { + this.on('close', function(done) { if (this.connected) { - this.on('disconnected', function() { - closecomplete(); + this.client.on('close', function() { + done(); }); this.client.end(); } else { - closecomplete(); + done(); } }); } - util.inherits(MQTTBrokerNode, events.EventEmitter); RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{ credentials: { @@ -290,10 +298,9 @@ module.exports = function(RED) { this.topic = n.topic; this.broker = n.broker; this.brokerConn = RED.nodes.getNode(this.broker); + var node = this; if (this.brokerConn) { this.status({fill:"red",shape:"ring",text:"common.status.disconnected"}); - var node = this; - node.brokerConn.register(); if (this.topic) { this.brokerConn.subscribe(this.topic,2,function(topic,payload,packet) { if (isUtf8(payload)) { payload = payload.toString(); } @@ -303,30 +310,23 @@ module.exports = function(RED) { } node.send(msg); }, this.id); - this.brokerConn.on("disconnected",function() { - node.status({fill:"red",shape:"ring",text:"common.status.disconnected"}); - }); - this.brokerConn.on("connected",function() { - node.status({fill:"green",shape:"dot",text:"common.status.connected"}); - }); if (this.brokerConn.connected) { node.status({fill:"green",shape:"dot",text:"common.status.connected"}); - } else { - this.brokerConn.connect(); } + node.brokerConn.register(this); } else { this.error(RED._("mqtt.errors.not-defined")); } + this.on('close', function() { + if (node.brokerConn) { + node.brokerConn.unsubscribe(node.topic,node.id); + node.brokerConn.deregister(node); + } + }); } else { this.error(RED._("mqtt.errors.missing-config")); } - this.on('close', function() { - if (this.brokerConn) { - this.brokerConn.unsubscribe(this.topic,this.id); - node.brokerConn.deregister(); - } - }); } RED.nodes.registerType("mqtt in",MQTTInNode); @@ -337,11 +337,10 @@ module.exports = function(RED) { this.retain = n.retain; this.broker = n.broker; this.brokerConn = RED.nodes.getNode(this.broker); + var node = this; if (this.brokerConn) { this.status({fill:"red",shape:"ring",text:"common.status.disconnected"}); - var node = this; - node.brokerConn.register(); this.on("input",function(msg) { if (msg.qos) { msg.qos = parseInt(msg.qos); @@ -362,23 +361,16 @@ module.exports = function(RED) { else { node.warn(RED._("mqtt.errors.invalid-topic")); } } }); - this.brokerConn.on("disconnected",function() { - node.status({fill:"red",shape:"ring",text:"common.status.disconnected"}); - }); - this.brokerConn.on("connected",function() { - node.status({fill:"green",shape:"dot",text:"common.status.connected"}); - }); if (this.brokerConn.connected) { node.status({fill:"green",shape:"dot",text:"common.status.connected"}); - } else { - this.brokerConn.connect(); } + node.brokerConn.register(node); + this.on('close', function() { + node.brokerConn.deregister(node); + }); } else { this.error(RED._("mqtt.errors.missing-config")); } - this.on('close', function() { - node.brokerConn.deregister(); - }); } RED.nodes.registerType("mqtt out",MQTTOutNode); };