Bugfix: subscription before connected

This commit is contained in:
Olivier Verhaegen 2023-04-17 13:25:49 +02:00 committed by GitHub
parent c575c12514
commit c398b3a60a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -130,12 +130,15 @@ module.exports = function(RED) {
/** /**
* Register a STOMP processing node to the connection. * Register a STOMP processing node to the connection.
* @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to register * @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to register
* @param { Function } callback
*/ */
node.register = function(stompNode) { node.register = function(stompNode, callback = () => {}) {
node.users[stompNode.id] = stompNode; node.users[stompNode.id] = stompNode;
// Auto connect when first STOMP processing node is added // Auto connect when first STOMP processing node is added
if (Object.keys(node.users).length === 1) { if (Object.keys(node.users).length === 1) {
node.connect(); node.connect(callback);
} else {
callback();
} }
} }
@ -145,7 +148,7 @@ module.exports = function(RED) {
* @param { Boolean } autoDisconnect Automatically disconnect from the STOM server when no processing nodes registered to the connection * @param { Boolean } autoDisconnect Automatically disconnect from the STOM server when no processing nodes registered to the connection
* @param { Function } callback * @param { Function } callback
*/ */
node.deregister = function(stompNode, autoDisconnect, callback) { node.deregister = function(stompNode, autoDisconnect, callback = () => {}) {
delete node.users[stompNode.id]; delete node.users[stompNode.id];
if (autoDisconnect && !node.closing && node.connected && Object.keys(node.users).length === 0) { if (autoDisconnect && !node.closing && node.connected && Object.keys(node.users).length === 0) {
node.disconnect(callback); node.disconnect(callback);
@ -166,7 +169,7 @@ module.exports = function(RED) {
* Connect to the STOMP server. * Connect to the STOMP server.
* @param {Function} callback * @param {Function} callback
*/ */
node.connect = function(callback) { node.connect = function(callback = () => {}) {
if (node.canConnect()) { if (node.canConnect()) {
node.closing = false; node.closing = false;
node.connecting = true; node.connecting = true;
@ -185,9 +188,7 @@ module.exports = function(RED) {
node.closing = false; node.closing = false;
node.connecting = false; node.connecting = false;
node.connected = true; node.connected = true;
if (typeof callback === "function") { callback();
callback();
}
node.log("Connected to STOMP server", {sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion}); node.log("Connected to STOMP server", {sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion});
setStatusConnected(node, true); setStatusConnected(node, true);
@ -223,6 +224,9 @@ module.exports = function(RED) {
} catch (err) { } catch (err) {
node.error(err); node.error(err);
} }
} else {
node.log("Not connecting to STOMP server, already connected");
callback();
} }
} }
@ -230,7 +234,7 @@ module.exports = function(RED) {
* Disconnect from the STOMP server. * Disconnect from the STOMP server.
* @param {Function} callback * @param {Function} callback
*/ */
node.disconnect = function(callback) { node.disconnect = function(callback = () => {}) {
const waitDisconnect = (client, timeout) => { const waitDisconnect = (client, timeout) => {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
// Set flag to avoid race conditions for disconnect as every node tries to call it directly or indirectly using deregister // Set flag to avoid race conditions for disconnect as every node tries to call it directly or indirectly using deregister
@ -363,9 +367,10 @@ module.exports = function(RED) {
setStatusDisconnected(node); setStatusDisconnected(node);
if (node.topic) { if (node.topic) {
node.serverConnection.register(node); node.serverConnection.register(node, function() {
node.serverConnection.subscribe(node.topic, node.ack, function(msg) { node.serverConnection.subscribe(node.topic, node.ack, function(msg) {
node.send(msg); node.send(msg);
});
}); });
if (node.serverConnection.connected) { if (node.serverConnection.connected) {