From 1a0d1ce1732625a7cb6fbea52d5f20cacee72e1c Mon Sep 17 00:00:00 2001 From: Olivier Verhaegen <56387556+OlivierVerhaegen@users.noreply.github.com> Date: Wed, 12 Apr 2023 14:12:14 +0200 Subject: [PATCH] Improvements & bugfixes to shared connection --- io/stomp/18-stomp.js | 55 ++++++++++++++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/io/stomp/18-stomp.js b/io/stomp/18-stomp.js index 1a2b2c12..56b6bcae 100644 --- a/io/stomp/18-stomp.js +++ b/io/stomp/18-stomp.js @@ -17,6 +17,7 @@ module.exports = function(RED) { this.username = this.credentials.user; this.password = this.credentials.password; this.clientConnection = null; + this.connected = false; } RED.nodes.registerType("stomp-server",StompServerNode,{ credentials: { @@ -66,15 +67,18 @@ module.exports = function(RED) { node.client.on("connect", function() { node.status({fill:"green",shape:"dot",text:"connected"}); + node.serverConfig.connected = true; }); node.client.on("reconnecting", function() { node.status({fill:"red",shape:"ring",text:"reconnecting"}); node.warn("reconnecting"); + node.serverConfig.connected = false; }); node.client.on("reconnect", function() { node.status({fill:"green",shape:"dot",text:"connected"}); + node.serverConfig.connected = true; }); node.client.on("error", function(error) { @@ -82,8 +86,10 @@ module.exports = function(RED) { node.warn(error); }); + // Connect to server if needed and subscribe node.status({fill:"grey",shape:"ring",text:"connecting"}); - node.client.connect(function(sessionId) { + + function subscribe() { node.log('subscribing to: '+node.topic); node.client.subscribe(node.topic, node.subscribeHeaders, function(body, headers) { var newmsg={"headers":headers,"topic":node.topic} @@ -95,10 +101,19 @@ module.exports = function(RED) { } node.send(newmsg); }); - }, function(error) { - node.status({fill:"grey",shape:"dot",text:"error"}); - node.warn(error); - }); + } + + if (!node.server.connected) { + node.client.connect(function(sessionId) { + subscribe(); + }, function(error) { + node.status({fill:"grey",shape:"dot",text:"error"}); + node.warn(error); + }); + } else { + subscribe(); + } + node.on("close", function(done) { if (node.client) { @@ -141,15 +156,18 @@ module.exports = function(RED) { node.client.on("connect", function() { node.status({fill:"green",shape:"dot",text:"connected"}); + node.serverConfig.connected = true; }); node.client.on("reconnecting", function() { node.status({fill:"red",shape:"ring",text:"reconnecting"}); node.warn("reconnecting"); + node.serverConfig.connected = false; }); node.client.on("reconnect", function() { node.status({fill:"green",shape:"dot",text:"connected"}); + node.serverConfig.connected = true; }); node.client.on("error", function(error) { @@ -157,12 +175,14 @@ module.exports = function(RED) { node.warn(error); }); + // Connect to server if needed node.status({fill:"grey",shape:"ring",text:"connecting"}); - node.client.connect(function(sessionId) { - }, function(error) { - node.status({fill:"grey",shape:"dot",text:"error"}); - node.warn(error); - }); + if(!node.serverConfig.connected) { + node.client.connect(function(sessionId) {}, function(error) { + node.status({fill:"grey",shape:"dot",text:"error"}); + node.warn(error); + }); + } node.on("input", function(msg) { node.client.publish(node.topic || msg.topic, msg.payload, msg.headers); @@ -210,15 +230,18 @@ module.exports = function(RED) { node.client.on("connect", function() { node.status({fill:"green",shape:"dot",text:"connected"}); + node.serverConfig.connected = true; }); node.client.on("reconnecting", function() { node.status({fill:"red",shape:"ring",text:"reconnecting"}); node.warn("reconnecting"); + node.serverConfig.connected = false; }); node.client.on("reconnect", function() { node.status({fill:"green",shape:"dot",text:"connected"}); + node.serverConfig.connected = true; }); node.client.on("error", function(error) { @@ -226,12 +249,14 @@ module.exports = function(RED) { node.warn(error); }); + // Connect to server if needed node.status({fill:"grey",shape:"ring",text:"connecting"}); - node.client.connect(function(sessionId) { - }, function(error) { - node.status({fill:"grey",shape:"dot",text:"error"}); - node.warn(error); - }); + if(!node.serverConfig.connected) { + node.client.connect(function(sessionId) {}, function(error) { + node.status({fill:"grey",shape:"dot",text:"error"}); + node.warn(error); + }); + } node.on("input", function(msg) { node.client.ack(msg.messageId, msg.subsriptionId, msg.transaction);