diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 166f68381..605a8baa1 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -68,12 +68,21 @@ module.exports = function(RED) { } /** - * Test a topic string is valid + * Test a topic string is valid for subscription * @param {string} topic * @returns `true` if it is a valid topic */ function isValidSubscriptionTopic(topic) { - return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic) + return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic); + } + + /** + * Test a topic string is valid for publishing + * @param {string} topic + * @returns `true` if it is a valid topic + */ + function isValidPublishTopic(topic) { + return !/[\+#\b\f\n\r\t\v\0]/.test(topic); } /** @@ -288,7 +297,7 @@ module.exports = function(RED) { //TODO: delete msg.responseTopic - to prevent it being resent? } } - topicOK = topicOK && !/[\+#\b\f\n\r\t\v\0]/.test(msg.topic); + topicOK = topicOK && isValidPublishTopic(msg.topic); if (topicOK) { node.brokerConn.publish(msg, done); // send the message @@ -362,7 +371,7 @@ module.exports = function(RED) { node.brokerConn.connect(function () { done(); }); - }, true) + }) } else { // Without force flag, we will refuse to cycle an active connection done(new Error(RED._('mqtt.errors.invalid-action-alreadyconnected'))); @@ -760,14 +769,11 @@ module.exports = function(RED) { } } }; - node.disconnect = function (callback, force) { - const _callback = function (resetNodeConnectedState, _force) { - setStatusDisconnected(node, true); - if(resetNodeConnectedState || _force) { - node.client.removeAllListeners(); - node.closing = true; - node.connecting = false; - node.connected = false; + + node.disconnect = function (callback) { + const _callback = function () { + if(node.connected || node.connecting) { + setStatusDisconnected(node, true); } if(node.client) { node._clientRemoveListeners(); } node.connecting = false; @@ -797,17 +803,18 @@ module.exports = function(RED) { }; if(node.connected && node.closeMessage) { node.publish(node.closeMessage, function (err) { - node.client.end(endCallBack); - _callback(true, force); + waitEnd(node.client, 2000).then(() => { + _callback(); + }).catch((e) => { + _callback(); + }) }); - } else if(node.connected) { - node.client.end(endCallBack); - _callback(true, force); - } else if(node.connecting) { - node.client.end(); - _callback(true, true); } else { - _callback(false, force); + waitEnd(node.client, 2000).then(() => { + _callback(); + }).catch((e) => { + _callback(); + }) } } node.subscriptionIds = {}; @@ -889,8 +896,18 @@ module.exports = function(RED) { qos: msg.qos || 0, retain: msg.retain || false }; + let topicOK = hasProperty(msg, "topic") && (typeof msg.topic === "string") && (isValidPublishTopic(msg.topic)); //https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback if(node.options.protocolVersion == 5) { + const bsp = node.serverProperties || {}; + if (msg.userProperties && typeof msg.userProperties !== "object") { + delete msg.userProperties; + } + if (hasProperty(msg, "topicAlias") && !isNaN(Number(msg.topicAlias))) { + msg.topicAlias = parseInt(msg.topicAlias); + } else { + delete msg.topicAlias; + } options.properties = options.properties || {}; setStrProp(msg, options.properties, "responseTopic"); setBufferProp(msg, options.properties, "correlationData"); @@ -900,23 +917,36 @@ module.exports = function(RED) { setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0); setBoolProp(msg, options.properties, "payloadFormatIndicator"); //FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455); - if (options.properties.topicAlias) { - if (!node.topicAliases.hasOwnProperty(options.properties.topicAlias) && msg.topic == "") { + + //check & sanitise topic + if (topicOK && options.properties.topicAlias) { + let aliasValid = (bsp.topicAliasMaximum && bsp.topicAliasMaximum >= options.properties.topicAlias); + if (!aliasValid) { done("Invalid topicAlias"); return } if (node.topicAliases[options.properties.topicAlias] === msg.topic) { - msg.topic = "" + msg.topic = ""; } else { - node.topicAliases[options.properties.topicAlias] = msg.topic + node.topicAliases[options.properties.topicAlias] = msg.topic; } + } else if (!msg.topic && options.properties.responseTopic) { + msg.topic = msg.responseTopic; + topicOK = isValidPublishTopic(msg.topic); + delete msg.responseTopic; //prevent responseTopic being resent? } } - node.client.publish(msg.topic, msg.payload, options, function(err) { - done && done(err); - return - }); + if (topicOK) { + node.client.publish(msg.topic, msg.payload, options, function(err) { + done && done(err); + return + }); + } else { + const error = new Error(RED._("mqtt.errors.invalid-topic")); + error.warn = true; + done(error); + } } }; @@ -1130,6 +1160,7 @@ module.exports = function(RED) { node.brokerConn.unsubscribe(node.topic,node.id, removed); } node.brokerConn.deregister(node, done); + node.brokerConn = null; } else { done(); } @@ -1194,6 +1225,7 @@ module.exports = function(RED) { node.on('close', function(done) { if (node.brokerConn) { node.brokerConn.deregister(node,done); + node.brokerConn = null; } else { done(); }