From 45b43ebb21cebcc55d7f02653373307a7637f033 Mon Sep 17 00:00:00 2001 From: Olivier Verhaegen <56387556+OlivierVerhaegen@users.noreply.github.com> Date: Tue, 11 Jul 2023 20:58:51 +0200 Subject: [PATCH] STOMP bugfix: usage of multiple stomp in nodes (#1015) * Bugfix: only execute node register callbacks after connection to the server has been made * Bugfix: only execute direct callback of register node register when connected to STOMP server * Doc: Add tip about reconnection to server --- io/stomp/18-stomp.html | 7 +++++-- io/stomp/18-stomp.js | 16 ++++++++++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/io/stomp/18-stomp.html b/io/stomp/18-stomp.html index 4f2549a9..7d9b307c 100644 --- a/io/stomp/18-stomp.html +++ b/io/stomp/18-stomp.html @@ -16,7 +16,7 @@ -
+
Enabling the ACK (acknowledgment) will set the ack header to client while subscribing to topics. This means the items on the broker queue will not be dequeue'd unless an ACK message is sent using the ack node.
@@ -73,7 +73,7 @@
-
The Destination field is optional. If not set uses the msg.topic +
The Destination field is optional. If not set uses the msg.topic property of the message.
@@ -147,6 +147,9 @@
+
+ Reconnection timings are calculated using exponential backoff. The first reconnection happens immediately, the second reconnection happens at +delay ms, the third at + 2*delay ms, etc. +
diff --git a/io/stomp/18-stomp.js b/io/stomp/18-stomp.js index 9b893a6e..97a2ec22 100644 --- a/io/stomp/18-stomp.js +++ b/io/stomp/18-stomp.js @@ -84,6 +84,8 @@ module.exports = function(RED) { node.sessionId = null; node.subscribtionIndex = 1; node.subscriptionIds = {}; + /** Array of callbacks to be called once the connection to the broker has been made */ + node.connectedCallbacks = []; /** @type { StompClient } */ node.client; node.setOptions = function(options, init) { @@ -134,10 +136,20 @@ module.exports = function(RED) { */ node.register = function(stompNode, callback = () => {}) { node.users[stompNode.id] = stompNode; + + if (!node.connected) { + node.connectedCallbacks.push(callback); + } + // Auto connect when first STOMP processing node is added if (Object.keys(node.users).length === 1) { - node.connect(callback); - } else { + node.connect(() => { + while (node.connectedCallbacks.length) { + node.connectedCallbacks.shift().call(); + } + }); + } else if (node.connected) { + // Execute callback directly as the connection to the STOMP server has already been made callback(); } }