From 669aa769c2c3ce568e635eaf77af00b792de1c2a Mon Sep 17 00:00:00 2001 From: Steve-Mcl Date: Thu, 17 Feb 2022 10:18:46 +0000 Subject: [PATCH] Fix close timeout on MQTT nodes fixes #2934 --- .../@node-red/nodes/core/network/10-mqtt.js | 65 +++++++++---------- 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 5546dc868..bdf2b0e5b 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -637,24 +637,8 @@ module.exports = function(RED) { node.deregister = function(mqttNode,done) { delete node.users[mqttNode.id]; - if (node.closing) { - return done(); - } - if (Object.keys(node.users).length === 0) { - if (node.client && node.client.connected) { - // Send close message - if (node.closeMessage) { - node.publish(node.closeMessage,function(err) { - node.client.end(done); - }); - } else { - node.client.end(done); - } - return; - } else { - if (node.client) { node.client.end(); } - return done(); - } + if (!node.closing && node.connected && Object.keys(node.users).length === 0) { + node.disconnect(); } done(); }; @@ -663,6 +647,7 @@ module.exports = function(RED) { } node.connect = function (callback) { if (node.canConnect()) { + node.closing = false; node.connecting = true; setStatusConnecting(node, true); try { @@ -672,6 +657,7 @@ module.exports = function(RED) { let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times // Register successful connect or reconnect handler node.client.on('connect', function (connack) { + node.closing = false; node.connecting = false; node.connected = true; if(!callbackDone && typeof callback == "function") { @@ -740,6 +726,7 @@ module.exports = function(RED) { reasonCode: rc, reasonString: rs } + node.connected = false; node.log(RED._("mqtt.state.broker-disconnected", details)); setStatusDisconnected(node, true); }); @@ -764,25 +751,31 @@ module.exports = function(RED) { } }; node.disconnect = function (callback) { - const _callback = function () { + const _callback = function (resetNodeConnectedState) { setStatusDisconnected(node, true); - node.connecting = false; - node.connected = false; + if(resetNodeConnectedState) { + node.closing = true; + node.connecting = false; + node.connected = false; + } callback && typeof callback == "function" && callback(); }; - if(node.client) { - if(node.client.connected && node.closeMessage) { - node.publish(node.closeMessage, function (err) { - node.client.end(_callback); - }); - } else if(node.client.connected || node.client.reconnecting) { - node.client.end(_callback); - } else if(node.client.disconnecting || node.client.connected === false) { - _callback(); - } + if(node.closing) { + return _callback(false); + } + var endCallBack = function endCallBack() { + } + if(node.connected && node.closeMessage) { + node.publish(node.closeMessage, function (err) { + node.client.end(endCallBack); + _callback(true); + }); + } else if(node.connected) { + node.client.end(endCallBack); + _callback(true); } else { - _callback(); + _callback(false); } } node.subscriptionIds = {}; @@ -1074,6 +1067,8 @@ module.exports = function(RED) { node.brokerConn.unsubscribe(node.topic,node.id, removed); } node.brokerConn.deregister(node, done); + } else { + done(); } }); } else { @@ -1134,7 +1129,11 @@ module.exports = function(RED) { } node.brokerConn.register(node); node.on('close', function(done) { - node.brokerConn.deregister(node,done); + if (node.brokerConn) { + node.brokerConn.deregister(node,done); + } else { + done(); + } }); } else { node.error(RED._("mqtt.errors.missing-config"));