From 02d9ad479d81bbd69b099295b0ffe4777b053c21 Mon Sep 17 00:00:00 2001 From: Phil Day Date: Fri, 6 May 2022 15:29:42 +0100 Subject: [PATCH] Track which event handlers we add to the mqtt client so we can removed them cleanly --- package.json | 4 +- .../@node-red/nodes/core/network/10-mqtt.js | 51 ++++++++++++++----- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/package.json b/package.json index df031ca2f..234c88881 100644 --- a/package.json +++ b/package.json @@ -84,8 +84,8 @@ "bcrypt": "5.0.1" }, "devDependencies": { - "dompurify": "2.3.5", - "grunt": "1.4.1", + "dompurify": "2.3.6", + "grunt": "^1.5.2", "grunt-chmod": "~1.1.1", "grunt-cli": "~1.4.3", "grunt-concurrent": "3.0.0", diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 2478da4b2..79f2aee67 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -391,6 +391,7 @@ module.exports = function(RED) { node.options = {}; node.queue = []; node.subscriptions = {}; + node.clientListeners = [] /** @type {mqtt.MqttClient}*/ this.client; node.setOptions = function(opts, init) { if(!opts || typeof opts !== "object") { @@ -658,9 +659,9 @@ module.exports = function(RED) { node.serverProperties = {}; node.client = mqtt.connect(node.brokerurl, node.options); node.client.setMaxListeners(0); - let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times + let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times // Register successful connect or reconnect handler - node.client.on('connect', function (connack) { + node._clientOn('connect', function (connack) { node.closing = false; node.connecting = false; node.connected = true; @@ -692,7 +693,7 @@ module.exports = function(RED) { } setStatusConnected(node, true); // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection - node.client.removeAllListeners('message'); + node._clientRemoveAllListeners('message'); // Re-subscribe to stored topics for (var s in node.subscriptions) { @@ -704,7 +705,7 @@ module.exports = function(RED) { if (node.subscriptions[s].hasOwnProperty(r)) { qos = Math.max(qos,node.subscriptions[s][r].qos); _options = node.subscriptions[s][r].options; - node.client.on('message',node.subscriptions[s][r].handler); + node._clientOn('message',node.subscriptions[s][r].handler); } } _options.qos = _options.qos || qos; @@ -717,11 +718,11 @@ module.exports = function(RED) { node.publish(node.birthMessage); } }); - node.client.on("reconnect", function() { + node._clientOn("reconnect", function() { setStatusConnecting(node, true); }); //Broker Disconnect - V5 event - node.client.on("disconnect", function(packet) { + node._clientOn("disconnect", function(packet) { //Emitted after receiving disconnect packet from broker. MQTT 5.0 feature. const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode; const rs = packet && packet.properties && packet.properties.reasonString || ""; @@ -735,7 +736,7 @@ module.exports = function(RED) { setStatusDisconnected(node, true); }); // Register disconnect handlers - node.client.on('close', function () { + node._clientOn('close', function () { if (node.connected) { node.connected = false; node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); @@ -747,7 +748,7 @@ module.exports = function(RED) { // Register connect error handler // The client's own reconnect logic will take care of errors - node.client.on('error', function (error) { + node._clientOn('error', function (error) { }); }catch(err) { console.log(err); @@ -763,6 +764,9 @@ module.exports = function(RED) { node.connecting = false; node.connected = false; } + if(node.client) { node._clientRemoveAllListeners(); } + node.connecting = false; + node.connected = false; callback && typeof callback == "function" && callback(); }; if(!node.client) { return _callback(); } @@ -775,7 +779,7 @@ module.exports = function(RED) { resolve(); } else { const t = setTimeout(reject, ms); - client.end(true, () => { + client.end(() => { clearTimeout(t); resolve() }); @@ -831,7 +835,7 @@ module.exports = function(RED) { }; node.subscriptions[topic][ref] = sub; if (node.connected) { - node.client.on('message',sub.handler); + node._clientOn('message',sub.handler); node.client.subscribe(topic, options); } }; @@ -908,10 +912,33 @@ module.exports = function(RED) { }; node.on('close', function(done) { - node.closing = true; - node.disconnect(done); + node.disconnect(function() { + if(node.client) { + node._clientRemoveAllListeners(); + } + done(); + }); }); + // Helper functions to track the event listners we add to the + // client. The mqtt client also uses it own set of listeners + // so we can't use removeAllListeners() wothout breaking it + node._clientOn = function(event, f) { + node.clientListeners.push({event, f}) + node.client.on(event, f) + } + + node._clientRemoveAllListeners = function(event) { + node.clientListeners = node.clientListeners.filter((l) => { + if (!event || (event == l.e)) { + node.client.removeListener(l.event, l.f) + return false; + } + else + return true; + }) + } + } RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{