diff --git a/nodes/core/io/10-mqtt.js b/nodes/core/io/10-mqtt.js index bf56e9095..bf24cba90 100644 --- a/nodes/core/io/10-mqtt.js +++ b/nodes/core/io/10-mqtt.js @@ -45,6 +45,7 @@ module.exports = function(RED) { this.brokerurl = ""; this.connected = false; this.connecting = false; + this.closing = false; this.options = {}; this.queue = []; this.subscriptions = {}; @@ -136,11 +137,17 @@ module.exports = function(RED) { } }; - this.deregister = function(mqttNode){ + this.deregister = function(mqttNode,done){ delete node.users[mqttNode.id]; - if (Object.keys(node.users).length === 0) { - node.client.end(); + if (node.closing) { + return done(); } + if (Object.keys(node.users).length === 0) { + if (node.client) { + return node.client.end(done); + } + } + done(); }; this.connect = function () { @@ -274,8 +281,9 @@ module.exports = function(RED) { }; this.on('close', function(done) { + this.closing = true; if (this.connected) { - this.client.on('close', function() { + this.client.once('close', function() { done(); }); this.client.end(); @@ -302,6 +310,7 @@ module.exports = function(RED) { if (this.brokerConn) { this.status({fill:"red",shape:"ring",text:"common.status.disconnected"}); if (this.topic) { + node.brokerConn.register(this); this.brokerConn.subscribe(this.topic,2,function(topic,payload,packet) { if (isUtf8(payload)) { payload = payload.toString(); } var msg = {topic:topic,payload:payload, qos: packet.qos, retain: packet.retain}; @@ -313,15 +322,14 @@ module.exports = function(RED) { if (this.brokerConn.connected) { node.status({fill:"green",shape:"dot",text:"common.status.connected"}); } - node.brokerConn.register(this); } else { this.error(RED._("mqtt.errors.not-defined")); } - this.on('close', function() { + this.on('close', function(done) { if (node.brokerConn) { node.brokerConn.unsubscribe(node.topic,node.id); - node.brokerConn.deregister(node); + node.brokerConn.deregister(node,done); } }); } else { @@ -365,8 +373,8 @@ module.exports = function(RED) { node.status({fill:"green",shape:"dot",text:"common.status.connected"}); } node.brokerConn.register(node); - this.on('close', function() { - node.brokerConn.deregister(node); + this.on('close', function(done) { + node.brokerConn.deregister(node,done); }); } else { this.error(RED._("mqtt.errors.missing-config"));