From 54ec71f16c2210ca7f8e8c5a8630c9c06fd85836 Mon Sep 17 00:00:00 2001 From: Olivier Verhaegen <56387556+OlivierVerhaegen@users.noreply.github.com> Date: Tue, 11 Apr 2023 09:32:36 +0200 Subject: [PATCH] Add support for ACK messages --- io/stomp/18-stomp.html | 58 ++++++++++++++++++++++++++++++++++ io/stomp/18-stomp.js | 72 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 129 insertions(+), 1 deletion(-) diff --git a/io/stomp/18-stomp.html b/io/stomp/18-stomp.html index e643807c..2d652604 100644 --- a/io/stomp/18-stomp.html +++ b/io/stomp/18-stomp.html @@ -122,6 +122,14 @@ +
+ + +
+
+ Enabling the ACK (acknowledgement) will set the ack header to client while subscribing to topics. + This means the items on the broker queue will not be dequeue'd unless an ACK message is sent using the ack node. +
@@ -147,6 +155,7 @@ server: {required:true}, port: {value:61613,required:true,validate:RED.validators.number()}, protocolversion: {value:"1.0",required:true}, + ack: {value: false}, vhost: {}, reconnectretries: {value:"0",required:true,validate:RED.validators.number()}, reconnectdelay: {value:"0.5",required:true,validate:RED.validators.number()}, @@ -161,3 +170,52 @@ } }); + + + + + + diff --git a/io/stomp/18-stomp.js b/io/stomp/18-stomp.js index a35da1a7..b44b605c 100644 --- a/io/stomp/18-stomp.js +++ b/io/stomp/18-stomp.js @@ -9,6 +9,7 @@ module.exports = function(RED) { this.server = n.server; this.port = n.port; this.protocolversion = n.protocolversion; + this.ack = n.ack; this.vhost = n.vhost; this.reconnectretries = n.reconnectretries || 999999; this.reconnectdelay = (n.reconnectdelay || 15) * 1000; @@ -43,6 +44,7 @@ module.exports = function(RED) { if (this.serverConfig.vhost) { this.stompClientOpts.vhost = this.serverConfig.vhost; } + this.subscribeHeaders = this.serverConfig.ack ? { "ack": "client" } : {}; var node = this; var msg = {topic:this.topic}; @@ -69,7 +71,7 @@ module.exports = function(RED) { node.status({fill:"grey",shape:"ring",text:"connecting"}); node.client.connect(function(sessionId) { node.log('subscribing to: '+node.topic); - node.client.subscribe(node.topic, function(body, headers) { + node.client.subscribe(node.topic, node.subscribeHeaders, function(body, headers) { var newmsg={"headers":headers,"topic":node.topic} try { newmsg.payload = JSON.parse(body); @@ -158,4 +160,72 @@ module.exports = function(RED) { } RED.nodes.registerType("stomp out",StompOutNode); + function StompAckNode(n) { + RED.nodes.createNode(this,n); + this.server = n.server; + + this.serverConfig = RED.nodes.getNode(this.server); + this.stompClientOpts = { + address: this.serverConfig.server, + port: this.serverConfig.port * 1, + user: this.serverConfig.username, + pass: this.serverConfig.password, + protocolVersion: this.serverConfig.protocolversion, + reconnectOpts: { + retries: this.serverConfig.reconnectretries * 1, + delay: this.serverConfig.reconnectdelay * 1 + } + }; + if (this.serverConfig.vhost) { + this.stompClientOpts.vhost = this.serverConfig.vhost; + } + + var node = this; + + // only start connection etc. when acknowledgements are configured to be send by client + if (node.serverConfig.ack) { + node.client = new StompClient(node.stompClientOpts); + + node.client.on("connect", function() { + node.status({fill:"green",shape:"dot",text:"connected"}); + }); + + node.client.on("reconnecting", function() { + node.status({fill:"red",shape:"ring",text:"reconnecting"}); + node.warn("reconnecting"); + }); + + node.client.on("reconnect", function() { + node.status({fill:"green",shape:"dot",text:"connected"}); + }); + + node.client.on("error", function(error) { + node.status({fill:"grey",shape:"dot",text:"error"}); + node.warn(error); + }); + + node.status({fill:"grey",shape:"ring",text:"connecting"}); + node.client.connect(function(sessionId) { + }, function(error) { + node.status({fill:"grey",shape:"dot",text:"error"}); + node.warn(error); + }); + + node.on("input", function(msg) { + node.ack(msg.messageId, msg.subsriptionId, msg.transaction); + }); + + node.on("close", function(done) { + if (node.client) { + // disconnect can accept a callback - but it is not always called. + node.client.disconnect(); + } + done(); + }); + } else { + node.error("ACK not configured in server (config node)"); + } + } + RED.nodes.registerType("stomp ack",StompAckNode); + };