1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Merge pull request #4190 from Steve-Mcl/fix-mqtt-keep-subscription-4132

Fix new feature "mqtt keep subscription"
This commit is contained in:
Nick O'Leary 2023-05-26 13:31:15 +01:00 committed by GitHub
commit dfe145a0ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -219,8 +219,10 @@ module.exports = function(RED) {
* Handle the payload / packet recieved in MQTT In and MQTT Sub nodes * Handle the payload / packet recieved in MQTT In and MQTT Sub nodes
*/ */
function subscriptionHandler(node, datatype ,topic, payload, packet) { function subscriptionHandler(node, datatype ,topic, payload, packet) {
const v5 = node.brokerConn.options && node.brokerConn.options.protocolVersion == 5; const msg = {topic:topic, payload:null, qos:packet.qos, retain:packet.retain};
var msg = {topic:topic, payload:null, qos:packet.qos, retain:packet.retain}; const v5 = (node && node.brokerConn)
? node.brokerConn.v5()
: Object.prototype.hasOwnProperty.call(packet, "properties");
if(v5 && packet.properties) { if(v5 && packet.properties) {
setStrProp(packet.properties, msg, "responseTopic"); setStrProp(packet.properties, msg, "responseTopic");
setBufferProp(packet.properties, msg, "correlationData"); setBufferProp(packet.properties, msg, "correlationData");
@ -300,7 +302,7 @@ module.exports = function(RED) {
//} //}
} }
msg.payload = payload; msg.payload = payload;
if ((node.brokerConn.broker === "localhost")||(node.brokerConn.broker === "127.0.0.1")) { if (node.brokerConn && (node.brokerConn.broker === "localhost" || node.brokerConn.broker === "127.0.0.1")) {
msg._topic = topic; msg._topic = topic;
} }
node.send(msg); node.send(msg);
@ -412,6 +414,12 @@ module.exports = function(RED) {
} }
} }
/**
* Perform the connect action
* @param {MQTTInNode|MQTTOutNode} node
* @param {Object} msg
* @param {Function} done
*/
function handleConnectAction(node, msg, done) { function handleConnectAction(node, msg, done) {
let actionData = typeof msg.broker === 'object' ? msg.broker : null; let actionData = typeof msg.broker === 'object' ? msg.broker : null;
if (node.brokerConn.canConnect()) { if (node.brokerConn.canConnect()) {
@ -442,12 +450,17 @@ module.exports = function(RED) {
} }
} }
/**
* Perform the disconnect action
* @param {MQTTInNode|MQTTOutNode} node
* @param {Function} done
*/
function handleDisconnectAction(node, done) { function handleDisconnectAction(node, done) {
node.brokerConn.disconnect(function () { node.brokerConn.disconnect(function () {
done(); done();
}); });
} }
const unsubscribeCandidates = {}
//#endregion "Supporting functions" //#endregion "Supporting functions"
//#region "Broker node" //#region "Broker node"
@ -591,10 +604,9 @@ module.exports = function(RED) {
if (typeof node.cleansession === 'undefined') { if (typeof node.cleansession === 'undefined') {
node.cleansession = true; node.cleansession = true;
} }
if (typeof node.autoUnsubscribe === 'undefined') { if (typeof node.autoUnsubscribe !== 'boolean') {
node.autoUnsubscribe = true; node.autoUnsubscribe = true;
} }
//use url or build a url from usetls://broker:port //use url or build a url from usetls://broker:port
if (node.url && node.brokerurl !== node.url) { if (node.url && node.brokerurl !== node.url) {
node.brokerurl = node.url; node.brokerurl = node.url;
@ -664,7 +676,6 @@ module.exports = function(RED) {
node.options.password = node.password; node.options.password = node.password;
node.options.keepalive = node.keepalive; node.options.keepalive = node.keepalive;
node.options.clean = node.cleansession; node.options.clean = node.cleansession;
node.options.autoUnsubscribe = node.autoUnsubscribe;
node.options.clientId = node.clientid || 'nodered_' + RED.util.generateId(); node.options.clientId = node.clientid || 'nodered_' + RED.util.generateId();
node.options.reconnectPeriod = RED.settings.mqttReconnectTime||5000; node.options.reconnectPeriod = RED.settings.mqttReconnectTime||5000;
delete node.options.protocolId; //V4+ default delete node.options.protocolId; //V4+ default
@ -785,18 +796,11 @@ module.exports = function(RED) {
// Re-subscribe to stored topics // Re-subscribe to stored topics
for (var s in node.subscriptions) { for (var s in node.subscriptions) {
if (node.subscriptions.hasOwnProperty(s)) { if (node.subscriptions.hasOwnProperty(s)) {
let topic = s;
let qos = 0;
let _options = {};
for (var r in node.subscriptions[s]) { for (var r in node.subscriptions[s]) {
if (node.subscriptions[s].hasOwnProperty(r)) { if (node.subscriptions[s].hasOwnProperty(r)) {
qos = Math.max(qos,node.subscriptions[s][r].qos); node.subscribe(node.subscriptions[s][r])
_options = node.subscriptions[s][r].options;
node._clientOn('message',node.subscriptions[s][r].handler);
} }
} }
_options.qos = _options.qos || qos;
node.client.subscribe(topic, _options);
} }
} }
@ -858,22 +862,28 @@ module.exports = function(RED) {
if(!node.client) { return _callback(); } if(!node.client) { return _callback(); }
if(node.closing) { return _callback(); } if(node.closing) { return _callback(); }
/**
* Call end and wait for the client to end (or timeout)
* @param {mqtt.MqttClient} client The broker client
* @param {number} ms The time to wait for the client to end
* @returns
*/
let waitEnd = (client, ms) => { let waitEnd = (client, ms) => {
return new Promise( (resolve, reject) => { return new Promise( (resolve, reject) => {
node.closing = true; node.closing = true;
if(!client) { if (!client) {
resolve(); resolve();
} else { } else {
const t = setTimeout(() => { const t = setTimeout(() => {
//clean end() has exceeded WAIT_END, lets force end! //clean end() has exceeded WAIT_END, lets force end!
client && client.end(true); client && client.end(true);
reject(); resolve();
}, ms); }, ms);
client.end(() => { client.end(() => {
clearTimeout(t); clearTimeout(t);
resolve() resolve()
}); });
} }
}); });
}; };
if(node.connected && node.closeMessage) { if(node.connected && node.closeMessage) {
@ -894,69 +904,222 @@ module.exports = function(RED) {
} }
node.subscriptionIds = {}; node.subscriptionIds = {};
node.subid = 1; node.subid = 1;
node.subscribe = function (topic,options,callback,ref) {
ref = ref||0; //typedef for subscription object:
var qos; /**
if(typeof options == "object") { * @typedef {Object} Subscription
qos = options.qos; * @property {String} topic - topic to subscribe to
} else { * @property {Object} [options] - options object
qos = options; * @property {Number} [options.qos] - quality of service
options = {}; * @property {Number} [options.nl] - no local
* @property {Number} [options.rap] - retain as published
* @property {Number} [options.rh] - retain handling
* @property {Number} [options.properties] - MQTT 5.0 properties
* @property {Number} [options.properties.subscriptionIdentifier] - MQTT 5.0 subscription identifier
* @property {Number} [options.properties.userProperties] - MQTT 5.0 user properties
* @property {Function} callback
* @property {String} ref - reference to the node that created the subscription
*/
/**
* Create a subscription object
* @param {String} _topic - topic to subscribe to
* @param {Object} _options - options object
* @param {String} _ref - reference to the node that created the subscription
* @returns {Subscription}
*/
function createSubscriptionObject(_topic, _options, _ref, _brokerId) {
/** @type {Subscription} */
const subscription = {};
const ref = _ref || 0;
let options
let qos = 1 // default to QoS 1 (AWS and several other brokers don't support QoS 2)
// if options is an object, then clone it
if (typeof _options == "object") {
options = RED.util.cloneMessage(_options || {})
qos = _options.qos;
} else if (typeof _options == "number") {
qos = _options;
} }
options.qos = qos; options = options || {};
// sanitise qos
if (typeof qos === "number" && qos >= 0 && qos <= 2) {
options.qos = qos;
}
subscription.topic = _topic;
subscription.qos = qos;
subscription.options = RED.util.cloneMessage(options);
subscription.ref = ref;
subscription.brokerId = _brokerId;
return subscription;
}
/**
* If topic is a subscription object, then use that, otherwise look up the topic in
* the subscriptions object. If the topic is not found, then create a new subscription
* object and add it to the subscriptions object.
* @param {Subscription|String} topic
* @param {*} options
* @param {*} callback
* @param {*} ref
*/
node.subscribe = function (topic, options, callback, ref) {
/** @type {Subscription} */
let subscription
let doCompare = false
let changesFound = false
// function signature 1: subscribe(subscription: Subscription)
if (typeof topic === "object" && topic !== null) {
subscription = topic
topic = subscription.topic
options = subscription.options
ref = subscription.ref
callback = subscription.callback
}
// function signature 2: subscribe(topic: String, options: Object, callback: Function, ref: String)
else if (typeof topic === "string") {
// since this is a call where all params are provided, it might be
// a node change (modification) so we need to check for changes
doCompare = true
subscription = node.subscriptions[topic] && node.subscriptions[topic][ref]
}
// bad function call
else {
console.warn('Invalid call to node.subscribe')
return
}
const thisBrokerId = node.type === 'mqtt-broker' ? node.id : node.broker
// unsubscribe topics where the broker has changed
const oldBrokerSubs = (unsubscribeCandidates[ref] || []).filter(sub => sub.brokerId !== thisBrokerId)
oldBrokerSubs.forEach(sub => {
/** @type {MQTTBrokerNode} */
const _brokerConn = RED.nodes.getNode(sub.brokerId)
if (_brokerConn) {
_brokerConn.unsubscribe(sub.topic, sub.ref, true)
}
})
// if subscription is found (or sent in as a parameter), then check for changes.
// if there are any changes requested, tidy up the old subscription
if (subscription) {
if (doCompare) {
// compare the current sub to the passed in parameters. Use RED.util.compareObjects against
// only the minimal set of properties to identify if the subscription has changed
const currentSubscription = createSubscriptionObject(subscription.topic, subscription.options, subscription.ref)
const newSubscription = createSubscriptionObject(topic, options, ref)
changesFound = RED.util.compareObjects(currentSubscription, newSubscription) === false
}
}
if (changesFound) {
if (subscription.handler) {
node._clientRemoveListeners('message', subscription.handler)
subscription.handler = null
}
const _brokerConn = RED.nodes.getNode(subscription.brokerId)
if (_brokerConn) {
_brokerConn.unsubscribe(subscription.topic, subscription.ref, true)
}
}
// clean up the unsubscribe candidate list
delete unsubscribeCandidates[ref]
// determine if this is an existing subscription
const existingSubscription = typeof subscription === "object" && subscription !== null
// if existing subscription is not found or has changed, create a new subscription object
if (existingSubscription === false || changesFound) {
subscription = createSubscriptionObject(topic, options, ref, node.id)
}
// setup remainder of subscription properties and event handling
node.subscriptions[topic] = node.subscriptions[topic] || {};
node.subscriptions[topic][ref] = subscription
if (!node.subscriptionIds[topic]) { if (!node.subscriptionIds[topic]) {
node.subscriptionIds[topic] = node.subid++; node.subscriptionIds[topic] = node.subid++;
} }
options.properties = options.properties || {}; subscription.options = subscription.options || {};
options.properties.subscriptionIdentifier = node.subscriptionIds[topic]; subscription.options.properties = options.properties || {};
subscription.options.properties.subscriptionIdentifier = node.subscriptionIds[topic];
subscription.callback = callback;
node.subscriptions[topic] = node.subscriptions[topic]||{}; // if the client is connected, then setup the handler and subscribe
var sub = {
topic:topic,
qos:qos,
options:options,
handler:function(mtopic,mpayload, mpacket) {
if(mpacket.properties && options.properties && mpacket.properties.subscriptionIdentifier && options.properties.subscriptionIdentifier && (mpacket.properties.subscriptionIdentifier !== options.properties.subscriptionIdentifier) ) {
//do nothing as subscriptionIdentifier does not match
} else if (matchTopic(topic,mtopic)) {
callback(mtopic,mpayload, mpacket);
}
},
ref: ref
};
node.subscriptions[topic][ref] = sub;
if (node.connected) { if (node.connected) {
const subIdsAvailable = node.subscriptionIdentifiersAvailable() const subIdsAvailable = node.subscriptionIdentifiersAvailable()
node._clientOn('message',sub.handler);
// if the broker doesn't support subscription identifiers (e.g. AWS core), then don't send them
if (options.properties && options.properties.subscriptionIdentifier && subIdsAvailable !== true) {
delete options.properties.subscriptionIdentifier
}
node.client.subscribe(topic, options);
}
};
node.unsubscribe = function (topic, ref, removed) { if (!subscription.handler) {
ref = ref||0; subscription.handler = function (mtopic, mpayload, mpacket) {
var sub = node.subscriptions[topic]; const sops = subscription.options ? subscription.options.properties : {}
if (sub) { const pops = mpacket.properties || {}
if (sub[ref]) { if (subIdsAvailable && pops.subscriptionIdentifier && sops.subscriptionIdentifier && (pops.subscriptionIdentifier !== sops.subscriptionIdentifier)) {
if(node.client) { //do nothing as subscriptionIdentifier does not match
node._clientRemoveListeners('message',sub[ref].handler); } else if (matchTopic(topic, mtopic)) {
} subscription.callback && subscription.callback(mtopic, mpayload, mpacket)
delete sub[ref];
}
//TODO: Review. The `if(removed)` was commented out to always delete and remove subscriptions.
// if we dont then property changes dont get applied and old subs still trigger
//if (removed) {
if (Object.keys(sub).length === 0) {
delete node.subscriptions[topic];
delete node.subscriptionIds[topic];
if (node.connected) {
node.client.unsubscribe(topic);
} }
} }
//} }
node._clientOn('message', subscription.handler)
// if the broker doesn't support subscription identifiers, then don't send them (AWS support)
if (subscription.options.properties && subscription.options.properties.subscriptionIdentifier && subIdsAvailable !== true) {
delete subscription.options.properties.subscriptionIdentifier
}
node.client.subscribe(topic, subscription.options)
}
}
node.unsubscribe = function (topic, ref, removeClientSubscription) {
ref = ref||0;
const unsub = removeClientSubscription || node.autoUnsubscribe !== false
const sub = node.subscriptions[topic];
let brokerId = node.id
if (sub) {
if (sub[ref]) {
brokerId = sub[ref].brokerId || brokerId
if(node.client && sub[ref].handler) {
node._clientRemoveListeners('message', sub[ref].handler);
sub[ref].handler = null
}
if (unsub) {
delete sub[ref]
}
}
// if instructed to remove the actual MQTT client subscription
if (unsub) {
// if there are no more subscriptions for the topic, then remove the topic
if (Object.keys(sub).length === 0) {
try {
node.client.unsubscribe(topic)
} catch (_err) {
// do nothing
} finally {
// remove unsubscribe candidate as it is now REALLY unsubscribed
delete node.subscriptions[topic];
delete node.subscriptionIds[topic];
if (unsubscribeCandidates[ref]) {
unsubscribeCandidates[ref] = unsubscribeCandidates[ref].filter(sub => sub.topic !== topic)
}
}
}
} else {
// if instructed to not remove the client subscription, then add it to the candidate list
// of subscriptions to be removed when the the same ref is used in a subsequent subscribe
// and the topic has changed
unsubscribeCandidates[ref] = unsubscribeCandidates[ref] || [];
unsubscribeCandidates[ref].push({
topic: topic,
ref: ref,
brokerId: brokerId
})
}
} }
}; };
node.topicAliases = {}; node.topicAliases = {};
@ -994,7 +1157,7 @@ module.exports = function(RED) {
setStrProp(msg, options.properties, "contentType"); setStrProp(msg, options.properties, "contentType");
setIntProp(msg, options.properties, "messageExpiryInterval", 0); setIntProp(msg, options.properties, "messageExpiryInterval", 0);
setUserProperties(msg.userProperties, options.properties); setUserProperties(msg.userProperties, options.properties);
setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0); setIntProp(msg, options.properties, "topicAlias", 1, bsp.topicAliasMaximum || 0);
setBoolProp(msg, options.properties, "payloadFormatIndicator"); setBoolProp(msg, options.properties, "payloadFormatIndicator");
//FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455); //FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455);
@ -1130,7 +1293,7 @@ module.exports = function(RED) {
if(node.rap === "true" || node.rap === true) options.rap = true; if(node.rap === "true" || node.rap === true) options.rap = true;
else if(node.rap === "false" || node.rap === false) options.rap = false; else if(node.rap === "false" || node.rap === false) options.rap = false;
} }
node._topic = node.topic; // store the original topic incase node is later changed
node.brokerConn.subscribe(node.topic,options,function(topic, payload, packet) { node.brokerConn.subscribe(node.topic,options,function(topic, payload, packet) {
subscriptionHandler(node, node.datatype, topic, payload, packet); subscriptionHandler(node, node.datatype, topic, payload, packet);
},node.id); },node.id);
@ -1183,7 +1346,7 @@ module.exports = function(RED) {
} }
if (action === Actions.UNSUBSCRIBE) { if (action === Actions.UNSUBSCRIBE) {
subscriptions.forEach(function (sub) { subscriptions.forEach(function (sub) {
node.brokerConn.unsubscribe(sub.topic, node.id); node.brokerConn.unsubscribe(sub.topic, node.id, true);
delete node.dynamicSubs[sub.topic]; delete node.dynamicSubs[sub.topic];
}) })
//user can access current subscriptions through the complete node is so desired //user can access current subscriptions through the complete node is so desired
@ -1193,7 +1356,7 @@ module.exports = function(RED) {
subscriptions.forEach(function (sub) { subscriptions.forEach(function (sub) {
//always unsubscribe before subscribe to prevent multiple subs to same topic //always unsubscribe before subscribe to prevent multiple subs to same topic
if (node.dynamicSubs[sub.topic]) { if (node.dynamicSubs[sub.topic]) {
node.brokerConn.unsubscribe(sub.topic, node.id); node.brokerConn.unsubscribe(sub.topic, node.id, true);
delete node.dynamicSubs[sub.topic]; delete node.dynamicSubs[sub.topic];
} }
@ -1239,16 +1402,12 @@ module.exports = function(RED) {
node.on('close', function(removed, done) { node.on('close', function(removed, done) {
if (node.brokerConn) { if (node.brokerConn) {
if(node.isDynamic) { if(node.isDynamic) {
if (node.brokerConn.options.autoUnsubscribe) { Object.keys(node.dynamicSubs).forEach(function (topic) {
Object.keys(node.dynamicSubs).forEach(function (topic) { node.brokerConn.unsubscribe(topic, node.id, removed);
node.brokerConn.unsubscribe(topic, node.id, removed); });
}); node.dynamicSubs = {};
node.dynamicSubs = {};
}
} else { } else {
if (node.brokerConn.options.autoUnsubscribe) { node.brokerConn.unsubscribe(node.topic, node.id, removed);
node.brokerConn.unsubscribe(node.topic, node.id, removed);
}
} }
node.brokerConn.deregister(node, done, removed); node.brokerConn.deregister(node, done, removed);
node.brokerConn = null; node.brokerConn = null;