diff --git a/package.json b/package.json index df031ca2f..3d714d10e 100644 --- a/package.json +++ b/package.json @@ -84,8 +84,8 @@ "bcrypt": "5.0.1" }, "devDependencies": { - "dompurify": "2.3.5", - "grunt": "1.4.1", + "dompurify": "2.3.6", + "grunt": "1.5.2", "grunt-chmod": "~1.1.1", "grunt-cli": "~1.4.3", "grunt-concurrent": "3.0.0", diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html index ecef71366..b22b9545f 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html @@ -442,7 +442,17 @@ } return defaultContentType || 'none' } - + /** + * Test a topic string is valid for publishing + * @param {string} topic + * @returns `true` if it is a valid topic + */ + function validateMQTTPublishTopic(topic, opts) { + if(!topic || topic == "" || !/[\+#\b\f\n\r\t\v\0]/.test(topic)) { + return true; + } + return RED._("node-red:mqtt.errors.invalid-topic"); + } RED.nodes.registerType('mqtt-broker',{ category: 'config', defaults: { diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 1e8f8ce9c..605a8baa1 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -68,12 +68,21 @@ module.exports = function(RED) { } /** - * Test a topic string is valid + * Test a topic string is valid for subscription * @param {string} topic * @returns `true` if it is a valid topic */ function isValidSubscriptionTopic(topic) { - return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic) + return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic); + } + + /** + * Test a topic string is valid for publishing + * @param {string} topic + * @returns `true` if it is a valid topic + */ + function isValidPublishTopic(topic) { + return !/[\+#\b\f\n\r\t\v\0]/.test(topic); } /** @@ -288,7 +297,7 @@ module.exports = function(RED) { //TODO: delete msg.responseTopic - to prevent it being resent? } } - topicOK = topicOK && !/[\+#\b\f\n\r\t\v\0]/.test(msg.topic); + topicOK = topicOK && isValidPublishTopic(msg.topic); if (topicOK) { node.brokerConn.publish(msg, done); // send the message @@ -362,7 +371,7 @@ module.exports = function(RED) { node.brokerConn.connect(function () { done(); }); - }, true) + }) } else { // Without force flag, we will refuse to cycle an active connection done(new Error(RED._('mqtt.errors.invalid-action-alreadyconnected'))); @@ -391,6 +400,7 @@ module.exports = function(RED) { node.options = {}; node.queue = []; node.subscriptions = {}; + node.clientListeners = [] /** @type {mqtt.MqttClient}*/ this.client; node.setOptions = function(opts, init) { if(!opts || typeof opts !== "object") { @@ -529,7 +539,7 @@ module.exports = function(RED) { // Only for ws or wss, check if proxy env var for additional configuration if (node.brokerurl.indexOf("wss://") > -1 || node.brokerurl.indexOf("ws://") > -1) { // check if proxy is set in env - let prox, noprox; + let prox, noprox, noproxy; if (process.env.http_proxy) { prox = process.env.http_proxy; } if (process.env.HTTP_PROXY) { prox = process.env.HTTP_PROXY; } if (process.env.no_proxy) { noprox = process.env.no_proxy.split(","); } @@ -656,11 +666,16 @@ module.exports = function(RED) { setStatusConnecting(node, true); try { node.serverProperties = {}; + if(node.client) { + //belt and braces to avoid left over clients + node.client.end(true); + node._clientRemoveListeners(); + } node.client = mqtt.connect(node.brokerurl, node.options); node.client.setMaxListeners(0); - let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times + let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times // Register successful connect or reconnect handler - node.client.on('connect', function (connack) { + node._clientOn('connect', function (connack) { node.closing = false; node.connecting = false; node.connected = true; @@ -692,7 +707,7 @@ module.exports = function(RED) { } setStatusConnected(node, true); // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection - node.client.removeAllListeners('message'); + node._clientRemoveListeners('message'); // Re-subscribe to stored topics for (var s in node.subscriptions) { @@ -704,7 +719,7 @@ module.exports = function(RED) { if (node.subscriptions[s].hasOwnProperty(r)) { qos = Math.max(qos,node.subscriptions[s][r].qos); _options = node.subscriptions[s][r].options; - node.client.on('message',node.subscriptions[s][r].handler); + node._clientOn('message',node.subscriptions[s][r].handler); } } _options.qos = _options.qos || qos; @@ -717,11 +732,11 @@ module.exports = function(RED) { node.publish(node.birthMessage); } }); - node.client.on("reconnect", function() { + node._clientOn("reconnect", function() { setStatusConnecting(node, true); }); //Broker Disconnect - V5 event - node.client.on("disconnect", function(packet) { + node._clientOn("disconnect", function(packet) { //Emitted after receiving disconnect packet from broker. MQTT 5.0 feature. const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode; const rs = packet && packet.properties && packet.properties.reasonString || ""; @@ -735,7 +750,7 @@ module.exports = function(RED) { setStatusDisconnected(node, true); }); // Register disconnect handlers - node.client.on('close', function () { + node._clientOn('close', function () { if (node.connected) { node.connected = false; node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); @@ -747,42 +762,59 @@ module.exports = function(RED) { // Register connect error handler // The client's own reconnect logic will take care of errors - node.client.on('error', function (error) { + node._clientOn('error', function (error) { }); }catch(err) { console.log(err); } } }; - node.disconnect = function (callback, force) { - const _callback = function (resetNodeConnectedState, _force) { - setStatusDisconnected(node, true); - if(resetNodeConnectedState || _force) { - node.client.removeAllListeners(); - node.closing = true; - node.connecting = false; - node.connected = false; + + node.disconnect = function (callback) { + const _callback = function () { + if(node.connected || node.connecting) { + setStatusDisconnected(node, true); } + if(node.client) { node._clientRemoveListeners(); } + node.connecting = false; + node.connected = false; callback && typeof callback == "function" && callback(); }; - if(node.closing) { - return _callback(false, force); - } - var endCallBack = function endCallBack() { - } + if(!node.client) { return _callback(); } + if(node.closing) { return _callback(); } + + let waitEnd = (client, ms) => { + return new Promise( (resolve, reject) => { + node.closing = true; + if(!client) { + resolve(); + } else { + const t = setTimeout(() => { + //clean end() has exceeded WAIT_END, lets force end! + client && client.end(true); + reject(); + }, ms); + client.end(() => { + clearTimeout(t); + resolve() + }); + } + }); + }; if(node.connected && node.closeMessage) { node.publish(node.closeMessage, function (err) { - node.client.end(endCallBack); - _callback(true, force); + waitEnd(node.client, 2000).then(() => { + _callback(); + }).catch((e) => { + _callback(); + }) }); - } else if(node.connected) { - node.client.end(endCallBack); - _callback(true, force); - } else if(node.connecting) { - node.client.end(); - _callback(true, true); } else { - _callback(false, force); + waitEnd(node.client, 2000).then(() => { + _callback(); + }).catch((e) => { + _callback(); + }) } } node.subscriptionIds = {}; @@ -819,7 +851,7 @@ module.exports = function(RED) { }; node.subscriptions[topic][ref] = sub; if (node.connected) { - node.client.on('message',sub.handler); + node._clientOn('message',sub.handler); node.client.subscribe(topic, options); } }; @@ -830,7 +862,7 @@ module.exports = function(RED) { if (sub) { if (sub[ref]) { if(node.client) { - node.client.removeListener('message',sub[ref].handler); + node._clientRemoveListeners('message',sub[ref].handler); } delete sub[ref]; } @@ -864,8 +896,18 @@ module.exports = function(RED) { qos: msg.qos || 0, retain: msg.retain || false }; + let topicOK = hasProperty(msg, "topic") && (typeof msg.topic === "string") && (isValidPublishTopic(msg.topic)); //https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback if(node.options.protocolVersion == 5) { + const bsp = node.serverProperties || {}; + if (msg.userProperties && typeof msg.userProperties !== "object") { + delete msg.userProperties; + } + if (hasProperty(msg, "topicAlias") && !isNaN(Number(msg.topicAlias))) { + msg.topicAlias = parseInt(msg.topicAlias); + } else { + delete msg.topicAlias; + } options.properties = options.properties || {}; setStrProp(msg, options.properties, "responseTopic"); setBufferProp(msg, options.properties, "correlationData"); @@ -875,31 +917,75 @@ module.exports = function(RED) { setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0); setBoolProp(msg, options.properties, "payloadFormatIndicator"); //FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455); - if (options.properties.topicAlias) { - if (!node.topicAliases.hasOwnProperty(options.properties.topicAlias) && msg.topic == "") { + + //check & sanitise topic + if (topicOK && options.properties.topicAlias) { + let aliasValid = (bsp.topicAliasMaximum && bsp.topicAliasMaximum >= options.properties.topicAlias); + if (!aliasValid) { done("Invalid topicAlias"); return } if (node.topicAliases[options.properties.topicAlias] === msg.topic) { - msg.topic = "" + msg.topic = ""; } else { - node.topicAliases[options.properties.topicAlias] = msg.topic + node.topicAliases[options.properties.topicAlias] = msg.topic; } + } else if (!msg.topic && options.properties.responseTopic) { + msg.topic = msg.responseTopic; + topicOK = isValidPublishTopic(msg.topic); + delete msg.responseTopic; //prevent responseTopic being resent? } } - node.client.publish(msg.topic, msg.payload, options, function(err) { - done && done(err); - return - }); + if (topicOK) { + node.client.publish(msg.topic, msg.payload, options, function(err) { + done && done(err); + return + }); + } else { + const error = new Error(RED._("mqtt.errors.invalid-topic")); + error.warn = true; + done(error); + } } }; node.on('close', function(done) { - node.closing = true; - node.disconnect(done); + node.disconnect(function() { + done(); + }); }); + /** + * Add event handlers to the MQTT.js client and track them so that + * we do not remove any handlers that the MQTT client uses internally. + * Use {@link node._clientRemoveListeners `node._clientRemoveListeners`} to remove handlers + * @param {string} event The name of the event + * @param {function} handler The handler for this event + */ + node._clientOn = function(event, handler) { + node.clientListeners.push({event, handler}) + node.client.on(event, handler) + } + + /** + * Remove event handlers from the MQTT.js client & only the events + * that we attached in {@link node._clientOn `node._clientOn`}. + * * If `event` is omitted, then all events matching `handler` are removed + * * If `handler` is omitted, then all events named `event` are removed + * * If both parameters are omitted, then all events are removed + * @param {string} [event] The name of the event (optional) + * @param {function} [handler] The handler for this event (optional) + */ + node._clientRemoveListeners = function(event, handler) { + node.clientListeners = node.clientListeners.filter((l) => { + if (event && event !== l.event) { return true; } + if (handler && handler !== l.handler) { return true; } + node.client.removeListener(l.event, l.handler) + return false; //found and removed, filter out this one + }) + } + } RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{ @@ -1074,6 +1160,7 @@ module.exports = function(RED) { node.brokerConn.unsubscribe(node.topic,node.id, removed); } node.brokerConn.deregister(node, done); + node.brokerConn = null; } else { done(); } @@ -1138,6 +1225,7 @@ module.exports = function(RED) { node.on('close', function(done) { if (node.brokerConn) { node.brokerConn.deregister(node,done); + node.brokerConn = null; } else { done(); }