From ca204dea2de9e5bbf77b046ff70e659de5d8bc2c Mon Sep 17 00:00:00 2001 From: Stephen McLaughlin <44235289+Steve-Mcl@users.noreply.github.com> Date: Mon, 2 May 2022 21:28:04 +0100 Subject: [PATCH 01/12] Use new validation option to return better label Co-authored-by: Nick O'Leary --- .../@node-red/nodes/core/network/10-mqtt.html | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html index ecef71366..b22b9545f 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html @@ -442,7 +442,17 @@ } return defaultContentType || 'none' } - + /** + * Test a topic string is valid for publishing + * @param {string} topic + * @returns `true` if it is a valid topic + */ + function validateMQTTPublishTopic(topic, opts) { + if(!topic || topic == "" || !/[\+#\b\f\n\r\t\v\0]/.test(topic)) { + return true; + } + return RED._("node-red:mqtt.errors.invalid-topic"); + } RED.nodes.registerType('mqtt-broker',{ category: 'config', defaults: { From 3be75fa82256f76e7853cfb27178df3ac11aaf33 Mon Sep 17 00:00:00 2001 From: Phil Day Date: Thu, 5 May 2022 16:12:28 +0100 Subject: [PATCH 02/12] Add Force parameter mqtt client.end() when called in disconnect --- .../@node-red/nodes/core/network/10-mqtt.js | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 1e8f8ce9c..2478da4b2 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -765,11 +765,23 @@ module.exports = function(RED) { } callback && typeof callback == "function" && callback(); }; - if(node.closing) { - return _callback(false, force); - } - var endCallBack = function endCallBack() { - } + if(!node.client) { return _callback(); } + if(node.closing) { return _callback(); } + + let waitEnd = (client, ms) => { + return new Promise( (resolve, reject) => { + node.closing = true; + if(!client) { + resolve(); + } else { + const t = setTimeout(reject, ms); + client.end(true, () => { + clearTimeout(t); + resolve() + }); + } + }); + }; if(node.connected && node.closeMessage) { node.publish(node.closeMessage, function (err) { node.client.end(endCallBack); From 02d9ad479d81bbd69b099295b0ffe4777b053c21 Mon Sep 17 00:00:00 2001 From: Phil Day Date: Fri, 6 May 2022 15:29:42 +0100 Subject: [PATCH 03/12] Track which event handlers we add to the mqtt client so we can removed them cleanly --- package.json | 4 +- .../@node-red/nodes/core/network/10-mqtt.js | 51 ++++++++++++++----- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/package.json b/package.json index df031ca2f..234c88881 100644 --- a/package.json +++ b/package.json @@ -84,8 +84,8 @@ "bcrypt": "5.0.1" }, "devDependencies": { - "dompurify": "2.3.5", - "grunt": "1.4.1", + "dompurify": "2.3.6", + "grunt": "^1.5.2", "grunt-chmod": "~1.1.1", "grunt-cli": "~1.4.3", "grunt-concurrent": "3.0.0", diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 2478da4b2..79f2aee67 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -391,6 +391,7 @@ module.exports = function(RED) { node.options = {}; node.queue = []; node.subscriptions = {}; + node.clientListeners = [] /** @type {mqtt.MqttClient}*/ this.client; node.setOptions = function(opts, init) { if(!opts || typeof opts !== "object") { @@ -658,9 +659,9 @@ module.exports = function(RED) { node.serverProperties = {}; node.client = mqtt.connect(node.brokerurl, node.options); node.client.setMaxListeners(0); - let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times + let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times // Register successful connect or reconnect handler - node.client.on('connect', function (connack) { + node._clientOn('connect', function (connack) { node.closing = false; node.connecting = false; node.connected = true; @@ -692,7 +693,7 @@ module.exports = function(RED) { } setStatusConnected(node, true); // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection - node.client.removeAllListeners('message'); + node._clientRemoveAllListeners('message'); // Re-subscribe to stored topics for (var s in node.subscriptions) { @@ -704,7 +705,7 @@ module.exports = function(RED) { if (node.subscriptions[s].hasOwnProperty(r)) { qos = Math.max(qos,node.subscriptions[s][r].qos); _options = node.subscriptions[s][r].options; - node.client.on('message',node.subscriptions[s][r].handler); + node._clientOn('message',node.subscriptions[s][r].handler); } } _options.qos = _options.qos || qos; @@ -717,11 +718,11 @@ module.exports = function(RED) { node.publish(node.birthMessage); } }); - node.client.on("reconnect", function() { + node._clientOn("reconnect", function() { setStatusConnecting(node, true); }); //Broker Disconnect - V5 event - node.client.on("disconnect", function(packet) { + node._clientOn("disconnect", function(packet) { //Emitted after receiving disconnect packet from broker. MQTT 5.0 feature. const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode; const rs = packet && packet.properties && packet.properties.reasonString || ""; @@ -735,7 +736,7 @@ module.exports = function(RED) { setStatusDisconnected(node, true); }); // Register disconnect handlers - node.client.on('close', function () { + node._clientOn('close', function () { if (node.connected) { node.connected = false; node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); @@ -747,7 +748,7 @@ module.exports = function(RED) { // Register connect error handler // The client's own reconnect logic will take care of errors - node.client.on('error', function (error) { + node._clientOn('error', function (error) { }); }catch(err) { console.log(err); @@ -763,6 +764,9 @@ module.exports = function(RED) { node.connecting = false; node.connected = false; } + if(node.client) { node._clientRemoveAllListeners(); } + node.connecting = false; + node.connected = false; callback && typeof callback == "function" && callback(); }; if(!node.client) { return _callback(); } @@ -775,7 +779,7 @@ module.exports = function(RED) { resolve(); } else { const t = setTimeout(reject, ms); - client.end(true, () => { + client.end(() => { clearTimeout(t); resolve() }); @@ -831,7 +835,7 @@ module.exports = function(RED) { }; node.subscriptions[topic][ref] = sub; if (node.connected) { - node.client.on('message',sub.handler); + node._clientOn('message',sub.handler); node.client.subscribe(topic, options); } }; @@ -908,10 +912,33 @@ module.exports = function(RED) { }; node.on('close', function(done) { - node.closing = true; - node.disconnect(done); + node.disconnect(function() { + if(node.client) { + node._clientRemoveAllListeners(); + } + done(); + }); }); + // Helper functions to track the event listners we add to the + // client. The mqtt client also uses it own set of listeners + // so we can't use removeAllListeners() wothout breaking it + node._clientOn = function(event, f) { + node.clientListeners.push({event, f}) + node.client.on(event, f) + } + + node._clientRemoveAllListeners = function(event) { + node.clientListeners = node.clientListeners.filter((l) => { + if (!event || (event == l.e)) { + node.client.removeListener(l.event, l.f) + return false; + } + else + return true; + }) + } + } RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{ From 1eea3711249eb2ff0f7cb42db09643004c4b45ae Mon Sep 17 00:00:00 2001 From: Phil Day Date: Fri, 6 May 2022 15:34:25 +0100 Subject: [PATCH 04/12] Revent change of grunt version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 234c88881..3d714d10e 100644 --- a/package.json +++ b/package.json @@ -85,7 +85,7 @@ }, "devDependencies": { "dompurify": "2.3.6", - "grunt": "^1.5.2", + "grunt": "1.5.2", "grunt-chmod": "~1.1.1", "grunt-cli": "~1.4.3", "grunt-concurrent": "3.0.0", From 03484fd5cd77343c5f74ed8170f460544c096472 Mon Sep 17 00:00:00 2001 From: Phil Day Date: Mon, 9 May 2022 15:39:12 +0100 Subject: [PATCH 05/12] Updated to cover the removal of individual event handlers --- .../@node-red/nodes/core/network/10-mqtt.js | 38 +++++++++++++------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 79f2aee67..5271c4273 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -693,7 +693,7 @@ module.exports = function(RED) { } setStatusConnected(node, true); // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection - node._clientRemoveAllListeners('message'); + node._clientRemoveListeners('message'); // Re-subscribe to stored topics for (var s in node.subscriptions) { @@ -764,7 +764,7 @@ module.exports = function(RED) { node.connecting = false; node.connected = false; } - if(node.client) { node._clientRemoveAllListeners(); } + if(node.client) { node._clientRemoveListeners(); } node.connecting = false; node.connected = false; callback && typeof callback == "function" && callback(); @@ -846,7 +846,7 @@ module.exports = function(RED) { if (sub) { if (sub[ref]) { if(node.client) { - node.client.removeListener('message',sub[ref].handler); + node._clientRemoveListeners('message',sub[ref].handler); } delete sub[ref]; } @@ -914,7 +914,7 @@ module.exports = function(RED) { node.on('close', function(done) { node.disconnect(function() { if(node.client) { - node._clientRemoveAllListeners(); + node._clientRemoveListeners(); } done(); }); @@ -923,19 +923,33 @@ module.exports = function(RED) { // Helper functions to track the event listners we add to the // client. The mqtt client also uses it own set of listeners // so we can't use removeAllListeners() wothout breaking it - node._clientOn = function(event, f) { + + /** + * Add an event handlers to the MQTT.js client + * @param {string} [event] The name of the event (optional) + * @param {function} [handler] The handler for this event + */ + node._clientOn = function(event, f) { node.clientListeners.push({event, f}) node.client.on(event, f) } - node._clientRemoveAllListeners = function(event) { + /** + * Remove event handlers from the MQTT.js client & only the events + * that we attached in {@link node._clientOn `node._clientOn`}. + * * If `event` is omitted, then all events matching `handler` are removed + * * If `handler` is omitted, then all events named `event` are removed + * * If both parameters are omitted, then all events are removed + * @param {string} [event] The name of the event (optional) + * @param {function} [handler] The handler for this event + */ + node._clientRemoveListeners = function(event, handler) { node.clientListeners = node.clientListeners.filter((l) => { - if (!event || (event == l.e)) { - node.client.removeListener(l.event, l.f) - return false; - } - else - return true; + if (event && event !== l.e) { return true; } + if (handler && handler !== l.h) { return true; } + node.client.removeListener(l.event, l.f) + return false; //found and removed, fliter out this one + }) } From bcab1cf0960f8d311e0baecbda6226cc7760b065 Mon Sep 17 00:00:00 2001 From: Phil Day <69908274+PhilDay-CT@users.noreply.github.com> Date: Mon, 9 May 2022 16:20:27 +0100 Subject: [PATCH 06/12] Update packages/node_modules/@node-red/nodes/core/network/10-mqtt.js Co-authored-by: Stephen McLaughlin <44235289+Steve-Mcl@users.noreply.github.com> --- .../@node-red/nodes/core/network/10-mqtt.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 5271c4273..ce9746aa3 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -925,13 +925,13 @@ module.exports = function(RED) { // so we can't use removeAllListeners() wothout breaking it /** - * Add an event handlers to the MQTT.js client - * @param {string} [event] The name of the event (optional) - * @param {function} [handler] The handler for this event + * Add an event handler to the MQTT.js client + * @param {string} event The name of the event + * @param {function} handler The handler for this event */ - node._clientOn = function(event, f) { - node.clientListeners.push({event, f}) - node.client.on(event, f) + node._clientOn = function(event, handler) { + node.clientListeners.push({event, handler}) + node.client.on(event, handler) } /** From 1d40378f8c682d2b4852d0a61180c5a961625281 Mon Sep 17 00:00:00 2001 From: Phil Day <69908274+PhilDay-CT@users.noreply.github.com> Date: Mon, 9 May 2022 16:20:44 +0100 Subject: [PATCH 07/12] Update packages/node_modules/@node-red/nodes/core/network/10-mqtt.js Co-authored-by: Stephen McLaughlin <44235289+Steve-Mcl@users.noreply.github.com> --- .../@node-red/nodes/core/network/10-mqtt.js | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index ce9746aa3..695d98574 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -941,15 +941,14 @@ module.exports = function(RED) { * * If `handler` is omitted, then all events named `event` are removed * * If both parameters are omitted, then all events are removed * @param {string} [event] The name of the event (optional) - * @param {function} [handler] The handler for this event + * @param {function} [handler] The handler for this event (optional) */ node._clientRemoveListeners = function(event, handler) { node.clientListeners = node.clientListeners.filter((l) => { - if (event && event !== l.e) { return true; } - if (handler && handler !== l.h) { return true; } - node.client.removeListener(l.event, l.f) - return false; //found and removed, fliter out this one - + if (event && event !== l.event) { return true; } + if (handler && handler !== l.handler) { return true; } + node.client.removeListener(l.event, l.handler) + return false; //found and removed, filter out this one }) } From 6a799e7e2a1400f5f3d92bfe6c8526ee8981f747 Mon Sep 17 00:00:00 2001 From: Phil Day <69908274+PhilDay-CT@users.noreply.github.com> Date: Mon, 9 May 2022 16:22:50 +0100 Subject: [PATCH 08/12] Update packages/node_modules/@node-red/nodes/core/network/10-mqtt.js Co-authored-by: Stephen McLaughlin <44235289+Steve-Mcl@users.noreply.github.com> --- .../node_modules/@node-red/nodes/core/network/10-mqtt.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 695d98574..454d7eab4 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -657,6 +657,11 @@ module.exports = function(RED) { setStatusConnecting(node, true); try { node.serverProperties = {}; + if(node.client) { + //belt and braces to avoid left over clients + node.client.end(true); + node._clientRemoveListeners(); + } node.client = mqtt.connect(node.brokerurl, node.options); node.client.setMaxListeners(0); let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times From 2ded07c765f5f7d636a089b71b23f39616f273d1 Mon Sep 17 00:00:00 2001 From: Phil Day Date: Mon, 9 May 2022 16:29:39 +0100 Subject: [PATCH 09/12] call client.end with force=true on timeout --- .../node_modules/@node-red/nodes/core/network/10-mqtt.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 454d7eab4..cca28fde1 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -783,8 +783,12 @@ module.exports = function(RED) { if(!client) { resolve(); } else { - const t = setTimeout(reject, ms); - client.end(() => { + const t = setTimeout(() => { + //clean end() has exceeded WAIT_END, lets force end! + client && client.end(true); + reject(); + }, ms); + client.end(() => { clearTimeout(t); resolve() }); From e7f6549cb6ab35e012289934b64608e5ca273570 Mon Sep 17 00:00:00 2001 From: Stephen McLaughlin <44235289+Steve-Mcl@users.noreply.github.com> Date: Mon, 9 May 2022 16:37:25 +0100 Subject: [PATCH 10/12] Remove unnecessary call to `clientRemoveListeners` Also, merge the non JSDOC comment into the JSDOC comment --- .../@node-red/nodes/core/network/10-mqtt.js | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index cca28fde1..13d724ed9 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -922,19 +922,14 @@ module.exports = function(RED) { node.on('close', function(done) { node.disconnect(function() { - if(node.client) { - node._clientRemoveListeners(); - } done(); }); }); - // Helper functions to track the event listners we add to the - // client. The mqtt client also uses it own set of listeners - // so we can't use removeAllListeners() wothout breaking it - /** - * Add an event handler to the MQTT.js client + * Add event handlers to the MQTT.js client and track them so that + * we do not remove any handlers that the MQTT client uses internally. + * Use {@link node._clientRemoveListeners `node._clientRemoveListeners`} to remove handlers * @param {string} event The name of the event * @param {function} handler The handler for this event */ From 8133c5e8345477f9d71d61c89fa23e99b78aad57 Mon Sep 17 00:00:00 2001 From: Stephen McLaughlin <44235289+Steve-Mcl@users.noreply.github.com> Date: Tue, 24 May 2022 20:43:29 +0100 Subject: [PATCH 11/12] define noproxy variable --- packages/node_modules/@node-red/nodes/core/network/10-mqtt.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 13d724ed9..166f68381 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -530,7 +530,7 @@ module.exports = function(RED) { // Only for ws or wss, check if proxy env var for additional configuration if (node.brokerurl.indexOf("wss://") > -1 || node.brokerurl.indexOf("ws://") > -1) { // check if proxy is set in env - let prox, noprox; + let prox, noprox, noproxy; if (process.env.http_proxy) { prox = process.env.http_proxy; } if (process.env.HTTP_PROXY) { prox = process.env.HTTP_PROXY; } if (process.env.no_proxy) { noprox = process.env.no_proxy.split(","); } From 933cad888f1730fc89815b62f25b561ae4519b13 Mon Sep 17 00:00:00 2001 From: Steve-Mcl Date: Mon, 13 Jun 2022 12:57:22 +0100 Subject: [PATCH 12/12] add changes from #3563 Add client and Runtime MQTT topic validation and fix subsequent connection lockup (that arises due to bad birth/will topic) backport-2.x --- .../@node-red/nodes/core/network/10-mqtt.js | 90 +++++++++++++------ 1 file changed, 61 insertions(+), 29 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 166f68381..605a8baa1 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -68,12 +68,21 @@ module.exports = function(RED) { } /** - * Test a topic string is valid + * Test a topic string is valid for subscription * @param {string} topic * @returns `true` if it is a valid topic */ function isValidSubscriptionTopic(topic) { - return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic) + return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic); + } + + /** + * Test a topic string is valid for publishing + * @param {string} topic + * @returns `true` if it is a valid topic + */ + function isValidPublishTopic(topic) { + return !/[\+#\b\f\n\r\t\v\0]/.test(topic); } /** @@ -288,7 +297,7 @@ module.exports = function(RED) { //TODO: delete msg.responseTopic - to prevent it being resent? } } - topicOK = topicOK && !/[\+#\b\f\n\r\t\v\0]/.test(msg.topic); + topicOK = topicOK && isValidPublishTopic(msg.topic); if (topicOK) { node.brokerConn.publish(msg, done); // send the message @@ -362,7 +371,7 @@ module.exports = function(RED) { node.brokerConn.connect(function () { done(); }); - }, true) + }) } else { // Without force flag, we will refuse to cycle an active connection done(new Error(RED._('mqtt.errors.invalid-action-alreadyconnected'))); @@ -760,14 +769,11 @@ module.exports = function(RED) { } } }; - node.disconnect = function (callback, force) { - const _callback = function (resetNodeConnectedState, _force) { - setStatusDisconnected(node, true); - if(resetNodeConnectedState || _force) { - node.client.removeAllListeners(); - node.closing = true; - node.connecting = false; - node.connected = false; + + node.disconnect = function (callback) { + const _callback = function () { + if(node.connected || node.connecting) { + setStatusDisconnected(node, true); } if(node.client) { node._clientRemoveListeners(); } node.connecting = false; @@ -797,17 +803,18 @@ module.exports = function(RED) { }; if(node.connected && node.closeMessage) { node.publish(node.closeMessage, function (err) { - node.client.end(endCallBack); - _callback(true, force); + waitEnd(node.client, 2000).then(() => { + _callback(); + }).catch((e) => { + _callback(); + }) }); - } else if(node.connected) { - node.client.end(endCallBack); - _callback(true, force); - } else if(node.connecting) { - node.client.end(); - _callback(true, true); } else { - _callback(false, force); + waitEnd(node.client, 2000).then(() => { + _callback(); + }).catch((e) => { + _callback(); + }) } } node.subscriptionIds = {}; @@ -889,8 +896,18 @@ module.exports = function(RED) { qos: msg.qos || 0, retain: msg.retain || false }; + let topicOK = hasProperty(msg, "topic") && (typeof msg.topic === "string") && (isValidPublishTopic(msg.topic)); //https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback if(node.options.protocolVersion == 5) { + const bsp = node.serverProperties || {}; + if (msg.userProperties && typeof msg.userProperties !== "object") { + delete msg.userProperties; + } + if (hasProperty(msg, "topicAlias") && !isNaN(Number(msg.topicAlias))) { + msg.topicAlias = parseInt(msg.topicAlias); + } else { + delete msg.topicAlias; + } options.properties = options.properties || {}; setStrProp(msg, options.properties, "responseTopic"); setBufferProp(msg, options.properties, "correlationData"); @@ -900,23 +917,36 @@ module.exports = function(RED) { setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0); setBoolProp(msg, options.properties, "payloadFormatIndicator"); //FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455); - if (options.properties.topicAlias) { - if (!node.topicAliases.hasOwnProperty(options.properties.topicAlias) && msg.topic == "") { + + //check & sanitise topic + if (topicOK && options.properties.topicAlias) { + let aliasValid = (bsp.topicAliasMaximum && bsp.topicAliasMaximum >= options.properties.topicAlias); + if (!aliasValid) { done("Invalid topicAlias"); return } if (node.topicAliases[options.properties.topicAlias] === msg.topic) { - msg.topic = "" + msg.topic = ""; } else { - node.topicAliases[options.properties.topicAlias] = msg.topic + node.topicAliases[options.properties.topicAlias] = msg.topic; } + } else if (!msg.topic && options.properties.responseTopic) { + msg.topic = msg.responseTopic; + topicOK = isValidPublishTopic(msg.topic); + delete msg.responseTopic; //prevent responseTopic being resent? } } - node.client.publish(msg.topic, msg.payload, options, function(err) { - done && done(err); - return - }); + if (topicOK) { + node.client.publish(msg.topic, msg.payload, options, function(err) { + done && done(err); + return + }); + } else { + const error = new Error(RED._("mqtt.errors.invalid-topic")); + error.warn = true; + done(error); + } } }; @@ -1130,6 +1160,7 @@ module.exports = function(RED) { node.brokerConn.unsubscribe(node.topic,node.id, removed); } node.brokerConn.deregister(node, done); + node.brokerConn = null; } else { done(); } @@ -1194,6 +1225,7 @@ module.exports = function(RED) { node.on('close', function(done) { if (node.brokerConn) { node.brokerConn.deregister(node,done); + node.brokerConn = null; } else { done(); }