1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

add changes from #3563

Add client and Runtime MQTT topic validation and fix subsequent connection lockup
(that arises due to bad birth/will topic)
backport-2.x
This commit is contained in:
Steve-Mcl 2022-06-13 12:57:22 +01:00
parent 8133c5e834
commit 933cad888f

View File

@ -68,12 +68,21 @@ module.exports = function(RED) {
} }
/** /**
* Test a topic string is valid * Test a topic string is valid for subscription
* @param {string} topic * @param {string} topic
* @returns `true` if it is a valid topic * @returns `true` if it is a valid topic
*/ */
function isValidSubscriptionTopic(topic) { function isValidSubscriptionTopic(topic) {
return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic) return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic);
}
/**
* Test a topic string is valid for publishing
* @param {string} topic
* @returns `true` if it is a valid topic
*/
function isValidPublishTopic(topic) {
return !/[\+#\b\f\n\r\t\v\0]/.test(topic);
} }
/** /**
@ -288,7 +297,7 @@ module.exports = function(RED) {
//TODO: delete msg.responseTopic - to prevent it being resent? //TODO: delete msg.responseTopic - to prevent it being resent?
} }
} }
topicOK = topicOK && !/[\+#\b\f\n\r\t\v\0]/.test(msg.topic); topicOK = topicOK && isValidPublishTopic(msg.topic);
if (topicOK) { if (topicOK) {
node.brokerConn.publish(msg, done); // send the message node.brokerConn.publish(msg, done); // send the message
@ -362,7 +371,7 @@ module.exports = function(RED) {
node.brokerConn.connect(function () { node.brokerConn.connect(function () {
done(); done();
}); });
}, true) })
} else { } else {
// Without force flag, we will refuse to cycle an active connection // Without force flag, we will refuse to cycle an active connection
done(new Error(RED._('mqtt.errors.invalid-action-alreadyconnected'))); done(new Error(RED._('mqtt.errors.invalid-action-alreadyconnected')));
@ -760,14 +769,11 @@ module.exports = function(RED) {
} }
} }
}; };
node.disconnect = function (callback, force) {
const _callback = function (resetNodeConnectedState, _force) { node.disconnect = function (callback) {
setStatusDisconnected(node, true); const _callback = function () {
if(resetNodeConnectedState || _force) { if(node.connected || node.connecting) {
node.client.removeAllListeners(); setStatusDisconnected(node, true);
node.closing = true;
node.connecting = false;
node.connected = false;
} }
if(node.client) { node._clientRemoveListeners(); } if(node.client) { node._clientRemoveListeners(); }
node.connecting = false; node.connecting = false;
@ -797,17 +803,18 @@ module.exports = function(RED) {
}; };
if(node.connected && node.closeMessage) { if(node.connected && node.closeMessage) {
node.publish(node.closeMessage, function (err) { node.publish(node.closeMessage, function (err) {
node.client.end(endCallBack); waitEnd(node.client, 2000).then(() => {
_callback(true, force); _callback();
}).catch((e) => {
_callback();
})
}); });
} else if(node.connected) {
node.client.end(endCallBack);
_callback(true, force);
} else if(node.connecting) {
node.client.end();
_callback(true, true);
} else { } else {
_callback(false, force); waitEnd(node.client, 2000).then(() => {
_callback();
}).catch((e) => {
_callback();
})
} }
} }
node.subscriptionIds = {}; node.subscriptionIds = {};
@ -889,8 +896,18 @@ module.exports = function(RED) {
qos: msg.qos || 0, qos: msg.qos || 0,
retain: msg.retain || false retain: msg.retain || false
}; };
let topicOK = hasProperty(msg, "topic") && (typeof msg.topic === "string") && (isValidPublishTopic(msg.topic));
//https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback //https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback
if(node.options.protocolVersion == 5) { if(node.options.protocolVersion == 5) {
const bsp = node.serverProperties || {};
if (msg.userProperties && typeof msg.userProperties !== "object") {
delete msg.userProperties;
}
if (hasProperty(msg, "topicAlias") && !isNaN(Number(msg.topicAlias))) {
msg.topicAlias = parseInt(msg.topicAlias);
} else {
delete msg.topicAlias;
}
options.properties = options.properties || {}; options.properties = options.properties || {};
setStrProp(msg, options.properties, "responseTopic"); setStrProp(msg, options.properties, "responseTopic");
setBufferProp(msg, options.properties, "correlationData"); setBufferProp(msg, options.properties, "correlationData");
@ -900,23 +917,36 @@ module.exports = function(RED) {
setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0); setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0);
setBoolProp(msg, options.properties, "payloadFormatIndicator"); setBoolProp(msg, options.properties, "payloadFormatIndicator");
//FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455); //FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455);
if (options.properties.topicAlias) {
if (!node.topicAliases.hasOwnProperty(options.properties.topicAlias) && msg.topic == "") { //check & sanitise topic
if (topicOK && options.properties.topicAlias) {
let aliasValid = (bsp.topicAliasMaximum && bsp.topicAliasMaximum >= options.properties.topicAlias);
if (!aliasValid) {
done("Invalid topicAlias"); done("Invalid topicAlias");
return return
} }
if (node.topicAliases[options.properties.topicAlias] === msg.topic) { if (node.topicAliases[options.properties.topicAlias] === msg.topic) {
msg.topic = "" msg.topic = "";
} else { } else {
node.topicAliases[options.properties.topicAlias] = msg.topic node.topicAliases[options.properties.topicAlias] = msg.topic;
} }
} else if (!msg.topic && options.properties.responseTopic) {
msg.topic = msg.responseTopic;
topicOK = isValidPublishTopic(msg.topic);
delete msg.responseTopic; //prevent responseTopic being resent?
} }
} }
node.client.publish(msg.topic, msg.payload, options, function(err) { if (topicOK) {
done && done(err); node.client.publish(msg.topic, msg.payload, options, function(err) {
return done && done(err);
}); return
});
} else {
const error = new Error(RED._("mqtt.errors.invalid-topic"));
error.warn = true;
done(error);
}
} }
}; };
@ -1130,6 +1160,7 @@ module.exports = function(RED) {
node.brokerConn.unsubscribe(node.topic,node.id, removed); node.brokerConn.unsubscribe(node.topic,node.id, removed);
} }
node.brokerConn.deregister(node, done); node.brokerConn.deregister(node, done);
node.brokerConn = null;
} else { } else {
done(); done();
} }
@ -1194,6 +1225,7 @@ module.exports = function(RED) {
node.on('close', function(done) { node.on('close', function(done) {
if (node.brokerConn) { if (node.brokerConn) {
node.brokerConn.deregister(node,done); node.brokerConn.deregister(node,done);
node.brokerConn = null;
} else { } else {
done(); done();
} }