MQTT topic validation and lockup fix

closes #3557
This commit is contained in:
Steve-Mcl 2022-04-29 19:56:37 +01:00
parent 4fb8292618
commit be3b5b7fe2
3 changed files with 151 additions and 88 deletions

View File

@ -452,7 +452,17 @@
} }
return defaultContentType || 'none' return defaultContentType || 'none'
} }
/**
* Test a topic string is valid for publishing
* @param {string} topic
* @returns `true` if it is a valid topic
*/
function validateMQTTPublishTopic(topic) {
if(!topic || topic == "") {
return true;
}
return !/[\+#\b\f\n\r\t\v\0]/.test(topic);
}
RED.nodes.registerType('mqtt-broker',{ RED.nodes.registerType('mqtt-broker',{
category: 'config', category: 'config',
defaults: { defaults: {
@ -487,17 +497,17 @@
label: RED._("node-red:mqtt.label.keepalive"), label: RED._("node-red:mqtt.label.keepalive"),
validate:RED.validators.number(false)}, validate:RED.validators.number(false)},
cleansession: {value: true}, cleansession: {value: true},
birthTopic: {value:""}, birthTopic: {value:"", validate:validateMQTTPublishTopic},
birthQos: {value:"0"}, birthQos: {value:"0"},
birthRetain: {value:false}, birthRetain: {value:false},
birthPayload: {value:""}, birthPayload: {value:""},
birthMsg: { value: {}}, birthMsg: { value: {}},
closeTopic: {value:""}, closeTopic: {value:"", validate:validateMQTTPublishTopic},
closeQos: {value:"0"}, closeQos: {value:"0"},
closeRetain: {value:false}, closeRetain: {value:false},
closePayload: {value:""}, closePayload: {value:""},
closeMsg: { value: {}}, closeMsg: { value: {}},
willTopic: {value:""}, willTopic: {value:"", validate:validateMQTTPublishTopic},
willQos: {value:"0"}, willQos: {value:"0"},
willRetain: {value:false}, willRetain: {value:false},
willPayload: {value:""}, willPayload: {value:""},
@ -856,7 +866,7 @@
category: 'network', category: 'network',
defaults: { defaults: {
name: {value:""}, name: {value:""},
topic: {value:""}, topic: {value:"", validate:validateMQTTPublishTopic},
qos: {value:""}, qos: {value:""},
retain: {value:""}, retain: {value:""},
respTopic: {value:""}, respTopic: {value:""},

View File

@ -91,12 +91,21 @@ module.exports = function(RED) {
} }
/** /**
* Test a topic string is valid * Test a topic string is valid for subscription
* @param {string} topic * @param {string} topic
* @returns `true` if it is a valid topic * @returns `true` if it is a valid topic
*/ */
function isValidSubscriptionTopic(topic) { function isValidSubscriptionTopic(topic) {
return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic) return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic);
}
/**
* Test a topic string is valid for publishing
* @param {string} topic
* @returns `true` if it is a valid topic
*/
function isValidPublishTopic(topic) {
return !/[\+#\b\f\n\r\t\v\0]/.test(topic);
} }
/** /**
@ -340,38 +349,15 @@ module.exports = function(RED) {
msg.messageExpiryInterval = node.messageExpiryInterval; msg.messageExpiryInterval = node.messageExpiryInterval;
} }
} }
if (msg.userProperties && typeof msg.userProperties !== "object") {
delete msg.userProperties;
}
if (hasProperty(msg, "topicAlias") && !isNaN(msg.topicAlias) && (msg.topicAlias === 0 || bsp.topicAliasMaximum === 0 || msg.topicAlias > bsp.topicAliasMaximum)) {
delete msg.topicAlias;
}
if (hasProperty(msg, "payload")) { if (hasProperty(msg, "payload")) {
// send the message
//check & sanitise topic node.brokerConn.publish(msg, function(err) {
let topicOK = hasProperty(msg, "topic") && (typeof msg.topic === "string") && (msg.topic !== ""); if(err && err.warn) {
node.warn(err);
if (!topicOK && v5) { return;
//NOTE: A value of 0 (in server props topicAliasMaximum) indicates that the Server does not accept any Topic Aliases on this connection
if (hasProperty(msg, "topicAlias") && !isNaN(msg.topicAlias) && msg.topicAlias >= 0 && bsp.topicAliasMaximum && bsp.topicAliasMaximum >= msg.topicAlias) {
topicOK = true;
msg.topic = ""; //must be empty string
} else if (hasProperty(msg, "responseTopic") && (typeof msg.responseTopic === "string") && (msg.responseTopic !== "")) {
//TODO: if topic is empty but responseTopic has a string value, use that instead. Is this desirable?
topicOK = true;
msg.topic = msg.responseTopic;
//TODO: delete msg.responseTopic - to prevent it being resent?
} }
} done(err);
topicOK = topicOK && !/[\+#\b\f\n\r\t\v\0]/.test(msg.topic); });
if (topicOK) {
node.brokerConn.publish(msg, done); // send the message
} else {
node.warn(RED._("mqtt.errors.invalid-topic"));
done();
}
} else { } else {
done(); done();
} }
@ -827,31 +813,46 @@ module.exports = function(RED) {
} }
}; };
node.disconnect = function (callback) { node.disconnect = function (callback) {
const _callback = function (resetNodeConnectedState) { const _callback = function () {
setStatusDisconnected(node, true); if(node.connected || node.connecting) {
if(resetNodeConnectedState) { setStatusDisconnected(node, true);
node.closing = true;
node.connecting = false;
node.connected = false;
} }
if(node.client) { node.client.removeAllListeners(); }
node.connecting = false;
node.connected = false;
callback && typeof callback == "function" && callback(); callback && typeof callback == "function" && callback();
}; };
if(!node.client) { return _callback(); }
if(node.closing) { return _callback(); }
if(node.closing) { let waitEnd = (client, ms) => {
return _callback(false); return new Promise( (resolve, reject) => {
} node.closing = true;
var endCallBack = function endCallBack() { if(!client) {
} resolve();
} else {
const t = setTimeout(reject, ms);
client.end(() => {
clearTimeout(t);
resolve()
});
}
});
};
if(node.connected && node.closeMessage) { if(node.connected && node.closeMessage) {
node.publish(node.closeMessage, function (err) { node.publish(node.closeMessage, function (err) {
node.client.end(endCallBack); waitEnd(node.client, 2000).then(() => {
_callback(true); _callback();
}).catch((e) => {
_callback();
})
}); });
} else if(node.connected) {
node.client.end(endCallBack);
_callback(true);
} else { } else {
_callback(false); waitEnd(node.client, 2000).then(() => {
_callback();
}).catch((e) => {
_callback();
})
} }
} }
node.subscriptionIds = {}; node.subscriptionIds = {};
@ -933,8 +934,18 @@ module.exports = function(RED) {
qos: msg.qos || 0, qos: msg.qos || 0,
retain: msg.retain || false retain: msg.retain || false
}; };
let topicOK = hasProperty(msg, "topic") && (typeof msg.topic === "string") && (isValidPublishTopic(msg.topic));
//https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback //https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback
if(node.options.protocolVersion == 5) { if(node.options.protocolVersion == 5) {
const bsp = node.serverProperties || {};
if (msg.userProperties && typeof msg.userProperties !== "object") {
delete msg.userProperties;
}
if (hasProperty(msg, "topicAlias") && !isNaN(Number(msg.topicAlias))) {
msg.topicAlias = parseInt(msg.topicAlias);
} else {
delete msg.topicAlias;
}
options.properties = options.properties || {}; options.properties = options.properties || {};
setStrProp(msg, options.properties, "responseTopic"); setStrProp(msg, options.properties, "responseTopic");
setBufferProp(msg, options.properties, "correlationData"); setBufferProp(msg, options.properties, "correlationData");
@ -944,29 +955,46 @@ module.exports = function(RED) {
setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0); setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0);
setBoolProp(msg, options.properties, "payloadFormatIndicator"); setBoolProp(msg, options.properties, "payloadFormatIndicator");
//FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455); //FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455);
if (options.properties.topicAlias) {
if (!node.topicAliases.hasOwnProperty(options.properties.topicAlias) && msg.topic == "") { //check & sanitise topic
if (topicOK && options.properties.topicAlias) {
let aliasValid = (bsp.topicAliasMaximum && bsp.topicAliasMaximum >= options.properties.topicAlias);
if (!aliasValid) {
done("Invalid topicAlias"); done("Invalid topicAlias");
return return
} }
if (node.topicAliases[options.properties.topicAlias] === msg.topic) { if (node.topicAliases[options.properties.topicAlias] === msg.topic) {
msg.topic = "" msg.topic = "";
} else { } else {
node.topicAliases[options.properties.topicAlias] = msg.topic node.topicAliases[options.properties.topicAlias] = msg.topic;
} }
} else if (!msg.topic && options.properties.responseTopic) {
msg.topic = msg.responseTopic;
topicOK = isValidPublishTopic(msg.topic);
delete msg.responseTopic; //prevent responseTopic being resent?
} }
} }
node.client.publish(msg.topic, msg.payload, options, function(err) { if (topicOK) {
done && done(err); node.client.publish(msg.topic, msg.payload, options, function(err) {
return done && done(err);
}); return
});
} else {
const error = new Error(RED._("mqtt.errors.invalid-topic"));
error.warn = true;
done(error);
}
} }
}; };
node.on('close', function(done) { node.on('close', function(done) {
node.closing = true; node.disconnect(function() {
node.disconnect(done); if(node.client) {
node.client.removeAllListeners();
}
done();
}, true);
}); });
} }
@ -1143,6 +1171,7 @@ module.exports = function(RED) {
node.brokerConn.unsubscribe(node.topic,node.id, removed); node.brokerConn.unsubscribe(node.topic,node.id, removed);
} }
node.brokerConn.deregister(node, done); node.brokerConn.deregister(node, done);
node.brokerConn = null;
} else { } else {
done(); done();
} }
@ -1207,6 +1236,7 @@ module.exports = function(RED) {
node.on('close', function(done) { node.on('close', function(done) {
if (node.brokerConn) { if (node.brokerConn) {
node.brokerConn.deregister(node,done); node.brokerConn.deregister(node,done);
node.brokerConn = null;
} else { } else {
done(); done();
} }

View File

@ -73,8 +73,7 @@ describe('MQTT Nodes', function () {
done(); done();
}); });
} }
// Conditional test runner (only run if skipTests=false)
/** Conditional test runner (only run if skipTests=false) */
function itConditional(title, test) { function itConditional(title, test) {
return !skipTests ? it(title, test) : it.skip(title, test); return !skipTests ? it(title, test) : it.skip(title, test);
} }
@ -444,7 +443,8 @@ describe('MQTT Nodes', function () {
const mqttOut = helper.getNode("mqtt.out"); const mqttOut = helper.getNode("mqtt.out");
const mqttBroker1 = helper.getNode("mqtt.broker1"); const mqttBroker1 = helper.getNode("mqtt.broker1");
const mqttBroker2 = helper.getNode("mqtt.broker2"); const mqttBroker2 = helper.getNode("mqtt.broker2");
waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() { waitBrokerConnect([mqttBroker1, mqttBroker2])
.then(() => {
//connected - add the on handler and call to disconnect //connected - add the on handler and call to disconnect
helperNode.on("input", function (msg) { helperNode.on("input", function (msg) {
try { try {
@ -458,6 +458,7 @@ describe('MQTT Nodes', function () {
}) })
mqttOut.receive({ "action": "disconnect" });//close broker2 mqttOut.receive({ "action": "disconnect" });//close broker2
}) })
.catch(done);
}); });
}); });
itConditional('should publish will message', function (done) { itConditional('should publish will message', function (done) {
@ -473,7 +474,8 @@ describe('MQTT Nodes', function () {
const helperNode = helper.getNode("helper.node"); const helperNode = helper.getNode("helper.node");
const mqttBroker1 = helper.getNode("mqtt.broker1"); const mqttBroker1 = helper.getNode("mqtt.broker1");
const mqttBroker2 = helper.getNode("mqtt.broker2"); const mqttBroker2 = helper.getNode("mqtt.broker2");
waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() { waitBrokerConnect([mqttBroker1, mqttBroker2])
.then(() => {
//connected - add the on handler and call to disconnect //connected - add the on handler and call to disconnect
helperNode.on("input", function (msg) { helperNode.on("input", function (msg) {
try { try {
@ -487,6 +489,7 @@ describe('MQTT Nodes', function () {
}); });
mqttBroker2.client.end(true); //force closure mqttBroker2.client.end(true); //force closure
}) })
.catch(done);
}); });
}); });
itConditional('should publish will message with V5 properties', function (done) { itConditional('should publish will message with V5 properties', function (done) {
@ -528,7 +531,8 @@ describe('MQTT Nodes', function () {
const helperNode = helper.getNode("helper.node"); const helperNode = helper.getNode("helper.node");
const mqttBroker1 = helper.getNode("mqtt.broker1"); const mqttBroker1 = helper.getNode("mqtt.broker1");
const mqttBroker2 = helper.getNode("mqtt.broker2"); const mqttBroker2 = helper.getNode("mqtt.broker2");
waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() { waitBrokerConnect([mqttBroker1, mqttBroker2])
.then(() => {
//connected - add the on handler and call to disconnect //connected - add the on handler and call to disconnect
helperNode.on("input", function (msg) { helperNode.on("input", function (msg) {
try { try {
@ -540,6 +544,7 @@ describe('MQTT Nodes', function () {
}); });
mqttBroker2.client.end(true); //force closure mqttBroker2.client.end(true); //force closure
}) })
.catch(done);
}); });
}); });
//#endregion ADVANCED TESTS //#endregion ADVANCED TESTS
@ -597,17 +602,24 @@ function testSendRecv(brokerOptions, inNodeOptions, outNodeOptions, options, hoo
} }
}); });
} }
waitBrokerConnect(mqttBroker, function () { waitBrokerConnect(mqttBroker)
.then(() => {
//finally, connected! //finally, connected!
if (hooks.afterConnect) { if (hooks.afterConnect) {
let handled = hooks.afterConnect(helperNode, mqttBroker, mqttIn, mqttOut); let handled = hooks.afterConnect(helperNode, mqttBroker, mqttIn, mqttOut);
if (handled) { return } if (handled) { return }
} }
if (mqttIn.isDynamic) { if(sendMsg.topic) {
mqttIn.receive({ "action": "subscribe", "topic": sendMsg.topic }) if (mqttIn.isDynamic) {
mqttIn.receive({ "action": "subscribe", "topic": sendMsg.topic })
}
mqttOut.receive(sendMsg);
} }
mqttOut.receive(sendMsg);
}) })
.catch((e) => {
if (hooks.done) { hooks.done(e); }
else { throw e; }
});
} catch (err) { } catch (err) {
if (hooks.done) { hooks.done(err); } if (hooks.done) { hooks.done(err); }
else { throw err; } else { throw err; }
@ -743,22 +755,33 @@ function compareMsgToExpected(msg, expectMsg) {
if (hasProperty(expectMsg, "messageExpiryInterval")) { msg.should.have.property("messageExpiryInterval", expectMsg.messageExpiryInterval); } if (hasProperty(expectMsg, "messageExpiryInterval")) { msg.should.have.property("messageExpiryInterval", expectMsg.messageExpiryInterval); }
} }
function waitBrokerConnect(broker, callback, timeLimit) { function waitBrokerConnect(broker, timeLimit) {
timeLimit = timeLimit || 2000;
const brokers = Array.isArray(broker) ? broker : [broker]; let waitConnected = (broker, timeLimit) => {
wait(); const brokers = Array.isArray(broker) ? broker : [broker];
function wait() { timeLimit = timeLimit || 1000;
if (brokers.every(e => e.connected == true)) { let timer, resolved = false;
callback(); //yey - connected! return new Promise( (resolve, reject) => {
} else { timer = wait();
timeLimit = timeLimit - 15; function wait() {
if (timeLimit <= 0) { if (brokers.every(e => e.connected == true)) {
throw new Error("Timeout waiting broker connect") resolved = true;
clearTimeout(timer);
resolve();
} else {
timeLimit = timeLimit - 15;
if (timeLimit <= 0) {
if(!resolved) {
reject("Timeout waiting broker connect")
}
}
timer = setTimeout(wait, 15);
return timer;
}
} }
setTimeout(wait, 15); });
return; };
} return waitConnected(broker, timeLimit);
}
} }
function hasProperty(obj, propName) { function hasProperty(obj, propName) {