diff --git a/nodes/core/io/10-mqtt.js b/nodes/core/io/10-mqtt.js index 9085505c7..6652ecd57 100644 --- a/nodes/core/io/10-mqtt.js +++ b/nodes/core/io/10-mqtt.js @@ -53,14 +53,18 @@ module.exports = function(RED) { msg._topic = topic; } node.send(msg); - }); + }, this.id); this.client.on("connectionlost",function() { node.status({fill:"red",shape:"ring",text:"disconnected"}); }); this.client.on("connect",function() { node.status({fill:"green",shape:"dot",text:"connected"}); }); - this.client.connect(); + if (this.client.isConnected()) { + node.status({fill:"green",shape:"dot",text:"connected"}); + } else { + this.client.connect(); + } } else { this.error("topic not defined"); @@ -70,6 +74,7 @@ module.exports = function(RED) { } this.on('close', function() { if (this.client) { + this.client.unsubscribe(this.topic,this.id); this.client.disconnect(); } }); diff --git a/nodes/core/io/lib/mqtt.js b/nodes/core/io/lib/mqtt.js index 5f35e85de..fe4b6cba7 100644 --- a/nodes/core/io/lib/mqtt.js +++ b/nodes/core/io/lib/mqtt.js @@ -127,7 +127,7 @@ MQTTClient.prototype.connect = function(options) { client.on('unsuback',function(packet) { self.lastInbound = (new Date()).getTime() var topic = self.pendingSubscriptions[packet.messageId]; - self.emit('unsubscribe',topic,packet.granted[0]); + self.emit('unsubscribe',topic); delete self.pendingSubscriptions[packet.messageId]; }); client.on('publish',function(packet) { @@ -201,7 +201,7 @@ MQTTClient.prototype.unsubscribe = function(topic) { var self = this; if (self.connected) { var options = { - topic:topic, + unsubscriptions:[topic], messageId: self._nextMessageId() }; this.pendingSubscriptions[options.messageId] = topic; diff --git a/nodes/core/io/lib/mqttConnectionPool.js b/nodes/core/io/lib/mqttConnectionPool.js index d15f0fc7c..0a2410ad1 100644 --- a/nodes/core/io/lib/mqttConnectionPool.js +++ b/nodes/core/io/lib/mqttConnectionPool.js @@ -42,7 +42,7 @@ module.exports = { options.password = password; options.will = will; var queue = []; - var subscriptions = []; + var subscriptions = {}; var connecting = false; var obj = { _instances: 0, @@ -57,17 +57,40 @@ module.exports = { queue.push(msg); } }, - subscribe: function(topic,qos,callback) { - subscriptions.push({topic:topic,qos:qos,callback:callback}); - client.on('message',function(mtopic,mpayload,mqos,mretain) { + subscribe: function(topic,qos,callback,ref) { + ref = ref||0; + subscriptions[topic] = subscriptions[topic]||{}; + + var sub = { + topic:topic, + qos:qos, + handler:function(mtopic,mpayload,mqos,mretain) { if (matchTopic(topic,mtopic)) { callback(mtopic,mpayload,mqos,mretain); } - }); + }, + ref: ref + }; + subscriptions[topic][ref] = sub; + client.on('message',sub.handler); if (client.isConnected()) { client.subscribe(topic,qos); } }, + unsubscribe: function(topic,ref) { + ref = ref||0; + var sub = subscriptions[topic]; + if (sub) { + if (sub[ref]) { + client.removeListener('message',sub[ref].handler); + delete sub[ref]; + } + if (Object.keys(sub).length == 0) { + delete subscriptions[topic]; + client.unsubscribe(topic); + } + } + }, on: function(a,b){ client.on(a,b); }, @@ -80,13 +103,17 @@ module.exports = { client.connect(options); } }, - disconnect: function() { + disconnect: function(ref) { + this._instances -= 1; if (this._instances == 0) { client.disconnect(); client = null; delete connections[id]; } + }, + isConnected: function() { + return client.isConnected(); } }; client.on('connect',function() { @@ -94,9 +121,11 @@ module.exports = { util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port); connecting = false; for (var s in subscriptions) { - var topic = subscriptions[s].topic; - var qos = subscriptions[s].qos; - var callback = subscriptions[s].callback; + var topic = s; + var qos = 0; + for (var r in subscriptions[s]) { + qos = Math.max(qos,subscriptions[s][r].qos); + } client.subscribe(topic,qos); } //console.log("connected - publishing",queue.length,"messages");