+ Enabling the ACK (acknowledgment) will set the ack header to client while subscribing to topics.
+ This means the items on the broker queue will not be dequeue'd unless an ACK message is sent using the ack node.
+
@@ -33,6 +45,7 @@
defaults: {
name: {value:""},
server: {type:"stomp-server",required:true},
+ ack: {value: "auto", required: true},
topic: {value:"",required:true}
},
inputs:0,
@@ -148,8 +161,8 @@
port: {value:61613,required:true,validate:RED.validators.number()},
protocolversion: {value:"1.0",required:true},
vhost: {},
- reconnectretries: {value:"0",required:true,validate:RED.validators.number()},
- reconnectdelay: {value:"0.5",required:true,validate:RED.validators.number()},
+ reconnectretries: {value:0,required:true,validate:RED.validators.number()},
+ reconnectdelay: {value:1,required:true,validate:RED.validators.number()},
name: {}
},
credentials: {
@@ -161,3 +174,56 @@
}
});
+
+
+
+
+
+
diff --git a/io/stomp/18-stomp.js b/io/stomp/18-stomp.js
index a35da1a7..8ff77156 100644
--- a/io/stomp/18-stomp.js
+++ b/io/stomp/18-stomp.js
@@ -2,94 +2,406 @@
module.exports = function(RED) {
"use strict";
var StompClient = require('stomp-client');
- var querystring = require('querystring');
+ // ----------------------------------------------
+ // ------------------- State --------------------
+ // ----------------------------------------------
+ function updateStatus(node, allNodes) {
+ let setStatus = setStatusDisconnected;
+
+ if (node.connecting) {
+ setStatus = setStatusConnecting;
+ } else if (node.connected) {
+ setStatus = setStatusConnected;
+ }
+
+ setStatus(node, allNodes);
+ }
+
+ function setStatusDisconnected(node, allNodes) {
+ if (allNodes) {
+ for (let id in node.users) {
+ if (hasProperty(node.users, id)) {
+ node.users[id].status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected"});
+ }
+ }
+ } else {
+ node.status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected" })
+ }
+ }
+
+ function setStatusConnecting(node, allNodes) {
+ if(allNodes) {
+ for (var id in node.users) {
+ if (hasProperty(node.users, id)) {
+ node.users[id].status({ fill: "yellow", shape: "ring", text: "node-red:common.status.connecting" });
+ }
+ }
+ } else {
+ node.status({ fill: "yellow", shape: "ring", text: "node-red:common.status.connecting" });
+ }
+ }
+
+ function setStatusConnected(node, allNodes) {
+ if(allNodes) {
+ for (var id in node.users) {
+ if (hasProperty(node.users, id)) {
+ node.users[id].status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
+ }
+ }
+ } else {
+ node.status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
+ }
+ }
+
+ function setStatusError(node, allNodes) {
+ if(allNodes) {
+ for (var id in node.users) {
+ if (hasProperty(node.users, id)) {
+ node.users[id].status({ fill: "red", shape: "dot", text: "error" });
+ }
+ }
+ } else {
+ node.status({ fill: "red", shape: "dot", text: "error" });
+ }
+ }
+
+ // ----------------------------------------------
+ // ------------------- Nodes --------------------
+ // ----------------------------------------------
function StompServerNode(n) {
RED.nodes.createNode(this,n);
- this.server = n.server;
- this.port = n.port;
- this.protocolversion = n.protocolversion;
- this.vhost = n.vhost;
- this.reconnectretries = n.reconnectretries || 999999;
- this.reconnectdelay = (n.reconnectdelay || 15) * 1000;
- this.name = n.name;
- this.username = this.credentials.user;
- this.password = this.credentials.password;
+ const node = this;
+ // To keep track of processing nodes that use this config node for their connection
+ node.users = {};
+ // Config node state
+ node.connected = false;
+ node.connecting = false;
+ /** Flag to avoid race conditions between `deregister` and the `close` event of the config node (ex. on redeploy) */
+ node.closing = false;
+ /** Options to pass to the stomp-client API */
+ node.options = {};
+ node.sessionId = null;
+ node.subscribtionIndex = 1;
+ node.subscriptionIds = {};
+ /** @type { StompClient } */
+ node.client;
+ node.setOptions = function(options, init) {
+ if (!options || typeof options !== "object") {
+ return; // Nothing to change
+ }
+
+ // Apply property changes (only if the property exists in the options object)
+ setIfHasProperty(options, node, "server", init);
+ setIfHasProperty(options, node, "port", init);
+ setIfHasProperty(options, node, "protocolversion", init);
+ setIfHasProperty(options, node, "vhost", init);
+ setIfHasProperty(options, node, "reconnectretries", init);
+ setIfHasProperty(options, node, "reconnectdelay", init);
+
+ if (node.credentials) {
+ node.username = node.credentials.username;
+ node.password = node.credentials.password;
+ }
+ if (!init && hasProperty(options, "username")) {
+ node.username = options.username;
+ }
+ if (!init && hasProperty(options, "password")) {
+ node.password = options.password;
+ }
+
+ // Build options for passing to the stomp-client API
+ node.options = {
+ address: node.server,
+ port: node.port * 1,
+ user: node.username,
+ pass: node.password,
+ protocolVersion: node.protocolversion,
+ reconnectOpts: {
+ retries: node.reconnectretries * 1,
+ delay: node.reconnectdelay * 1000
+ },
+ vhost: node.vhost
+ };
+ }
+
+ node.setOptions(n, true);
+
+ /**
+ * Register a STOMP processing node to the connection.
+ * @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to register
+ * @param { Function } callback
+ */
+ node.register = function(stompNode, callback = () => {}) {
+ node.users[stompNode.id] = stompNode;
+ // Auto connect when first STOMP processing node is added
+ if (Object.keys(node.users).length === 1) {
+ node.connect(callback);
+ } else {
+ callback();
+ }
+ }
+
+ /**
+ * Remove registered STOMP processing nodes from the connection.
+ * @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to unregister
+ * @param { Boolean } autoDisconnect Automatically disconnect from the STOM server when no processing nodes registered to the connection
+ * @param { Function } callback
+ */
+ node.deregister = function(stompNode, autoDisconnect, callback = () => {}) {
+ delete node.users[stompNode.id];
+ if (autoDisconnect && !node.closing && node.connected && Object.keys(node.users).length === 0) {
+ node.disconnect(callback);
+ } else {
+ callback();
+ }
+ }
+
+ /**
+ * Wether a new connection can be made.
+ * @returns `true` or `false`
+ */
+ node.canConnect = function() {
+ return !node.connected && !node.connecting;
+ }
+
+ /**
+ * Connect to the STOMP server.
+ * @param {Function} callback
+ */
+ node.connect = function(callback = () => {}) {
+ if (node.canConnect()) {
+ node.closing = false;
+ node.connecting = true;
+ setStatusConnecting(node, true);
+
+ try {
+ // Remove left over client if needed
+ if (node.client) {
+ node.client.disconnect();
+ node.client = null;
+ }
+
+ node.client = new StompClient(node.options);
+
+ node.client.on("connect", function() {
+ node.closing = false;
+ node.connecting = false;
+ node.connected = true;
+ callback();
+
+ node.log("Connected to STOMP server", {sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion});
+ setStatusConnected(node, true);
+ });
+
+ node.client.on("reconnect", function(sessionId, numOfRetries) {
+ node.closing = false;
+ node.connecting = false;
+ node.connected = true;
+ node.sessionId = sessionId;
+ callback();
+
+ node.log("Reconnected to STOMP server", {sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion, retries: numOfRetries});
+ setStatusConnected(node, true);
+ });
+
+ node.client.on("reconnecting", function() {
+ node.warn("reconnecting");
+ node.connecting = true;
+ node.connected = false;
+
+ node.log("Reconnecting to STOMP server...", {url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion});
+ setStatusConnecting(node, true);
+ });
+
+ node.client.on("error", function(err) {
+ node.error(err);
+ setStatusError(node, true);
+ });
+
+ node.client.connect(function(sessionId) {
+ node.sessionId = sessionId;
+ });
+
+ } catch (err) {
+ node.error(err);
+ }
+ } else {
+ node.log("Not connecting to STOMP server, already connected");
+ callback();
+ }
+ }
+
+ /**
+ * Disconnect from the STOMP server.
+ * @param {Function} callback
+ */
+ node.disconnect = function(callback = () => {}) {
+ const waitDisconnect = (client, timeout) => {
+ return new Promise((resolve, reject) => {
+ // Set flag to avoid race conditions for disconnect as every node tries to call it directly or indirectly using deregister
+ node.closing = true;
+ const t = setTimeout(() => {
+ reject();
+ }, timeout);
+ client.disconnect(() => {
+ clearTimeout(t);
+ resolve();
+ });
+ });
+ }
+
+ if (!node.client) {
+ node.warn("Can't disconnect, connection not initialized.");
+ callback();
+ } else if (node.closing || !node.connected) {
+ // Disconnection already in progress or not connected
+ callback();
+ } else {
+ node.log("Unsubscribing from STOMP queue's...");
+ const subscribedQueues = Object.keys(node.subscriptionIds);
+ subscribedQueues.forEach(function(queue) {
+ node.unsubscribe(queue);
+ });
+ node.log('Disconnecting from STOMP server...');
+ waitDisconnect(node.client, 2000).then(() => {
+ node.log("Disconnected from STOMP server", {sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion})
+ }).catch(() => {
+ node.log("Disconnect timeout closing node...");
+ }).finally(() => {
+ node.sessionId = null;
+ node.subscribtionIndex = 1;
+ node.subscriptionIds = {};
+ node.connected = false;
+ node.connecting = false;
+ setStatusDisconnected(node, true);
+ callback();
+ });
+ }
+ }
+
+ /**
+ * Subscribe to a given STOMP queue.
+ * @param { String} queue The queue to subscribe to
+ * @param { "auto" | "client" | "client-individual" } clientAck Can be `auto`, `client` or `client-individual` (the latter only starting from STOMP v1.1)
+ * @param { Function } callback
+ */
+ node.subscribe = function(queue, acknowledgment, callback) {
+ node.log(`Subscribing to: ${queue}`);
+
+ if (node.connected && !node.closing) {
+ if (!node.subscriptionIds[queue]) {
+ node.subscriptionIds[queue] = node.subscribtionIndex++;
+ }
+
+ const headers = {
+ id: node.subscriptionIds[queue],
+ // Only set client-individual if not v1.0
+ ack: acknowledgment === "client-individual" && node.options.protocolVersion === "1.0" ? "client" : acknowledgment
+ }
+
+ node.client.subscribe(queue, headers, function(body, responseHeaders) {
+ let msg = { headers: responseHeaders, topic: queue };
+ try {
+ msg.payload = JSON.parse(body);
+ } catch {
+ msg.payload = body;
+ }
+ callback(msg);
+ });
+ } else {
+ node.error("Can't subscribe, not connected");
+ }
+ }
+
+ /**
+ * Unsubscribe from a STOMP queue.
+ * @param {String} queue The STOMP queue to unsubscribe from
+ * @param {Object} headers Headers to add to the unsubscribe message
+ */
+ node.unsubscribe = function(queue, headers = {}) {
+ delete node.subscriptionIds[queue];
+ if (node.connected && !node.closing) {
+ node.client.unsubscribe(queue, headers);
+ node.log(`Unsubscribed from ${queue}`, headers);
+ }
+ }
+
+ /**
+ * Publish a STOMP message on a queue.
+ * @param {String} queue The STOMP queue to publish to
+ * @param {any} message The message to send
+ * @param {Object} headers STOMP headers to add to the SEND command
+ */
+ node.publish = function(queue, message, headers = {}) {
+ if (node.connected && !node.closing) {
+ node.client.publish(queue, message, headers);
+ } else {
+ node.error("Can't publish, not connected");
+ }
+ }
+
+ /**
+ * Acknowledge (a) message(s) that was received from the specified queue.
+ * @param {String} queue The queue/topic to send an acknowledgment for
+ * @param {String} messageId ID of the message that was received from the server, which can be found in the reponse header as `message-id`
+ * @param {String} transaction Optional transaction name
+ */
+ node.ack = function(queue, messageId, transaction = undefined) {
+ if (node.connected && !node.closing) {
+ node.client.ack(messageId, node.subscriptionIds[queue], transaction);
+ } else {
+ node.error("Can't send acknowledgment, not connected");
+ }
+ }
+
+ node.on("close", function(done) {
+ node.disconnect(function() { done (); });
+ });
}
- RED.nodes.registerType("stomp-server",StompServerNode,{
+ RED.nodes.registerType("stomp-server", StompServerNode, {
credentials: {
- user: {type:"text"},
- password: {type: "password"}
+ user: { type: "text" },
+ password: { type: "password" }
}
});
function StompInNode(n) {
RED.nodes.createNode(this,n);
- this.server = n.server;
- this.topic = n.topic;
+ /** @type { StompInNode } */
+ const node = this;
+ node.server = n.server;
+ /** @type { StompServerNode } */
+ node.serverConnection = RED.nodes.getNode(node.server);
+ node.topic = n.topic;
+ node.ack = n.ack;
- this.serverConfig = RED.nodes.getNode(this.server);
- this.stompClientOpts = {
- address: this.serverConfig.server,
- port: this.serverConfig.port * 1,
- user: this.serverConfig.username,
- pass: this.serverConfig.password,
- protocolVersion: this.serverConfig.protocolversion,
- reconnectOpts: {
- retries: this.serverConfig.reconnectretries * 1,
- delay: this.serverConfig.reconnectdelay * 1
+ if (node.serverConnection) {
+ setStatusDisconnected(node);
+
+ if (node.topic) {
+ node.serverConnection.register(node, function() {
+ node.serverConnection.subscribe(node.topic, node.ack, function(msg) {
+ node.send(msg);
+ });
+ });
+
+ if (node.serverConnection.connected) {
+ setStatusConnected(node);
+ }
}
- };
- if (this.serverConfig.vhost) {
- this.stompClientOpts.vhost = this.serverConfig.vhost;
+ } else {
+ node.error("Missing server config");
}
- var node = this;
- var msg = {topic:this.topic};
- node.client = new StompClient(node.stompClientOpts);
-
- node.client.on("connect", function() {
- node.status({fill:"green",shape:"dot",text:"connected"});
- });
-
- node.client.on("reconnecting", function() {
- node.status({fill:"red",shape:"ring",text:"reconnecting"});
- node.warn("reconnecting");
- });
-
- node.client.on("reconnect", function() {
- node.status({fill:"green",shape:"dot",text:"connected"});
- });
-
- node.client.on("error", function(error) {
- node.status({fill:"grey",shape:"dot",text:"error"});
- node.warn(error);
- });
-
- node.status({fill:"grey",shape:"ring",text:"connecting"});
- node.client.connect(function(sessionId) {
- node.log('subscribing to: '+node.topic);
- node.client.subscribe(node.topic, function(body, headers) {
- var newmsg={"headers":headers,"topic":node.topic}
- try {
- newmsg.payload = JSON.parse(body);
- }
- catch(e) {
- newmsg.payload = body;
- }
- node.send(newmsg);
- });
- }, function(error) {
- node.status({fill:"grey",shape:"dot",text:"error"});
- node.warn(error);
- });
-
- node.on("close", function(done) {
- if (node.client) {
- // disconnect can accept a callback - but it is not always called.
- node.client.disconnect();
+ node.on("close", function(removed, done) {
+ if (node.serverConnection) {
+ node.serverConnection.unsubscribe(node.topic);
+ node.serverConnection.deregister(node, true, done);
+ node.serverConnection = null;
+ } else {
+ done();
}
- done();
});
}
RED.nodes.registerType("stomp in",StompInNode);
@@ -97,65 +409,114 @@ module.exports = function(RED) {
function StompOutNode(n) {
RED.nodes.createNode(this,n);
- this.server = n.server;
- this.topic = n.topic;
+ /** @type { StompOutNode } */
+ const node = this;
+ node.server = n.server;
+ /** @type { StompServerNode } */
+ node.serverConnection = RED.nodes.getNode(node.server);
+ node.topic = n.topic;
- this.serverConfig = RED.nodes.getNode(this.server);
- this.stompClientOpts = {
- address: this.serverConfig.server,
- port: this.serverConfig.port * 1,
- user: this.serverConfig.username,
- pass: this.serverConfig.password,
- protocolVersion: this.serverConfig.protocolversion,
- reconnectOpts: {
- retries: this.serverConfig.reconnectretries * 1,
- delay: this.serverConfig.reconnectdelay * 1
+ if (node.serverConnection) {
+ setStatusDisconnected(node);
+
+ node.on("input", function(msg, send, done) {
+ if (node.topic && msg.payload) {
+ try {
+ msg.payload = JSON.stringify(msg.payload);
+ } catch {
+ msg.payload = `${msg.payload}`;
+ }
+ node.serverConnection.publish(node.topic, msg.payload, msg.headers || {});
+ done();
+ }
+ });
+
+ node.serverConnection.register(node);
+ if (node.serverConnection.connected) {
+ setStatusConnected(node);
}
- };
- if (this.serverConfig.vhost) {
- this.stompClientOpts.vhost = this.serverConfig.vhost;
+ } else {
+ node.error("Missing server config");
}
- var node = this;
- node.client = new StompClient(node.stompClientOpts);
-
- node.client.on("connect", function() {
- node.status({fill:"green",shape:"dot",text:"connected"});
- });
-
- node.client.on("reconnecting", function() {
- node.status({fill:"red",shape:"ring",text:"reconnecting"});
- node.warn("reconnecting");
- });
-
- node.client.on("reconnect", function() {
- node.status({fill:"green",shape:"dot",text:"connected"});
- });
-
- node.client.on("error", function(error) {
- node.status({fill:"grey",shape:"dot",text:"error"});
- node.warn(error);
- });
-
- node.status({fill:"grey",shape:"ring",text:"connecting"});
- node.client.connect(function(sessionId) {
- }, function(error) {
- node.status({fill:"grey",shape:"dot",text:"error"});
- node.warn(error);
- });
-
- node.on("input", function(msg) {
- node.client.publish(node.topic || msg.topic, msg.payload, msg.headers);
- });
-
- node.on("close", function(done) {
- if (node.client) {
- // disconnect can accept a callback - but it is not always called.
- node.client.disconnect();
+ node.on("close", function(removed, done) {
+ if (node.serverConnection) {
+ node.serverConnection.deregister(node, true, done);
+ node.serverConnection = null;
+ } else {
+ done();
}
- done();
});
}
RED.nodes.registerType("stomp out",StompOutNode);
+ function StompAckNode(n) {
+ RED.nodes.createNode(this,n);
+ /** @type { StompOutNode } */
+ const node = this;
+ node.server = n.server;
+ /** @type { StompServerNode } */
+ node.serverConnection = RED.nodes.getNode(node.server);
+ node.topic = n.topic;
+
+ if (node.serverConnection) {
+ setStatusDisconnected(node);
+
+ node.on("input", function(msg, send, done) {
+ node.serverConnection.ack(node.topic, msg.messageId, msg.transaction);
+ done();
+ });
+
+ node.serverConnection.register(node);
+ if (node.serverConnection.connected) {
+ setStatusConnected(node);
+ }
+ } else {
+ node.error("Missing server config");
+ }
+
+ node.on("close", function(removed, done) {
+ if (node.serverConnection) {
+ node.serverConnection.deregister(node, true, done);
+ node.serverConnection = null;
+ } else {
+ done();
+ }
+ });
+ }
+ RED.nodes.registerType("stomp ack",StompAckNode);
+
};
+
+
+// ----------------------------------------------
+// ----------------- Helpers --------------------
+// ----------------------------------------------
+ /**
+ * Helper function for applying changes to an objects properties ONLY when the src object actually has the property.
+ * This avoids setting a `dst` property null/undefined when the `src` object doesnt have the named property.
+ * @param {object} src Source object containing properties
+ * @param {object} dst Destination object to set property
+ * @param {string} propName The property name to set in the Destination object
+ * @param {boolean} force force the dst property to be updated/created even if src property is empty
+ */
+ function setIfHasProperty(src, dst, propName, force) {
+ if (src && dst && propName) {
+ const ok = force || hasProperty(src, propName);
+ if (ok) {
+ dst[propName] = src[propName];
+ }
+ }
+ }
+
+ /**
+ * Helper function to test an object has a property
+ * @param {object} obj Object to test
+ * @param {string} propName Name of property to find
+ * @returns true if object has property `propName`
+ */
+ function hasProperty(obj, propName) {
+ //JavaScript does not protect the property name hasOwnProperty
+ //Object.prototype.hasOwnProperty.call is the recommended/safer test
+ return Object.prototype.hasOwnProperty.call(obj, propName);
+ }
\ No newline at end of file