1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Ensure MQTT nodes unsubscribe before disconnect

Fixes #609

Needed for partial deployment - the nodes assumed the
connection would always be closed when a deploy occurs.
This commit is contained in:
Nick O'Leary 2015-04-09 20:10:34 +01:00
parent 0e926c566b
commit f48ee01a03
3 changed files with 47 additions and 13 deletions

View File

@ -53,15 +53,19 @@ module.exports = function(RED) {
msg._topic = topic; msg._topic = topic;
} }
node.send(msg); node.send(msg);
}); }, this.id);
this.client.on("connectionlost",function() { this.client.on("connectionlost",function() {
node.status({fill:"red",shape:"ring",text:"disconnected"}); node.status({fill:"red",shape:"ring",text:"disconnected"});
}); });
this.client.on("connect",function() { this.client.on("connect",function() {
node.status({fill:"green",shape:"dot",text:"connected"}); node.status({fill:"green",shape:"dot",text:"connected"});
}); });
if (this.client.isConnected()) {
node.status({fill:"green",shape:"dot",text:"connected"});
} else {
this.client.connect(); this.client.connect();
} }
}
else { else {
this.error("topic not defined"); this.error("topic not defined");
} }
@ -70,6 +74,7 @@ module.exports = function(RED) {
} }
this.on('close', function() { this.on('close', function() {
if (this.client) { if (this.client) {
this.client.unsubscribe(this.topic,this.id);
this.client.disconnect(); this.client.disconnect();
} }
}); });

View File

@ -127,7 +127,7 @@ MQTTClient.prototype.connect = function(options) {
client.on('unsuback',function(packet) { client.on('unsuback',function(packet) {
self.lastInbound = (new Date()).getTime() self.lastInbound = (new Date()).getTime()
var topic = self.pendingSubscriptions[packet.messageId]; var topic = self.pendingSubscriptions[packet.messageId];
self.emit('unsubscribe',topic,packet.granted[0]); self.emit('unsubscribe',topic);
delete self.pendingSubscriptions[packet.messageId]; delete self.pendingSubscriptions[packet.messageId];
}); });
client.on('publish',function(packet) { client.on('publish',function(packet) {
@ -201,7 +201,7 @@ MQTTClient.prototype.unsubscribe = function(topic) {
var self = this; var self = this;
if (self.connected) { if (self.connected) {
var options = { var options = {
topic:topic, unsubscriptions:[topic],
messageId: self._nextMessageId() messageId: self._nextMessageId()
}; };
this.pendingSubscriptions[options.messageId] = topic; this.pendingSubscriptions[options.messageId] = topic;

View File

@ -42,7 +42,7 @@ module.exports = {
options.password = password; options.password = password;
options.will = will; options.will = will;
var queue = []; var queue = [];
var subscriptions = []; var subscriptions = {};
var connecting = false; var connecting = false;
var obj = { var obj = {
_instances: 0, _instances: 0,
@ -57,17 +57,40 @@ module.exports = {
queue.push(msg); queue.push(msg);
} }
}, },
subscribe: function(topic,qos,callback) { subscribe: function(topic,qos,callback,ref) {
subscriptions.push({topic:topic,qos:qos,callback:callback}); ref = ref||0;
client.on('message',function(mtopic,mpayload,mqos,mretain) { subscriptions[topic] = subscriptions[topic]||{};
var sub = {
topic:topic,
qos:qos,
handler:function(mtopic,mpayload,mqos,mretain) {
if (matchTopic(topic,mtopic)) { if (matchTopic(topic,mtopic)) {
callback(mtopic,mpayload,mqos,mretain); callback(mtopic,mpayload,mqos,mretain);
} }
}); },
ref: ref
};
subscriptions[topic][ref] = sub;
client.on('message',sub.handler);
if (client.isConnected()) { if (client.isConnected()) {
client.subscribe(topic,qos); client.subscribe(topic,qos);
} }
}, },
unsubscribe: function(topic,ref) {
ref = ref||0;
var sub = subscriptions[topic];
if (sub) {
if (sub[ref]) {
client.removeListener('message',sub[ref].handler);
delete sub[ref];
}
if (Object.keys(sub).length == 0) {
delete subscriptions[topic];
client.unsubscribe(topic);
}
}
},
on: function(a,b){ on: function(a,b){
client.on(a,b); client.on(a,b);
}, },
@ -80,13 +103,17 @@ module.exports = {
client.connect(options); client.connect(options);
} }
}, },
disconnect: function() { disconnect: function(ref) {
this._instances -= 1; this._instances -= 1;
if (this._instances == 0) { if (this._instances == 0) {
client.disconnect(); client.disconnect();
client = null; client = null;
delete connections[id]; delete connections[id];
} }
},
isConnected: function() {
return client.isConnected();
} }
}; };
client.on('connect',function() { client.on('connect',function() {
@ -94,9 +121,11 @@ module.exports = {
util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port); util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port);
connecting = false; connecting = false;
for (var s in subscriptions) { for (var s in subscriptions) {
var topic = subscriptions[s].topic; var topic = s;
var qos = subscriptions[s].qos; var qos = 0;
var callback = subscriptions[s].callback; for (var r in subscriptions[s]) {
qos = Math.max(qos,subscriptions[s][r].qos);
}
client.subscribe(topic,qos); client.subscribe(topic,qos);
} }
//console.log("connected - publishing",queue.length,"messages"); //console.log("connected - publishing",queue.length,"messages");