diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html index 7e08c8cb6..10acf4857 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html @@ -452,7 +452,17 @@ } return defaultContentType || 'none' } - + /** + * Test a topic string is valid for publishing + * @param {string} topic + * @returns `true` if it is a valid topic + */ + function validateMQTTPublishTopic(topic, opts) { + if(!topic || topic == "" || !/[\+#\b\f\n\r\t\v\0]/.test(topic)) { + return true; + } + return RED._("node-red:mqtt.errors.invalid-topic"); + } RED.nodes.registerType('mqtt-broker',{ category: 'config', defaults: { @@ -487,17 +497,17 @@ label: RED._("node-red:mqtt.label.keepalive"), validate:RED.validators.number(false)}, cleansession: {value: true}, - birthTopic: {value:""}, + birthTopic: {value:"", validate:validateMQTTPublishTopic}, birthQos: {value:"0"}, birthRetain: {value:false}, birthPayload: {value:""}, birthMsg: { value: {}}, - closeTopic: {value:""}, + closeTopic: {value:"", validate:validateMQTTPublishTopic}, closeQos: {value:"0"}, closeRetain: {value:false}, closePayload: {value:""}, closeMsg: { value: {}}, - willTopic: {value:""}, + willTopic: {value:"", validate:validateMQTTPublishTopic}, willQos: {value:"0"}, willRetain: {value:false}, willPayload: {value:""}, @@ -856,7 +866,7 @@ category: 'network', defaults: { name: {value:""}, - topic: {value:""}, + topic: {value:"", validate:validateMQTTPublishTopic}, qos: {value:""}, retain: {value:""}, respTopic: {value:""}, diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 9b365585b..1c0a7c7d9 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -91,12 +91,21 @@ module.exports = function(RED) { } /** - * Test a topic string is valid + * Test a topic string is valid for subscription * @param {string} topic * @returns `true` if it is a valid topic */ function isValidSubscriptionTopic(topic) { - return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic) + return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic); + } + + /** + * Test a topic string is valid for publishing + * @param {string} topic + * @returns `true` if it is a valid topic + */ + function isValidPublishTopic(topic) { + return !/[\+#\b\f\n\r\t\v\0]/.test(topic); } /** @@ -340,38 +349,15 @@ module.exports = function(RED) { msg.messageExpiryInterval = node.messageExpiryInterval; } } - if (msg.userProperties && typeof msg.userProperties !== "object") { - delete msg.userProperties; - } - if (hasProperty(msg, "topicAlias") && !isNaN(msg.topicAlias) && (msg.topicAlias === 0 || bsp.topicAliasMaximum === 0 || msg.topicAlias > bsp.topicAliasMaximum)) { - delete msg.topicAlias; - } - if (hasProperty(msg, "payload")) { - - //check & sanitise topic - let topicOK = hasProperty(msg, "topic") && (typeof msg.topic === "string") && (msg.topic !== ""); - - if (!topicOK && v5) { - //NOTE: A value of 0 (in server props topicAliasMaximum) indicates that the Server does not accept any Topic Aliases on this connection - if (hasProperty(msg, "topicAlias") && !isNaN(msg.topicAlias) && msg.topicAlias >= 0 && bsp.topicAliasMaximum && bsp.topicAliasMaximum >= msg.topicAlias) { - topicOK = true; - msg.topic = ""; //must be empty string - } else if (hasProperty(msg, "responseTopic") && (typeof msg.responseTopic === "string") && (msg.responseTopic !== "")) { - //TODO: if topic is empty but responseTopic has a string value, use that instead. Is this desirable? - topicOK = true; - msg.topic = msg.responseTopic; - //TODO: delete msg.responseTopic - to prevent it being resent? + // send the message + node.brokerConn.publish(msg, function(err) { + if(err && err.warn) { + node.warn(err); + return; } - } - topicOK = topicOK && !/[\+#\b\f\n\r\t\v\0]/.test(msg.topic); - - if (topicOK) { - node.brokerConn.publish(msg, done); // send the message - } else { - node.warn(RED._("mqtt.errors.invalid-topic")); - done(); - } + done(err); + }); } else { done(); } @@ -827,31 +813,46 @@ module.exports = function(RED) { } }; node.disconnect = function (callback) { - const _callback = function (resetNodeConnectedState) { - setStatusDisconnected(node, true); - if(resetNodeConnectedState) { - node.closing = true; - node.connecting = false; - node.connected = false; + const _callback = function () { + if(node.connected || node.connecting) { + setStatusDisconnected(node, true); } + if(node.client) { node.client.removeAllListeners(); } + node.connecting = false; + node.connected = false; callback && typeof callback == "function" && callback(); }; + if(!node.client) { return _callback(); } + if(node.closing) { return _callback(); } - if(node.closing) { - return _callback(false); - } - var endCallBack = function endCallBack() { - } + let waitEnd = (client, ms) => { + return new Promise( (resolve, reject) => { + node.closing = true; + if(!client) { + resolve(); + } else { + const t = setTimeout(reject, ms); + client.end(() => { + clearTimeout(t); + resolve() + }); + } + }); + }; if(node.connected && node.closeMessage) { node.publish(node.closeMessage, function (err) { - node.client.end(endCallBack); - _callback(true); + waitEnd(node.client, 2000).then(() => { + _callback(); + }).catch((e) => { + _callback(); + }) }); - } else if(node.connected) { - node.client.end(endCallBack); - _callback(true); } else { - _callback(false); + waitEnd(node.client, 2000).then(() => { + _callback(); + }).catch((e) => { + _callback(); + }) } } node.subscriptionIds = {}; @@ -933,8 +934,18 @@ module.exports = function(RED) { qos: msg.qos || 0, retain: msg.retain || false }; + let topicOK = hasProperty(msg, "topic") && (typeof msg.topic === "string") && (isValidPublishTopic(msg.topic)); //https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback if(node.options.protocolVersion == 5) { + const bsp = node.serverProperties || {}; + if (msg.userProperties && typeof msg.userProperties !== "object") { + delete msg.userProperties; + } + if (hasProperty(msg, "topicAlias") && !isNaN(Number(msg.topicAlias))) { + msg.topicAlias = parseInt(msg.topicAlias); + } else { + delete msg.topicAlias; + } options.properties = options.properties || {}; setStrProp(msg, options.properties, "responseTopic"); setBufferProp(msg, options.properties, "correlationData"); @@ -944,29 +955,46 @@ module.exports = function(RED) { setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0); setBoolProp(msg, options.properties, "payloadFormatIndicator"); //FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455); - if (options.properties.topicAlias) { - if (!node.topicAliases.hasOwnProperty(options.properties.topicAlias) && msg.topic == "") { + + //check & sanitise topic + if (topicOK && options.properties.topicAlias) { + let aliasValid = (bsp.topicAliasMaximum && bsp.topicAliasMaximum >= options.properties.topicAlias); + if (!aliasValid) { done("Invalid topicAlias"); return } if (node.topicAliases[options.properties.topicAlias] === msg.topic) { - msg.topic = "" + msg.topic = ""; } else { - node.topicAliases[options.properties.topicAlias] = msg.topic + node.topicAliases[options.properties.topicAlias] = msg.topic; } + } else if (!msg.topic && options.properties.responseTopic) { + msg.topic = msg.responseTopic; + topicOK = isValidPublishTopic(msg.topic); + delete msg.responseTopic; //prevent responseTopic being resent? } } - node.client.publish(msg.topic, msg.payload, options, function(err) { - done && done(err); - return - }); + if (topicOK) { + node.client.publish(msg.topic, msg.payload, options, function(err) { + done && done(err); + return + }); + } else { + const error = new Error(RED._("mqtt.errors.invalid-topic")); + error.warn = true; + done(error); + } } }; node.on('close', function(done) { - node.closing = true; - node.disconnect(done); + node.disconnect(function() { + if(node.client) { + node.client.removeAllListeners(); + } + done(); + }, true); }); } @@ -1143,6 +1171,7 @@ module.exports = function(RED) { node.brokerConn.unsubscribe(node.topic,node.id, removed); } node.brokerConn.deregister(node, done); + node.brokerConn = null; } else { done(); } @@ -1207,6 +1236,7 @@ module.exports = function(RED) { node.on('close', function(done) { if (node.brokerConn) { node.brokerConn.deregister(node,done); + node.brokerConn = null; } else { done(); } diff --git a/test/nodes/core/network/21-mqtt_spec.js b/test/nodes/core/network/21-mqtt_spec.js index 8d2ef3ed9..655cc9c73 100644 --- a/test/nodes/core/network/21-mqtt_spec.js +++ b/test/nodes/core/network/21-mqtt_spec.js @@ -73,8 +73,7 @@ describe('MQTT Nodes', function () { done(); }); } - - /** Conditional test runner (only run if skipTests=false) */ + // Conditional test runner (only run if skipTests=false) function itConditional(title, test) { return !skipTests ? it(title, test) : it.skip(title, test); } @@ -444,7 +443,8 @@ describe('MQTT Nodes', function () { const mqttOut = helper.getNode("mqtt.out"); const mqttBroker1 = helper.getNode("mqtt.broker1"); const mqttBroker2 = helper.getNode("mqtt.broker2"); - waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() { + waitBrokerConnect([mqttBroker1, mqttBroker2]) + .then(() => { //connected - add the on handler and call to disconnect helperNode.on("input", function (msg) { try { @@ -458,6 +458,7 @@ describe('MQTT Nodes', function () { }) mqttOut.receive({ "action": "disconnect" });//close broker2 }) + .catch(done); }); }); itConditional('should publish will message', function (done) { @@ -473,7 +474,8 @@ describe('MQTT Nodes', function () { const helperNode = helper.getNode("helper.node"); const mqttBroker1 = helper.getNode("mqtt.broker1"); const mqttBroker2 = helper.getNode("mqtt.broker2"); - waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() { + waitBrokerConnect([mqttBroker1, mqttBroker2]) + .then(() => { //connected - add the on handler and call to disconnect helperNode.on("input", function (msg) { try { @@ -487,6 +489,7 @@ describe('MQTT Nodes', function () { }); mqttBroker2.client.end(true); //force closure }) + .catch(done); }); }); itConditional('should publish will message with V5 properties', function (done) { @@ -528,7 +531,8 @@ describe('MQTT Nodes', function () { const helperNode = helper.getNode("helper.node"); const mqttBroker1 = helper.getNode("mqtt.broker1"); const mqttBroker2 = helper.getNode("mqtt.broker2"); - waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() { + waitBrokerConnect([mqttBroker1, mqttBroker2]) + .then(() => { //connected - add the on handler and call to disconnect helperNode.on("input", function (msg) { try { @@ -540,6 +544,7 @@ describe('MQTT Nodes', function () { }); mqttBroker2.client.end(true); //force closure }) + .catch(done); }); }); //#endregion ADVANCED TESTS @@ -597,17 +602,24 @@ function testSendRecv(brokerOptions, inNodeOptions, outNodeOptions, options, hoo } }); } - waitBrokerConnect(mqttBroker, function () { + waitBrokerConnect(mqttBroker) + .then(() => { //finally, connected! if (hooks.afterConnect) { let handled = hooks.afterConnect(helperNode, mqttBroker, mqttIn, mqttOut); if (handled) { return } } - if (mqttIn.isDynamic) { - mqttIn.receive({ "action": "subscribe", "topic": sendMsg.topic }) + if(sendMsg.topic) { + if (mqttIn.isDynamic) { + mqttIn.receive({ "action": "subscribe", "topic": sendMsg.topic }) + } + mqttOut.receive(sendMsg); } - mqttOut.receive(sendMsg); }) + .catch((e) => { + if (hooks.done) { hooks.done(e); } + else { throw e; } + }); } catch (err) { if (hooks.done) { hooks.done(err); } else { throw err; } @@ -743,22 +755,33 @@ function compareMsgToExpected(msg, expectMsg) { if (hasProperty(expectMsg, "messageExpiryInterval")) { msg.should.have.property("messageExpiryInterval", expectMsg.messageExpiryInterval); } } -function waitBrokerConnect(broker, callback, timeLimit) { - timeLimit = timeLimit || 2000; - const brokers = Array.isArray(broker) ? broker : [broker]; - wait(); - function wait() { - if (brokers.every(e => e.connected == true)) { - callback(); //yey - connected! - } else { - timeLimit = timeLimit - 15; - if (timeLimit <= 0) { - throw new Error("Timeout waiting broker connect") +function waitBrokerConnect(broker, timeLimit) { + + let waitConnected = (broker, timeLimit) => { + const brokers = Array.isArray(broker) ? broker : [broker]; + timeLimit = timeLimit || 1000; + let timer, resolved = false; + return new Promise( (resolve, reject) => { + timer = wait(); + function wait() { + if (brokers.every(e => e.connected == true)) { + resolved = true; + clearTimeout(timer); + resolve(); + } else { + timeLimit = timeLimit - 15; + if (timeLimit <= 0) { + if(!resolved) { + reject("Timeout waiting broker connect") + } + } + timer = setTimeout(wait, 15); + return timer; + } } - setTimeout(wait, 15); - return; - } - } + }); + }; + return waitConnected(broker, timeLimit); } function hasProperty(obj, propName) {