1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Fix close timeout on MQTT nodes

fixes #2934
This commit is contained in:
Steve-Mcl 2022-02-17 10:18:46 +00:00
parent fcf2994015
commit 669aa769c2

View File

@ -637,24 +637,8 @@ module.exports = function(RED) {
node.deregister = function(mqttNode,done) {
delete node.users[mqttNode.id];
if (node.closing) {
return done();
}
if (Object.keys(node.users).length === 0) {
if (node.client && node.client.connected) {
// Send close message
if (node.closeMessage) {
node.publish(node.closeMessage,function(err) {
node.client.end(done);
});
} else {
node.client.end(done);
}
return;
} else {
if (node.client) { node.client.end(); }
return done();
}
if (!node.closing && node.connected && Object.keys(node.users).length === 0) {
node.disconnect();
}
done();
};
@ -663,6 +647,7 @@ module.exports = function(RED) {
}
node.connect = function (callback) {
if (node.canConnect()) {
node.closing = false;
node.connecting = true;
setStatusConnecting(node, true);
try {
@ -672,6 +657,7 @@ module.exports = function(RED) {
let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times
// Register successful connect or reconnect handler
node.client.on('connect', function (connack) {
node.closing = false;
node.connecting = false;
node.connected = true;
if(!callbackDone && typeof callback == "function") {
@ -740,6 +726,7 @@ module.exports = function(RED) {
reasonCode: rc,
reasonString: rs
}
node.connected = false;
node.log(RED._("mqtt.state.broker-disconnected", details));
setStatusDisconnected(node, true);
});
@ -764,25 +751,31 @@ module.exports = function(RED) {
}
};
node.disconnect = function (callback) {
const _callback = function () {
const _callback = function (resetNodeConnectedState) {
setStatusDisconnected(node, true);
node.connecting = false;
node.connected = false;
if(resetNodeConnectedState) {
node.closing = true;
node.connecting = false;
node.connected = false;
}
callback && typeof callback == "function" && callback();
};
if(node.client) {
if(node.client.connected && node.closeMessage) {
node.publish(node.closeMessage, function (err) {
node.client.end(_callback);
});
} else if(node.client.connected || node.client.reconnecting) {
node.client.end(_callback);
} else if(node.client.disconnecting || node.client.connected === false) {
_callback();
}
if(node.closing) {
return _callback(false);
}
var endCallBack = function endCallBack() {
}
if(node.connected && node.closeMessage) {
node.publish(node.closeMessage, function (err) {
node.client.end(endCallBack);
_callback(true);
});
} else if(node.connected) {
node.client.end(endCallBack);
_callback(true);
} else {
_callback();
_callback(false);
}
}
node.subscriptionIds = {};
@ -1074,6 +1067,8 @@ module.exports = function(RED) {
node.brokerConn.unsubscribe(node.topic,node.id, removed);
}
node.brokerConn.deregister(node, done);
} else {
done();
}
});
} else {
@ -1134,7 +1129,11 @@ module.exports = function(RED) {
}
node.brokerConn.register(node);
node.on('close', function(done) {
node.brokerConn.deregister(node,done);
if (node.brokerConn) {
node.brokerConn.deregister(node,done);
} else {
done();
}
});
} else {
node.error(RED._("mqtt.errors.missing-config"));