Merge pull request #3594 from PhilDay-CT/issue-3593

Handle removal of event handlers to allow mqtt client.end() to work
This commit is contained in:
Stephen McLaughlin 2022-05-09 16:50:48 +01:00 committed by GitHub
commit 0385c72a8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 53 additions and 16 deletions

View File

@ -453,6 +453,7 @@ module.exports = function(RED) {
node.options = {};
node.queue = [];
node.subscriptions = {};
node.clientListeners = []
/** @type {mqtt.MqttClient}*/ this.client;
node.setOptions = function(opts, init) {
if(!opts || typeof opts !== "object") {
@ -718,11 +719,16 @@ module.exports = function(RED) {
setStatusConnecting(node, true);
try {
node.serverProperties = {};
if(node.client) {
//belt and braces to avoid left over clients
node.client.end(true);
node._clientRemoveListeners();
}
node.client = mqtt.connect(node.brokerurl, node.options);
node.client.setMaxListeners(0);
let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times
let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times
// Register successful connect or reconnect handler
node.client.on('connect', function (connack) {
node._clientOn('connect', function (connack) {
node.closing = false;
node.connecting = false;
node.connected = true;
@ -754,7 +760,7 @@ module.exports = function(RED) {
}
setStatusConnected(node, true);
// Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
node.client.removeAllListeners('message');
node._clientRemoveListeners('message');
// Re-subscribe to stored topics
for (var s in node.subscriptions) {
@ -766,7 +772,7 @@ module.exports = function(RED) {
if (node.subscriptions[s].hasOwnProperty(r)) {
qos = Math.max(qos,node.subscriptions[s][r].qos);
_options = node.subscriptions[s][r].options;
node.client.on('message',node.subscriptions[s][r].handler);
node._clientOn('message',node.subscriptions[s][r].handler);
}
}
_options.qos = _options.qos || qos;
@ -779,11 +785,11 @@ module.exports = function(RED) {
node.publish(node.birthMessage);
}
});
node.client.on("reconnect", function() {
node._clientOn("reconnect", function() {
setStatusConnecting(node, true);
});
//Broker Disconnect - V5 event
node.client.on("disconnect", function(packet) {
node._clientOn("disconnect", function(packet) {
//Emitted after receiving disconnect packet from broker. MQTT 5.0 feature.
const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode;
const rs = packet && packet.properties && packet.properties.reasonString || "";
@ -797,7 +803,7 @@ module.exports = function(RED) {
setStatusDisconnected(node, true);
});
// Register disconnect handlers
node.client.on('close', function () {
node._clientOn('close', function () {
if (node.connected) {
node.connected = false;
node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
@ -809,7 +815,7 @@ module.exports = function(RED) {
// Register connect error handler
// The client's own reconnect logic will take care of errors
node.client.on('error', function (error) {
node._clientOn('error', function (error) {
});
}catch(err) {
console.log(err);
@ -822,7 +828,7 @@ module.exports = function(RED) {
if(node.connected || node.connecting) {
setStatusDisconnected(node, true);
}
if(node.client) { node.client.removeAllListeners(); }
if(node.client) { node._clientRemoveListeners(); }
node.connecting = false;
node.connected = false;
callback && typeof callback == "function" && callback();
@ -836,8 +842,12 @@ module.exports = function(RED) {
if(!client) {
resolve();
} else {
const t = setTimeout(reject, ms);
client.end(() => {
const t = setTimeout(() => {
//clean end() has exceeded WAIT_END, lets force end!
client && client.end(true);
reject();
}, ms);
client.end(() => {
clearTimeout(t);
resolve()
});
@ -894,7 +904,7 @@ module.exports = function(RED) {
};
node.subscriptions[topic][ref] = sub;
if (node.connected) {
node.client.on('message',sub.handler);
node._clientOn('message',sub.handler);
node.client.subscribe(topic, options);
}
};
@ -905,7 +915,7 @@ module.exports = function(RED) {
if (sub) {
if (sub[ref]) {
if(node.client) {
node.client.removeListener('message',sub[ref].handler);
node._clientRemoveListeners('message',sub[ref].handler);
}
delete sub[ref];
}
@ -995,13 +1005,40 @@ module.exports = function(RED) {
node.on('close', function(done) {
node.disconnect(function() {
if(node.client) {
node.client.removeAllListeners();
}
done();
});
});
/**
* Add event handlers to the MQTT.js client and track them so that
* we do not remove any handlers that the MQTT client uses internally.
* Use {@link node._clientRemoveListeners `node._clientRemoveListeners`} to remove handlers
* @param {string} event The name of the event
* @param {function} handler The handler for this event
*/
node._clientOn = function(event, handler) {
node.clientListeners.push({event, handler})
node.client.on(event, handler)
}
/**
* Remove event handlers from the MQTT.js client & only the events
* that we attached in {@link node._clientOn `node._clientOn`}.
* * If `event` is omitted, then all events matching `handler` are removed
* * If `handler` is omitted, then all events named `event` are removed
* * If both parameters are omitted, then all events are removed
* @param {string} [event] The name of the event (optional)
* @param {function} [handler] The handler for this event (optional)
*/
node._clientRemoveListeners = function(event, handler) {
node.clientListeners = node.clientListeners.filter((l) => {
if (event && event !== l.event) { return true; }
if (handler && handler !== l.handler) { return true; }
node.client.removeListener(l.event, l.handler)
return false; //found and removed, filter out this one
})
}
}
RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{