1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Merge pull request #3451 from Steve-Mcl/fix-mqtt-close-timeout

Fix "close timed out" error when performing full deploy or modifying broker node.
This commit is contained in:
Nick O'Leary 2022-02-18 14:43:34 +00:00 committed by GitHub
commit 6c7c1202ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -637,24 +637,8 @@ module.exports = function(RED) {
node.deregister = function(mqttNode,done) { node.deregister = function(mqttNode,done) {
delete node.users[mqttNode.id]; delete node.users[mqttNode.id];
if (node.closing) { if (!node.closing && node.connected && Object.keys(node.users).length === 0) {
return done(); node.disconnect();
}
if (Object.keys(node.users).length === 0) {
if (node.client && node.client.connected) {
// Send close message
if (node.closeMessage) {
node.publish(node.closeMessage,function(err) {
node.client.end(done);
});
} else {
node.client.end(done);
}
return;
} else {
if (node.client) { node.client.end(); }
return done();
}
} }
done(); done();
}; };
@ -663,6 +647,7 @@ module.exports = function(RED) {
} }
node.connect = function (callback) { node.connect = function (callback) {
if (node.canConnect()) { if (node.canConnect()) {
node.closing = false;
node.connecting = true; node.connecting = true;
setStatusConnecting(node, true); setStatusConnecting(node, true);
try { try {
@ -672,6 +657,7 @@ module.exports = function(RED) {
let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times
// Register successful connect or reconnect handler // Register successful connect or reconnect handler
node.client.on('connect', function (connack) { node.client.on('connect', function (connack) {
node.closing = false;
node.connecting = false; node.connecting = false;
node.connected = true; node.connected = true;
if(!callbackDone && typeof callback == "function") { if(!callbackDone && typeof callback == "function") {
@ -740,6 +726,7 @@ module.exports = function(RED) {
reasonCode: rc, reasonCode: rc,
reasonString: rs reasonString: rs
} }
node.connected = false;
node.log(RED._("mqtt.state.broker-disconnected", details)); node.log(RED._("mqtt.state.broker-disconnected", details));
setStatusDisconnected(node, true); setStatusDisconnected(node, true);
}); });
@ -764,25 +751,31 @@ module.exports = function(RED) {
} }
}; };
node.disconnect = function (callback) { node.disconnect = function (callback) {
const _callback = function () { const _callback = function (resetNodeConnectedState) {
setStatusDisconnected(node, true); setStatusDisconnected(node, true);
if(resetNodeConnectedState) {
node.closing = true;
node.connecting = false; node.connecting = false;
node.connected = false; node.connected = false;
}
callback && typeof callback == "function" && callback(); callback && typeof callback == "function" && callback();
}; };
if(node.client) { if(node.closing) {
if(node.client.connected && node.closeMessage) { return _callback(false);
node.publish(node.closeMessage, function (err) {
node.client.end(_callback);
});
} else if(node.client.connected || node.client.reconnecting) {
node.client.end(_callback);
} else if(node.client.disconnecting || node.client.connected === false) {
_callback();
} }
var endCallBack = function endCallBack() {
}
if(node.connected && node.closeMessage) {
node.publish(node.closeMessage, function (err) {
node.client.end(endCallBack);
_callback(true);
});
} else if(node.connected) {
node.client.end(endCallBack);
_callback(true);
} else { } else {
_callback(); _callback(false);
} }
} }
node.subscriptionIds = {}; node.subscriptionIds = {};
@ -1074,6 +1067,8 @@ module.exports = function(RED) {
node.brokerConn.unsubscribe(node.topic,node.id, removed); node.brokerConn.unsubscribe(node.topic,node.id, removed);
} }
node.brokerConn.deregister(node, done); node.brokerConn.deregister(node, done);
} else {
done();
} }
}); });
} else { } else {
@ -1134,7 +1129,11 @@ module.exports = function(RED) {
} }
node.brokerConn.register(node); node.brokerConn.register(node);
node.on('close', function(done) { node.on('close', function(done) {
if (node.brokerConn) {
node.brokerConn.deregister(node,done); node.brokerConn.deregister(node,done);
} else {
done();
}
}); });
} else { } else {
node.error(RED._("mqtt.errors.missing-config")); node.error(RED._("mqtt.errors.missing-config"));