1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Track which event handlers we add to the mqtt client so we can removed them cleanly

This commit is contained in:
Phil Day 2022-05-06 15:29:42 +01:00
parent b2ec040a8d
commit 7845ebffc5
2 changed files with 33 additions and 13 deletions

View File

@ -86,7 +86,7 @@
}, },
"devDependencies": { "devDependencies": {
"dompurify": "2.3.6", "dompurify": "2.3.6",
"grunt": "1.5.2", "grunt": "^1.5.2",
"grunt-chmod": "~1.1.1", "grunt-chmod": "~1.1.1",
"grunt-cli": "~1.4.3", "grunt-cli": "~1.4.3",
"grunt-concurrent": "3.0.0", "grunt-concurrent": "3.0.0",

View File

@ -453,6 +453,7 @@ module.exports = function(RED) {
node.options = {}; node.options = {};
node.queue = []; node.queue = [];
node.subscriptions = {}; node.subscriptions = {};
node.clientListeners = []
/** @type {mqtt.MqttClient}*/ this.client; /** @type {mqtt.MqttClient}*/ this.client;
node.setOptions = function(opts, init) { node.setOptions = function(opts, init) {
if(!opts || typeof opts !== "object") { if(!opts || typeof opts !== "object") {
@ -720,9 +721,9 @@ module.exports = function(RED) {
node.serverProperties = {}; node.serverProperties = {};
node.client = mqtt.connect(node.brokerurl, node.options); node.client = mqtt.connect(node.brokerurl, node.options);
node.client.setMaxListeners(0); node.client.setMaxListeners(0);
let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times
// Register successful connect or reconnect handler // Register successful connect or reconnect handler
node.client.on('connect', function (connack) { node._clientOn('connect', function (connack) {
node.closing = false; node.closing = false;
node.connecting = false; node.connecting = false;
node.connected = true; node.connected = true;
@ -754,7 +755,7 @@ module.exports = function(RED) {
} }
setStatusConnected(node, true); setStatusConnected(node, true);
// Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
node.client.removeAllListeners('message'); node._clientRemoveAllListeners('message');
// Re-subscribe to stored topics // Re-subscribe to stored topics
for (var s in node.subscriptions) { for (var s in node.subscriptions) {
@ -766,7 +767,7 @@ module.exports = function(RED) {
if (node.subscriptions[s].hasOwnProperty(r)) { if (node.subscriptions[s].hasOwnProperty(r)) {
qos = Math.max(qos,node.subscriptions[s][r].qos); qos = Math.max(qos,node.subscriptions[s][r].qos);
_options = node.subscriptions[s][r].options; _options = node.subscriptions[s][r].options;
node.client.on('message',node.subscriptions[s][r].handler); node._clientOn('message',node.subscriptions[s][r].handler);
} }
} }
_options.qos = _options.qos || qos; _options.qos = _options.qos || qos;
@ -779,11 +780,11 @@ module.exports = function(RED) {
node.publish(node.birthMessage); node.publish(node.birthMessage);
} }
}); });
node.client.on("reconnect", function() { node._clientOn("reconnect", function() {
setStatusConnecting(node, true); setStatusConnecting(node, true);
}); });
//Broker Disconnect - V5 event //Broker Disconnect - V5 event
node.client.on("disconnect", function(packet) { node._clientOn("disconnect", function(packet) {
//Emitted after receiving disconnect packet from broker. MQTT 5.0 feature. //Emitted after receiving disconnect packet from broker. MQTT 5.0 feature.
const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode; const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode;
const rs = packet && packet.properties && packet.properties.reasonString || ""; const rs = packet && packet.properties && packet.properties.reasonString || "";
@ -797,7 +798,7 @@ module.exports = function(RED) {
setStatusDisconnected(node, true); setStatusDisconnected(node, true);
}); });
// Register disconnect handlers // Register disconnect handlers
node.client.on('close', function () { node._clientOn('close', function () {
if (node.connected) { if (node.connected) {
node.connected = false; node.connected = false;
node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
@ -809,7 +810,7 @@ module.exports = function(RED) {
// Register connect error handler // Register connect error handler
// The client's own reconnect logic will take care of errors // The client's own reconnect logic will take care of errors
node.client.on('error', function (error) { node._clientOn('error', function (error) {
}); });
}catch(err) { }catch(err) {
console.log(err); console.log(err);
@ -822,7 +823,7 @@ module.exports = function(RED) {
if(node.connected || node.connecting) { if(node.connected || node.connecting) {
setStatusDisconnected(node, true); setStatusDisconnected(node, true);
} }
if(node.client) { node.client.removeAllListeners(); } if(node.client) { node._clientRemoveAllListeners(); }
node.connecting = false; node.connecting = false;
node.connected = false; node.connected = false;
callback && typeof callback == "function" && callback(); callback && typeof callback == "function" && callback();
@ -837,7 +838,7 @@ module.exports = function(RED) {
resolve(); resolve();
} else { } else {
const t = setTimeout(reject, ms); const t = setTimeout(reject, ms);
client.end(true, () => { client.end(() => {
clearTimeout(t); clearTimeout(t);
resolve() resolve()
}); });
@ -894,7 +895,7 @@ module.exports = function(RED) {
}; };
node.subscriptions[topic][ref] = sub; node.subscriptions[topic][ref] = sub;
if (node.connected) { if (node.connected) {
node.client.on('message',sub.handler); node._clientOn('message',sub.handler);
node.client.subscribe(topic, options); node.client.subscribe(topic, options);
} }
}; };
@ -996,12 +997,31 @@ module.exports = function(RED) {
node.on('close', function(done) { node.on('close', function(done) {
node.disconnect(function() { node.disconnect(function() {
if(node.client) { if(node.client) {
node.client.removeAllListeners(); node._clientRemoveAllListeners();
} }
done(); done();
}); });
}); });
// Helper functions to track the event listners we add to the
// client. The mqtt client also uses it own set of listeners
// so we can't use removeAllListeners() wothout breaking it
node._clientOn = function(event, f) {
node.clientListeners.push({event, f})
node.client.on(event, f)
}
node._clientRemoveAllListeners = function(event) {
node.clientListeners = node.clientListeners.filter((l) => {
if (!event || (event == l.e)) {
node.client.removeListener(l.event, l.f)
return false;
}
else
return true;
})
}
} }
RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{ RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{