From b2ec040a8d43d120066604755567500bb0251b27 Mon Sep 17 00:00:00 2001 From: Phil Day Date: Thu, 5 May 2022 16:12:28 +0100 Subject: [PATCH 1/9] Add Force parameter mqtt client.end() when called in disconnect --- packages/node_modules/@node-red/nodes/core/network/10-mqtt.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index f8689ef9a..5e64677e9 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -837,7 +837,7 @@ module.exports = function(RED) { resolve(); } else { const t = setTimeout(reject, ms); - client.end(() => { + client.end(true, () => { clearTimeout(t); resolve() }); From 7845ebffc503d0f8306ca65369d7a122bed4f0db Mon Sep 17 00:00:00 2001 From: Phil Day Date: Fri, 6 May 2022 15:29:42 +0100 Subject: [PATCH 2/9] Track which event handlers we add to the mqtt client so we can removed them cleanly --- package.json | 2 +- .../@node-red/nodes/core/network/10-mqtt.js | 44 ++++++++++++++----- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/package.json b/package.json index 28b09f46e..603067086 100644 --- a/package.json +++ b/package.json @@ -86,7 +86,7 @@ }, "devDependencies": { "dompurify": "2.3.6", - "grunt": "1.5.2", + "grunt": "^1.5.2", "grunt-chmod": "~1.1.1", "grunt-cli": "~1.4.3", "grunt-concurrent": "3.0.0", diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 5e64677e9..666e01f62 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -453,6 +453,7 @@ module.exports = function(RED) { node.options = {}; node.queue = []; node.subscriptions = {}; + node.clientListeners = [] /** @type {mqtt.MqttClient}*/ this.client; node.setOptions = function(opts, init) { if(!opts || typeof opts !== "object") { @@ -720,9 +721,9 @@ module.exports = function(RED) { node.serverProperties = {}; node.client = mqtt.connect(node.brokerurl, node.options); node.client.setMaxListeners(0); - let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times + let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times // Register successful connect or reconnect handler - node.client.on('connect', function (connack) { + node._clientOn('connect', function (connack) { node.closing = false; node.connecting = false; node.connected = true; @@ -754,7 +755,7 @@ module.exports = function(RED) { } setStatusConnected(node, true); // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection - node.client.removeAllListeners('message'); + node._clientRemoveAllListeners('message'); // Re-subscribe to stored topics for (var s in node.subscriptions) { @@ -766,7 +767,7 @@ module.exports = function(RED) { if (node.subscriptions[s].hasOwnProperty(r)) { qos = Math.max(qos,node.subscriptions[s][r].qos); _options = node.subscriptions[s][r].options; - node.client.on('message',node.subscriptions[s][r].handler); + node._clientOn('message',node.subscriptions[s][r].handler); } } _options.qos = _options.qos || qos; @@ -779,11 +780,11 @@ module.exports = function(RED) { node.publish(node.birthMessage); } }); - node.client.on("reconnect", function() { + node._clientOn("reconnect", function() { setStatusConnecting(node, true); }); //Broker Disconnect - V5 event - node.client.on("disconnect", function(packet) { + node._clientOn("disconnect", function(packet) { //Emitted after receiving disconnect packet from broker. MQTT 5.0 feature. const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode; const rs = packet && packet.properties && packet.properties.reasonString || ""; @@ -797,7 +798,7 @@ module.exports = function(RED) { setStatusDisconnected(node, true); }); // Register disconnect handlers - node.client.on('close', function () { + node._clientOn('close', function () { if (node.connected) { node.connected = false; node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); @@ -809,7 +810,7 @@ module.exports = function(RED) { // Register connect error handler // The client's own reconnect logic will take care of errors - node.client.on('error', function (error) { + node._clientOn('error', function (error) { }); }catch(err) { console.log(err); @@ -822,7 +823,7 @@ module.exports = function(RED) { if(node.connected || node.connecting) { setStatusDisconnected(node, true); } - if(node.client) { node.client.removeAllListeners(); } + if(node.client) { node._clientRemoveAllListeners(); } node.connecting = false; node.connected = false; callback && typeof callback == "function" && callback(); @@ -837,7 +838,7 @@ module.exports = function(RED) { resolve(); } else { const t = setTimeout(reject, ms); - client.end(true, () => { + client.end(() => { clearTimeout(t); resolve() }); @@ -894,7 +895,7 @@ module.exports = function(RED) { }; node.subscriptions[topic][ref] = sub; if (node.connected) { - node.client.on('message',sub.handler); + node._clientOn('message',sub.handler); node.client.subscribe(topic, options); } }; @@ -996,12 +997,31 @@ module.exports = function(RED) { node.on('close', function(done) { node.disconnect(function() { if(node.client) { - node.client.removeAllListeners(); + node._clientRemoveAllListeners(); } done(); }); }); + // Helper functions to track the event listners we add to the + // client. The mqtt client also uses it own set of listeners + // so we can't use removeAllListeners() wothout breaking it + node._clientOn = function(event, f) { + node.clientListeners.push({event, f}) + node.client.on(event, f) + } + + node._clientRemoveAllListeners = function(event) { + node.clientListeners = node.clientListeners.filter((l) => { + if (!event || (event == l.e)) { + node.client.removeListener(l.event, l.f) + return false; + } + else + return true; + }) + } + } RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{ From 9bc8adc715688d3a6e18c3a38a96999c4cb8cb75 Mon Sep 17 00:00:00 2001 From: Phil Day Date: Fri, 6 May 2022 15:34:25 +0100 Subject: [PATCH 3/9] Revent change of grunt version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 603067086..28b09f46e 100644 --- a/package.json +++ b/package.json @@ -86,7 +86,7 @@ }, "devDependencies": { "dompurify": "2.3.6", - "grunt": "^1.5.2", + "grunt": "1.5.2", "grunt-chmod": "~1.1.1", "grunt-cli": "~1.4.3", "grunt-concurrent": "3.0.0", From 3d3090a8f20a6d06f803595a0dea32f39f9acacc Mon Sep 17 00:00:00 2001 From: Phil Day Date: Mon, 9 May 2022 15:39:12 +0100 Subject: [PATCH 4/9] Updated to cover the removal of individual event handlers --- .../@node-red/nodes/core/network/10-mqtt.js | 38 +++++++++++++------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 666e01f62..2f600796f 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -755,7 +755,7 @@ module.exports = function(RED) { } setStatusConnected(node, true); // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection - node._clientRemoveAllListeners('message'); + node._clientRemoveListeners('message'); // Re-subscribe to stored topics for (var s in node.subscriptions) { @@ -823,7 +823,7 @@ module.exports = function(RED) { if(node.connected || node.connecting) { setStatusDisconnected(node, true); } - if(node.client) { node._clientRemoveAllListeners(); } + if(node.client) { node._clientRemoveListeners(); } node.connecting = false; node.connected = false; callback && typeof callback == "function" && callback(); @@ -906,7 +906,7 @@ module.exports = function(RED) { if (sub) { if (sub[ref]) { if(node.client) { - node.client.removeListener('message',sub[ref].handler); + node._clientRemoveListeners('message',sub[ref].handler); } delete sub[ref]; } @@ -997,7 +997,7 @@ module.exports = function(RED) { node.on('close', function(done) { node.disconnect(function() { if(node.client) { - node._clientRemoveAllListeners(); + node._clientRemoveListeners(); } done(); }); @@ -1006,19 +1006,33 @@ module.exports = function(RED) { // Helper functions to track the event listners we add to the // client. The mqtt client also uses it own set of listeners // so we can't use removeAllListeners() wothout breaking it - node._clientOn = function(event, f) { + + /** + * Add an event handlers to the MQTT.js client + * @param {string} [event] The name of the event (optional) + * @param {function} [handler] The handler for this event + */ + node._clientOn = function(event, f) { node.clientListeners.push({event, f}) node.client.on(event, f) } - node._clientRemoveAllListeners = function(event) { + /** + * Remove event handlers from the MQTT.js client & only the events + * that we attached in {@link node._clientOn `node._clientOn`}. + * * If `event` is omitted, then all events matching `handler` are removed + * * If `handler` is omitted, then all events named `event` are removed + * * If both parameters are omitted, then all events are removed + * @param {string} [event] The name of the event (optional) + * @param {function} [handler] The handler for this event + */ + node._clientRemoveListeners = function(event, handler) { node.clientListeners = node.clientListeners.filter((l) => { - if (!event || (event == l.e)) { - node.client.removeListener(l.event, l.f) - return false; - } - else - return true; + if (event && event !== l.e) { return true; } + if (handler && handler !== l.h) { return true; } + node.client.removeListener(l.event, l.f) + return false; //found and removed, fliter out this one + }) } From 98d524e82dad404af77512ff79f39f435c556a79 Mon Sep 17 00:00:00 2001 From: Phil Day <69908274+PhilDay-CT@users.noreply.github.com> Date: Mon, 9 May 2022 16:20:27 +0100 Subject: [PATCH 5/9] Update packages/node_modules/@node-red/nodes/core/network/10-mqtt.js Co-authored-by: Stephen McLaughlin <44235289+Steve-Mcl@users.noreply.github.com> --- .../@node-red/nodes/core/network/10-mqtt.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 2f600796f..2fcfaad10 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -1008,13 +1008,13 @@ module.exports = function(RED) { // so we can't use removeAllListeners() wothout breaking it /** - * Add an event handlers to the MQTT.js client - * @param {string} [event] The name of the event (optional) - * @param {function} [handler] The handler for this event + * Add an event handler to the MQTT.js client + * @param {string} event The name of the event + * @param {function} handler The handler for this event */ - node._clientOn = function(event, f) { - node.clientListeners.push({event, f}) - node.client.on(event, f) + node._clientOn = function(event, handler) { + node.clientListeners.push({event, handler}) + node.client.on(event, handler) } /** From 82672a825df1379970b61101d235b9323d13cb9e Mon Sep 17 00:00:00 2001 From: Phil Day <69908274+PhilDay-CT@users.noreply.github.com> Date: Mon, 9 May 2022 16:20:44 +0100 Subject: [PATCH 6/9] Update packages/node_modules/@node-red/nodes/core/network/10-mqtt.js Co-authored-by: Stephen McLaughlin <44235289+Steve-Mcl@users.noreply.github.com> --- .../@node-red/nodes/core/network/10-mqtt.js | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 2fcfaad10..2a0241316 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -1024,15 +1024,14 @@ module.exports = function(RED) { * * If `handler` is omitted, then all events named `event` are removed * * If both parameters are omitted, then all events are removed * @param {string} [event] The name of the event (optional) - * @param {function} [handler] The handler for this event + * @param {function} [handler] The handler for this event (optional) */ node._clientRemoveListeners = function(event, handler) { node.clientListeners = node.clientListeners.filter((l) => { - if (event && event !== l.e) { return true; } - if (handler && handler !== l.h) { return true; } - node.client.removeListener(l.event, l.f) - return false; //found and removed, fliter out this one - + if (event && event !== l.event) { return true; } + if (handler && handler !== l.handler) { return true; } + node.client.removeListener(l.event, l.handler) + return false; //found and removed, filter out this one }) } From c87ff3ca26e09e322fd47876099d76cf0bc382b5 Mon Sep 17 00:00:00 2001 From: Phil Day <69908274+PhilDay-CT@users.noreply.github.com> Date: Mon, 9 May 2022 16:22:50 +0100 Subject: [PATCH 7/9] Update packages/node_modules/@node-red/nodes/core/network/10-mqtt.js Co-authored-by: Stephen McLaughlin <44235289+Steve-Mcl@users.noreply.github.com> --- .../node_modules/@node-red/nodes/core/network/10-mqtt.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 2a0241316..0b89b29cd 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -719,6 +719,11 @@ module.exports = function(RED) { setStatusConnecting(node, true); try { node.serverProperties = {}; + if(node.client) { + //belt and braces to avoid left over clients + node.client.end(true); + node._clientRemoveListeners(); + } node.client = mqtt.connect(node.brokerurl, node.options); node.client.setMaxListeners(0); let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times From a0f7e92e40c41228f8af4814bdf277b789098a89 Mon Sep 17 00:00:00 2001 From: Phil Day Date: Mon, 9 May 2022 16:29:39 +0100 Subject: [PATCH 8/9] call client.end with force=true on timeout --- .../node_modules/@node-red/nodes/core/network/10-mqtt.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 0b89b29cd..3b2a17cdd 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -842,8 +842,12 @@ module.exports = function(RED) { if(!client) { resolve(); } else { - const t = setTimeout(reject, ms); - client.end(() => { + const t = setTimeout(() => { + //clean end() has exceeded WAIT_END, lets force end! + client && client.end(true); + reject(); + }, ms); + client.end(() => { clearTimeout(t); resolve() }); From e223b20cbda28334032ff1231d7ded12d9544f9b Mon Sep 17 00:00:00 2001 From: Stephen McLaughlin <44235289+Steve-Mcl@users.noreply.github.com> Date: Mon, 9 May 2022 16:37:25 +0100 Subject: [PATCH 9/9] Remove unnecessary call to `clientRemoveListeners` Also, merge the non JSDOC comment into the JSDOC comment --- .../@node-red/nodes/core/network/10-mqtt.js | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 3b2a17cdd..901fc9819 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -1005,19 +1005,14 @@ module.exports = function(RED) { node.on('close', function(done) { node.disconnect(function() { - if(node.client) { - node._clientRemoveListeners(); - } done(); }); }); - // Helper functions to track the event listners we add to the - // client. The mqtt client also uses it own set of listeners - // so we can't use removeAllListeners() wothout breaking it - /** - * Add an event handler to the MQTT.js client + * Add event handlers to the MQTT.js client and track them so that + * we do not remove any handlers that the MQTT client uses internally. + * Use {@link node._clientRemoveListeners `node._clientRemoveListeners`} to remove handlers * @param {string} event The name of the event * @param {function} handler The handler for this event */