From 8ab9aff3de0752ba5dc739238c5fec7e672578b3 Mon Sep 17 00:00:00 2001 From: Olivier Verhaegen <56387556+OlivierVerhaegen@users.noreply.github.com> Date: Fri, 14 Apr 2023 14:16:30 +0200 Subject: [PATCH] Complete refactor based on MQTT nodes to be able to share server connection between nodes --- io/stomp/18-stomp.html | 34 ++- io/stomp/18-stomp.js | 478 ++++++++++++++++++----------------------- 2 files changed, 223 insertions(+), 289 deletions(-) diff --git a/io/stomp/18-stomp.html b/io/stomp/18-stomp.html index cf005eea..b490ca33 100644 --- a/io/stomp/18-stomp.html +++ b/io/stomp/18-stomp.html @@ -9,12 +9,16 @@
- - + +
-
- - +
+ Enabling the ACK (acknowledgement) will set the ack header to client while subscribing to topics. + This means the items on the broker queue will not be dequeue'd unless an ACK message is sent using the ack node.
@@ -41,8 +45,7 @@ defaults: { name: {value:""}, server: {type:"stomp-server",required:true}, - enable_subscriptionid: {value: false}, - subscriptionid: {value: 0}, + ack: {value: "auto"}, topic: {value:"",required:true} }, inputs:0, @@ -132,14 +135,6 @@
-
- - -
-
- Enabling the ACK (acknowledgement) will set the ack header to client while subscribing to topics. - This means the items on the broker queue will not be dequeue'd unless an ACK message is sent using the ack node. -
@@ -165,7 +160,6 @@ address: {required:true}, port: {value:61613,required:true,validate:RED.validators.number()}, protocolVersion: {value:"1.0",required:true}, - clientAcknowledgement: {value: false}, vhost: {}, reconnectRetries: {value:0,required:true,validate:RED.validators.number()}, reconnectDelay: {value:1,required:true,validate:RED.validators.number()}, @@ -186,6 +180,10 @@
+
+ + +
@@ -200,7 +198,6 @@ The node allows for following inputs:

@@ -215,7 +212,8 @@ color:"#e8cfe8", defaults: { name: {value:""}, - server: {type:"stomp-server",required:true} + server: {type:"stomp-server",required:true}, + topic: {value:""} }, inputs:1, outputs:0, diff --git a/io/stomp/18-stomp.js b/io/stomp/18-stomp.js index 4394f798..4043810e 100644 --- a/io/stomp/18-stomp.js +++ b/io/stomp/18-stomp.js @@ -2,7 +2,6 @@ module.exports = function(RED) { "use strict"; var StompClient = require('stomp-client'); - var querystring = require('querystring'); // ---------------------------------------------- // ------------------- State -------------------- @@ -68,40 +67,8 @@ module.exports = function(RED) { } // ---------------------------------------------- - // ---------------- Connection ------------------ + // ------------------- Nodes -------------------- // ---------------------------------------------- - function handleConnectAction(node, msg, done) { - const clientOptions = typeof msg.clientOptions === 'object' ? msg.clientOptions : null; - if (clientOptions) { - if (!node.client) { - // Client has not been initialized - initialize client and connect - node.client = new StompClient(clientOptions); - node.client.connect(function(sessionId) { - done(sessionId); - }); - } else { - // Already connected/connecting/reconnecting - if (clientOptions.forceReconnect) { - // The force flag tells us to cycle the connection - node.client.disconnect(function() { - node.client = new StompClient(clientOptions); - node.client.connect(function(sessionId) { - done(sessionId) - }) - }); - } - } - } else { - done(new Error("No connection options provided")); - } - } - - function handleDisconnectAction(node, done) { - node.client.disconnect(function() { - done(); - }); - } - function StompServerNode(n) { RED.nodes.createNode(this,n); const node = this; @@ -112,9 +79,10 @@ module.exports = function(RED) { node.connecting = false; node.closing = false; node.options = {}; - // node.subscriptions = {}; node.sessionId = null; - /** @type {StompClient} */ + node.subscribtionIndex = 1; + node.subscriptionIds = {}; + /** @type { StompClient } */ node.client; node.setOptions = function(options, init) { if (!options || typeof options !== "object") { @@ -125,7 +93,6 @@ module.exports = function(RED) { setIfHasProperty(options, node, "address", init); setIfHasProperty(options, node, "port", init); setIfHasProperty(options, node, "protocolVersion", init); - setIfHasProperty(options, node, "clientAcknowledgement", init); setIfHasProperty(options, node, "vhost", init); setIfHasProperty(options, node, "reconnectRetries", init); setIfHasProperty(options, node, "reconnectDelay", init); @@ -151,36 +118,52 @@ module.exports = function(RED) { reconnectOpts: { retries: node.reconnectRetries * 1, delay: node.reconnectDelay * 1 - } + }, + vhost: node.vhost }; } node.setOptions(n, true); - // Define functions called by STOMP processing nodes + /** + * Register a STOMP processing node to the connection. + * @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to register + */ node.register = function(stompNode) { node.users[stompNode.id] = stompNode; // Auto connect when first STOMP processing node is added if (Object.keys(node.users).length === 1) { node.connect(); - // Update nodes status - setTimeout(function() { updateStatus(node, true) }, 1); } } - node.deregister = function(stompNode, done, autoDisconnect) { + /** + * Remove registered STOMP processing nodes from the connection. + * @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to unregister + * @param { Boolean } autoDisconnect Automatically disconnect from the STOM server when no processing nodes registered to the connection + * @param { Function } callback + */ + node.deregister = function(stompNode, autoDisconnect, callback) { delete node.users[stompNode.id]; if (autoDisconnect && !node.closing && node.connected && Object.keys(node.users).length === 0) { - node.disconnect(done); + node.disconnect(callback); } else { - done(); + callback(); } } + /** + * Wether a new connection can be made. + * @returns `true` or `false` + */ node.canConnect = function() { return !node.connected && !node.connecting; } + /** + * Connect to the STOMP server. + * @param {Function} callback + */ node.connect = function(callback) { if (node.canConnect()) { node.closing = false; @@ -195,9 +178,6 @@ module.exports = function(RED) { } node.client = new StompClient(node.options); - node.client.connect(function(sessionId) { - node.sessionId = sessionId; - }); node.client.on("connect", function() { node.closing = false; @@ -234,119 +214,156 @@ module.exports = function(RED) { setStatusError(node, true); }); + node.client.connect(function(sessionId) { + node.sessionId = sessionId; + }); + } catch (err) { node.error(err); } } } + /** + * Disconnect from the STOMP server. + * @param {Function} callback + */ node.disconnect = function(callback) { - + if (!node.client) { + node.warn("Can't disconnect, connection not initialized."); + callback(); + } else { + node.client.disconnect(function() { + node.log("Disconnected from STOMP server", {sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion}) + + node.sessionId = null; + node.subscribtionIndex = 1; + node.subscriptionIds = {}; + node.connected = false; + node.connecting = false; + setStatusDisconnected(node, true); + callback(); + }); + } } + + /** + * Subscribe to a given STOMP queue. + * @param { String} queue The queue to subscribe to + * @param { "auto" | "client" | "client-individual" } clientAck Can be `auto`, `client` or `client-individual` (the latter only starting from STOMP v1.1) + * @param { Function } callback + */ + node.subscribe = function(queue, acknowledgement, callback) { + node.log(`Subscribing to: ${queue}`); + + if (node.connected) { + if (!node.subscriptionIds[queue]) { + node.subscriptionIds[queue] = node.subscribtionIndex++; + } + + const headers = { + id: node.subscriptionIds[queue], + // Only set client-individual if not v1.0 + ack: acknowledgement === "client-individual" && node.options.protocolVersion === "1.0" ? "client" : acknowledgement + } + + node.client.subscribe(queue, headers, function(body, responseHeaders) { + let msg = { headers: responseHeaders, topic: queue }; + try { + msg.payload = JSON.parse(body); + } catch { + msg.payload = body; + } + callback(msg); + }); + } else { + node.error("Can't subscribe, not connected"); + } + } + + /** + * Unsubscribe from a STOMP queue. + * @param {String} queue The STOMP queue to unsubscribe from + * @param {Object} headers Headers to add to the unsubscribe message + */ + node.unsubscribe = function(queue, headers = {}) { + delete node.subscriptionIds[queue]; + if (node.connected) { + node.client.unsubscribe(queue, headers); + node.log(`Unsubscribed from ${queue}`, headers); + } + } + + /** + * Publish a STOMP message on a queue. + * @param {String} queue The STOMP queue to publish to + * @param {any} message The message to send + * @param {Object} headers STOMP headers to add to the SEND command + */ + node.publish = function(queue, message, headers = {}) { + if (node.connected) { + node.client.publish(queue, message, headers); + } else { + node.error("Can't publish, not connected"); + } + } + + node.ack = function(queue, messageId, transaction = undefined) { + if (node.connected) { + node.client.ack(messageId, subscriptionIds[queue], transaction); + } else { + node.error("Can't publish, not connected"); + } + } + + node.on("close", function(done) { + node.disconnect(function() { done (); }); + }); } - RED.nodes.registerType("stomp-server",StompServerNode,{ + RED.nodes.registerType("stomp-server", StompServerNode, { credentials: { - user: {type:"text"}, - password: {type: "password"} + user: { type: "text" }, + password: { type: "password" } } }); function StompInNode(n) { RED.nodes.createNode(this,n); - this.server = n.server; - this.topic = n.topic; - this.enableSubscriptionId = n.enable_subscriptionid; - this.subscriptionid = n.subscriptionid; + /** @type { StompInNode } */ + const node = this; + node.server = n.server; + /** @type { StompServerNode } */ + node.serverConnection = RED.nodes.getNode(node.server); + node.topic = n.topic; + node.ack = n.ack; - this.serverConfig = RED.nodes.getNode(this.server); - this.stompClientOpts = { - address: this.serverConfig.server, - port: this.serverConfig.port * 1, - user: this.serverConfig.username, - pass: this.serverConfig.password, - protocolVersion: this.serverConfig.protocolversion, - reconnectOpts: { - retries: this.serverConfig.reconnectretries * 1, - delay: this.serverConfig.reconnectdelay * 1 + if (node.serverConnection) { + setStatusDisconnected(node); + + if (node.topic) { + node.serverConnection.register(node); + node.serverConnection.subscribe(topic, ack, function(msg) { + node.send(msg); + }); + + if (node.serverConnection.connected) { + setStatusConnected(node); + } } - }; - if (this.serverConfig.vhost) { - this.stompClientOpts.vhost = this.serverConfig.vhost; - } - - this.subscribeHeaders = {}; - if (this.enableSubscriptionId) { - this.subscribeHeaders.id = this.subscriptionid; - } - if (this.serverConfig.ack) { - this.subscribeHeaders.ack = "client"; - } - - var node = this; - var msg = {topic:this.topic}; - // Save the client connection to the shared server instance if needed - if (!node.serverConfig.clientConnection) { - node.serverConfig.clientConnection = new StompClient(node.stompClientOpts); - } - node.client = node.serverConfig.clientConnection; - - node.client.on("connect", function() { - node.status({fill:"green",shape:"dot",text:"connected"}); - node.serverConfig.connected = true; - }); - - node.client.on("reconnecting", function() { - node.status({fill:"red",shape:"ring",text:"reconnecting"}); - node.warn("reconnecting"); - node.serverConfig.connected = false; - }); - - node.client.on("reconnect", function() { - node.status({fill:"green",shape:"dot",text:"connected"}); - node.serverConfig.connected = true; - }); - - node.client.on("error", function(error) { - node.status({fill:"grey",shape:"dot",text:"error"}); - node.warn(error); - }); - - // Connect to server if needed and subscribe - const subscribe = () => { - node.log('subscribing to: '+node.topic); - node.client.subscribe(node.topic, node.subscribeHeaders, function(body, headers) { - var newmsg={"headers":headers,"topic":node.topic} - try { - newmsg.payload = JSON.parse(body); - } - catch(e) { - newmsg.payload = body; - } - node.send(newmsg); - }); - } - - if (!node.serverConfig.connected) { - node.status({fill:"grey",shape:"ring",text:"connecting"}); - node.client.connect(function(sessionId) { - node.serverConfig.connected = true; - subscribe(); - }, function(error) { - node.serverConfig.connected = false; - node.status({fill:"grey",shape:"dot",text:"error"}); - node.warn(error); - }); } else { - subscribe(); + node.error("Missing server config"); } - - node.on("close", function(done) { - if (node.client) { - // disconnect can accept a callback - but it is not always called. - node.client.disconnect(); + node.on("close", function(removed, done) { + if (node.serverConnection) { + if (node.serverConnection.connected) { + node.serverConnection.unsubscribe(node.topic); + } + node.serverConnection.deregister(node, true, done); + node.serverConnection = null; + } else { + done(); } - done(); }); } RED.nodes.registerType("stomp in",StompInNode); @@ -354,156 +371,75 @@ module.exports = function(RED) { function StompOutNode(n) { RED.nodes.createNode(this,n); - this.server = n.server; - this.topic = n.topic; + /** @type { StompOutNode } */ + const node = this; + node.server = n.server; + /** @type { StompServerNode } */ + node.serverConnection = RED.nodes.getNode(node.server); + node.topic = n.topic; - this.serverConfig = RED.nodes.getNode(this.server); - this.stompClientOpts = { - address: this.serverConfig.server, - port: this.serverConfig.port * 1, - user: this.serverConfig.username, - pass: this.serverConfig.password, - protocolVersion: this.serverConfig.protocolversion, - reconnectOpts: { - retries: this.serverConfig.reconnectretries * 1, - delay: this.serverConfig.reconnectdelay * 1 - } - }; - if (this.serverConfig.vhost) { - this.stompClientOpts.vhost = this.serverConfig.vhost; - } + if (node.serverConnection) { + setStatusDisconnected(node); - var node = this; - // Save the client connection to the shared server instance if needed - if (!node.serverConfig.clientConnection) { - node.serverConfig.clientConnection = new StompClient(node.stompClientOpts); - } - node.client = node.serverConfig.clientConnection; - - node.client.on("connect", function() { - node.status({fill:"green",shape:"dot",text:"connected"}); - node.serverConfig.connected = true; - }); - - node.client.on("reconnecting", function() { - node.status({fill:"red",shape:"ring",text:"reconnecting"}); - node.warn("reconnecting"); - node.serverConfig.connected = false; - }); - - node.client.on("reconnect", function() { - node.status({fill:"green",shape:"dot",text:"connected"}); - node.serverConfig.connected = true; - }); - - node.client.on("error", function(error) { - node.status({fill:"grey",shape:"dot",text:"error"}); - node.warn(error); - }); - - // Connect to server if needed - if(!node.serverConfig.connected) { - node.status({fill:"grey",shape:"ring",text:"connecting"}); - node.client.connect(function(sessionId) { - node.serverConfig.connected = true; - }, function(error) { - node.serverConfig.connected = false; - node.status({fill:"grey",shape:"dot",text:"error"}); - node.warn(error); + node.on("input", function(msg, send, done) { + if (node.topic && msg.payload) { + node.serverConnection.publish(topic, msg.payload, msg.headers || {}); + done(); + } }); + + node.serverConnection.register(node); + if (node.serverConnection.connected) { + setStatusConnected(node); + } + } else { + node.error("Missing server config"); } - node.on("input", function(msg) { - node.client.publish(node.topic || msg.topic, msg.payload, msg.headers); - }); - - node.on("close", function(done) { - if (node.client) { - // disconnect can accept a callback - but it is not always called. - node.client.disconnect(); + node.on("close", function(removed, done) { + if (node.serverConnection) { + node.serverConnection.deregister(node, true, done); + node.serverConnection = null; + } else { + done(); } - done(); }); } RED.nodes.registerType("stomp out",StompOutNode); function StompAckNode(n) { RED.nodes.createNode(this,n); - this.server = n.server; + /** @type { StompOutNode } */ + const node = this; + node.server = n.server; + /** @type { StompServerNode } */ + node.serverConnection = RED.nodes.getNode(node.server); + node.topic = n.topic; - this.serverConfig = RED.nodes.getNode(this.server); - this.stompClientOpts = { - address: this.serverConfig.server, - port: this.serverConfig.port * 1, - user: this.serverConfig.username, - pass: this.serverConfig.password, - protocolVersion: this.serverConfig.protocolversion, - reconnectOpts: { - retries: this.serverConfig.reconnectretries * 1, - delay: this.serverConfig.reconnectdelay * 1 - } - }; - if (this.serverConfig.vhost) { - this.stompClientOpts.vhost = this.serverConfig.vhost; - } + if (node.serverConnection) { + setStatusDisconnected(node); - var node = this; - - // only start connection etc. when acknowledgements are configured to be send by client - if (node.serverConfig.ack) { - // Save the client connection to the shared server instance if needed - if (!node.serverConfig.clientConnection) { - node.serverConfig.clientConnection = new StompClient(node.stompClientOpts); - } - node.client = node.serverConfig.clientConnection; - - node.client.on("connect", function() { - node.status({fill:"green",shape:"dot",text:"connected"}); - node.serverConfig.connected = true; - }); - - node.client.on("reconnecting", function() { - node.status({fill:"red",shape:"ring",text:"reconnecting"}); - node.warn("reconnecting"); - node.serverConfig.connected = false; - }); - - node.client.on("reconnect", function() { - node.status({fill:"green",shape:"dot",text:"connected"}); - node.serverConfig.connected = true; - }); - - node.client.on("error", function(error) { - node.status({fill:"grey",shape:"dot",text:"error"}); - node.warn(error); - }); - - // Connect to server if needed - if(!node.serverConfig.connected) { - node.status({fill:"grey",shape:"ring",text:"connecting"}); - node.client.connect(function(sessionId) { - node.serverConfig.connected = true; - }, function(error) { - node.serverConfig.connected = false; - node.status({fill:"grey",shape:"dot",text:"error"}); - node.warn(error); - }); - } - - node.on("input", function(msg) { - node.client.ack(msg.messageId, msg.subsriptionId, msg.transaction); - }); - - node.on("close", function(done) { - if (node.client) { - // disconnect can accept a callback - but it is not always called. - node.client.disconnect(); - } + node.on("input", function(msg, send, done) { + node.client.ack(msg.messageId, n.topic, msg.transaction); done(); }); + + node.serverConnection.register(node); + if (node.serverConnection.connected) { + setStatusConnected(node); + } } else { - node.error("ACK not configured in server (config node)"); + node.error("Missing server config"); } + + node.on("close", function(removed, done) { + if (node.serverConnection) { + node.serverConnection.deregister(node, true, done); + node.serverConnection = null; + } else { + done(); + } + }); } RED.nodes.registerType("stomp ack",StompAckNode);