1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Fix mqtt node lifecycle with partial deployments

This commit is contained in:
Nick O'Leary 2015-11-17 22:19:56 +00:00
parent 43dad4c465
commit d840d0b67d

View File

@ -18,7 +18,6 @@ module.exports = function(RED) {
"use strict"; "use strict";
var mqtt = require("mqtt"); var mqtt = require("mqtt");
var util = require("util"); var util = require("util");
var events = require("events");
var isUtf8 = require('is-utf8'); var isUtf8 = require('is-utf8');
function matchTopic(ts,t) { function matchTopic(ts,t) {
@ -46,7 +45,6 @@ module.exports = function(RED) {
this.brokerurl = ""; this.brokerurl = "";
this.connected = false; this.connected = false;
this.connecting = false; this.connecting = false;
this.usecount = 0;
this.options = {}; this.options = {};
this.queue = []; this.queue = [];
this.subscriptions = {}; this.subscriptions = {};
@ -59,8 +57,6 @@ module.exports = function(RED) {
retain: n.birthRetain=="true"|| n.birthRetain===true retain: n.birthRetain=="true"|| n.birthRetain===true
}; };
} }
events.EventEmitter.call(this);
this.setMaxListeners(0);
if (this.credentials) { if (this.credentials) {
this.username = this.credentials.user; this.username = this.credentials.user;
@ -131,13 +127,18 @@ module.exports = function(RED) {
// Define functions called by MQTT in and out nodes // Define functions called by MQTT in and out nodes
var node = this; var node = this;
this.register = function(){ this.users = {};
node.usecount += 1;
this.register = function(mqttNode){
node.users[mqttNode.id] = mqttNode;
if (Object.keys(node.users).length === 1) {
node.connect();
}
}; };
this.deregister = function(){ this.deregister = function(mqttNode){
node.usecount -= 1; delete node.users[mqttNode.id];
if (node.usecount == 0) { if (Object.keys(node.users).length === 0) {
node.client.end(); node.client.end();
} }
}; };
@ -146,12 +147,17 @@ module.exports = function(RED) {
if (!node.connected && !node.connecting) { if (!node.connected && !node.connecting) {
node.connecting = true; node.connecting = true;
node.client = mqtt.connect(node.brokerurl ,node.options); node.client = mqtt.connect(node.brokerurl ,node.options);
node.client.setMaxListeners(0);
// Register successful connect or reconnect handler // Register successful connect or reconnect handler
node.client.on('connect', function () { node.client.on('connect', function () {
node.connecting = false;
node.connected = true; node.connected = true;
node.log(RED._("mqtt.state.connected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); node.log(RED._("mqtt.state.connected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
node.emit('connected'); for (var id in node.users) {
if (node.users.hasOwnProperty(id)) {
node.users[id].status({fill:"green",shape:"dot",text:"common.status.connected"});
}
}
// Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
node.client.removeAllListeners('message'); node.client.removeAllListeners('message');
@ -185,7 +191,11 @@ module.exports = function(RED) {
if (node.connected) { if (node.connected) {
node.connected = false; node.connected = false;
node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
node.emit('disconnected'); for (var id in node.users) {
if (node.users.hasOwnProperty(id)) {
node.users[id].status({fill:"red",shape:"ring",text:"common.status.disconnected"});
}
}
} else if (node.connecting) { } else if (node.connecting) {
node.log(RED._("mqtt.state.connect-failed",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); node.log(RED._("mqtt.state.connect-failed",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
} }
@ -193,7 +203,6 @@ module.exports = function(RED) {
// Register connect error handler // Register connect error handler
node.client.on('error', function (error) { node.client.on('error', function (error) {
console.log("ERROR",error);
if (node.connecting) { if (node.connecting) {
node.client.end(); node.client.end();
node.connecting = false; node.connecting = false;
@ -264,19 +273,18 @@ module.exports = function(RED) {
} }
}; };
this.on('close', function(closecomplete) { this.on('close', function(done) {
if (this.connected) { if (this.connected) {
this.on('disconnected', function() { this.client.on('close', function() {
closecomplete(); done();
}); });
this.client.end(); this.client.end();
} else { } else {
closecomplete(); done();
} }
}); });
} }
util.inherits(MQTTBrokerNode, events.EventEmitter);
RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{ RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{
credentials: { credentials: {
@ -290,10 +298,9 @@ module.exports = function(RED) {
this.topic = n.topic; this.topic = n.topic;
this.broker = n.broker; this.broker = n.broker;
this.brokerConn = RED.nodes.getNode(this.broker); this.brokerConn = RED.nodes.getNode(this.broker);
var node = this;
if (this.brokerConn) { if (this.brokerConn) {
this.status({fill:"red",shape:"ring",text:"common.status.disconnected"}); this.status({fill:"red",shape:"ring",text:"common.status.disconnected"});
var node = this;
node.brokerConn.register();
if (this.topic) { if (this.topic) {
this.brokerConn.subscribe(this.topic,2,function(topic,payload,packet) { this.brokerConn.subscribe(this.topic,2,function(topic,payload,packet) {
if (isUtf8(payload)) { payload = payload.toString(); } if (isUtf8(payload)) { payload = payload.toString(); }
@ -303,30 +310,23 @@ module.exports = function(RED) {
} }
node.send(msg); node.send(msg);
}, this.id); }, this.id);
this.brokerConn.on("disconnected",function() {
node.status({fill:"red",shape:"ring",text:"common.status.disconnected"});
});
this.brokerConn.on("connected",function() {
node.status({fill:"green",shape:"dot",text:"common.status.connected"});
});
if (this.brokerConn.connected) { if (this.brokerConn.connected) {
node.status({fill:"green",shape:"dot",text:"common.status.connected"}); node.status({fill:"green",shape:"dot",text:"common.status.connected"});
} else {
this.brokerConn.connect();
} }
node.brokerConn.register(this);
} }
else { else {
this.error(RED._("mqtt.errors.not-defined")); this.error(RED._("mqtt.errors.not-defined"));
} }
this.on('close', function() {
if (node.brokerConn) {
node.brokerConn.unsubscribe(node.topic,node.id);
node.brokerConn.deregister(node);
}
});
} else { } else {
this.error(RED._("mqtt.errors.missing-config")); this.error(RED._("mqtt.errors.missing-config"));
} }
this.on('close', function() {
if (this.brokerConn) {
this.brokerConn.unsubscribe(this.topic,this.id);
node.brokerConn.deregister();
}
});
} }
RED.nodes.registerType("mqtt in",MQTTInNode); RED.nodes.registerType("mqtt in",MQTTInNode);
@ -337,11 +337,10 @@ module.exports = function(RED) {
this.retain = n.retain; this.retain = n.retain;
this.broker = n.broker; this.broker = n.broker;
this.brokerConn = RED.nodes.getNode(this.broker); this.brokerConn = RED.nodes.getNode(this.broker);
var node = this;
if (this.brokerConn) { if (this.brokerConn) {
this.status({fill:"red",shape:"ring",text:"common.status.disconnected"}); this.status({fill:"red",shape:"ring",text:"common.status.disconnected"});
var node = this;
node.brokerConn.register();
this.on("input",function(msg) { this.on("input",function(msg) {
if (msg.qos) { if (msg.qos) {
msg.qos = parseInt(msg.qos); msg.qos = parseInt(msg.qos);
@ -362,23 +361,16 @@ module.exports = function(RED) {
else { node.warn(RED._("mqtt.errors.invalid-topic")); } else { node.warn(RED._("mqtt.errors.invalid-topic")); }
} }
}); });
this.brokerConn.on("disconnected",function() {
node.status({fill:"red",shape:"ring",text:"common.status.disconnected"});
});
this.brokerConn.on("connected",function() {
node.status({fill:"green",shape:"dot",text:"common.status.connected"});
});
if (this.brokerConn.connected) { if (this.brokerConn.connected) {
node.status({fill:"green",shape:"dot",text:"common.status.connected"}); node.status({fill:"green",shape:"dot",text:"common.status.connected"});
} else {
this.brokerConn.connect();
} }
node.brokerConn.register(node);
this.on('close', function() {
node.brokerConn.deregister(node);
});
} else { } else {
this.error(RED._("mqtt.errors.missing-config")); this.error(RED._("mqtt.errors.missing-config"));
} }
this.on('close', function() {
node.brokerConn.deregister();
});
} }
RED.nodes.registerType("mqtt out",MQTTOutNode); RED.nodes.registerType("mqtt out",MQTTOutNode);
}; };