1
0
mirror of https://github.com/node-red/node-red-nodes.git synced 2023-10-10 13:36:58 +02:00
node-red-nodes/io/stomp/18-stomp.js
Olivier Verhaegen 45b43ebb21
STOMP bugfix: usage of multiple stomp in nodes (#1015)
* Bugfix: only execute node register callbacks after connection to the server has been made

* Bugfix: only execute direct callback of register node register when connected to STOMP server

* Doc: Add tip about reconnection to server
2023-07-11 19:58:51 +01:00

545 lines
21 KiB
JavaScript

module.exports = function(RED) {
"use strict";
var StompClient = require('stomp-client');
// ----------------------------------------------
// ------------------- State --------------------
// ----------------------------------------------
function updateStatus(node, allNodes) {
let setStatus = setStatusDisconnected;
if (node.connecting) {
setStatus = setStatusConnecting;
} else if (node.connected) {
setStatus = setStatusConnected;
}
setStatus(node, allNodes);
}
function setStatusDisconnected(node, allNodes) {
if (allNodes) {
for (let id in node.users) {
if (hasProperty(node.users, id)) {
node.users[id].status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected"});
}
}
} else {
node.status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected" })
}
}
function setStatusConnecting(node, allNodes) {
if(allNodes) {
for (var id in node.users) {
if (hasProperty(node.users, id)) {
node.users[id].status({ fill: "yellow", shape: "ring", text: "node-red:common.status.connecting" });
}
}
} else {
node.status({ fill: "yellow", shape: "ring", text: "node-red:common.status.connecting" });
}
}
function setStatusConnected(node, allNodes) {
if(allNodes) {
for (var id in node.users) {
if (hasProperty(node.users, id)) {
node.users[id].status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
}
}
} else {
node.status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
}
}
function setStatusError(node, allNodes) {
if(allNodes) {
for (var id in node.users) {
if (hasProperty(node.users, id)) {
node.users[id].status({ fill: "red", shape: "dot", text: "error" });
}
}
} else {
node.status({ fill: "red", shape: "dot", text: "error" });
}
}
// ----------------------------------------------
// ------------------- Nodes --------------------
// ----------------------------------------------
function StompServerNode(n) {
RED.nodes.createNode(this,n);
const node = this;
// To keep track of processing nodes that use this config node for their connection
node.users = {};
// Config node state
node.connected = false;
node.connecting = false;
/** Flag to avoid race conditions between `deregister` and the `close` event of the config node (ex. on redeploy) */
node.closing = false;
/** Options to pass to the stomp-client API */
node.options = {};
node.sessionId = null;
node.subscribtionIndex = 1;
node.subscriptionIds = {};
/** Array of callbacks to be called once the connection to the broker has been made */
node.connectedCallbacks = [];
/** @type { StompClient } */
node.client;
node.setOptions = function(options, init) {
if (!options || typeof options !== "object") {
return; // Nothing to change
}
// Apply property changes (only if the property exists in the options object)
setIfHasProperty(options, node, "server", init);
setIfHasProperty(options, node, "port", init);
setIfHasProperty(options, node, "protocolversion", init);
setIfHasProperty(options, node, "vhost", init);
setIfHasProperty(options, node, "reconnectretries", init);
setIfHasProperty(options, node, "reconnectdelay", init);
if (node.credentials) {
node.username = node.credentials.user;
node.password = node.credentials.password;
}
if (!init && hasProperty(options, "username")) {
node.username = options.username;
}
if (!init && hasProperty(options, "password")) {
node.password = options.password;
}
// Build options for passing to the stomp-client API
node.options = {
address: node.server,
port: node.port * 1,
user: node.username,
pass: node.password,
protocolVersion: node.protocolversion,
reconnectOpts: {
retries: node.reconnectretries * 1,
delay: node.reconnectdelay * 1000
},
vhost: node.vhost
};
}
node.setOptions(n, true);
/**
* Register a STOMP processing node to the connection.
* @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to register
* @param { Function } callback
*/
node.register = function(stompNode, callback = () => {}) {
node.users[stompNode.id] = stompNode;
if (!node.connected) {
node.connectedCallbacks.push(callback);
}
// Auto connect when first STOMP processing node is added
if (Object.keys(node.users).length === 1) {
node.connect(() => {
while (node.connectedCallbacks.length) {
node.connectedCallbacks.shift().call();
}
});
} else if (node.connected) {
// Execute callback directly as the connection to the STOMP server has already been made
callback();
}
}
/**
* Remove registered STOMP processing nodes from the connection.
* @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to unregister
* @param { Boolean } autoDisconnect Automatically disconnect from the STOM server when no processing nodes registered to the connection
* @param { Function } callback
*/
node.deregister = function(stompNode, autoDisconnect, callback = () => {}) {
delete node.users[stompNode.id];
if (autoDisconnect && !node.closing && node.connected && Object.keys(node.users).length === 0) {
node.disconnect(callback);
} else {
callback();
}
}
/**
* Wether a new connection can be made.
* @returns `true` or `false`
*/
node.canConnect = function() {
return !node.connected && !node.connecting;
}
/**
* Connect to the STOMP server.
* @param {Function} callback
*/
node.connect = function(callback = () => {}) {
if (node.canConnect()) {
node.closing = false;
node.connecting = true;
setStatusConnecting(node, true);
try {
// Remove left over client if needed
if (node.client) {
node.client.disconnect();
node.client = null;
}
node.client = new StompClient(node.options);
node.client.on("connect", function(sessionId) {
node.closing = false;
node.connecting = false;
node.connected = true;
node.sessionId = sessionId;
node.log(`Connected to STOMP server, sessionId: ${node.sessionId}, url: ${node.options.address}:${node.options.port}, protocolVersion: ${node.options.protocolVersion}`);
setStatusConnected(node, true);
callback();
});
node.client.on("reconnect", function(sessionId, numOfRetries) {
node.closing = false;
node.connecting = false;
node.connected = true;
node.sessionId = sessionId;
node.log(`Reconnected to STOMP server, sessionId: ${node.sessionId}, url: ${node.options.address}:${node.options.port}, protocolVersion: ${node.options.protocolVersion}, retries: ${numOfRetries}`);
setStatusConnected(node, true);
callback();
});
node.client.on("reconnecting", function() {
node.warn("reconnecting");
node.connecting = true;
node.connected = false;
node.log("Reconnecting to STOMP server...", {url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion});
setStatusConnecting(node, true);
});
node.client.on("error", function(err) {
node.error(err);
setStatusError(node, true);
});
node.client.connect();
} catch (err) {
node.error(err);
}
} else {
node.log("Not connecting to STOMP server, already connected");
callback();
}
}
/**
* Disconnect from the STOMP server.
* @param {Function} callback
*/
node.disconnect = function(callback = () => {}) {
const waitDisconnect = (client, timeout) => {
return new Promise((resolve, reject) => {
// Set flag to avoid race conditions for disconnect as every node tries to call it directly or indirectly using deregister
node.closing = true;
const t = setTimeout(() => {
reject();
}, timeout);
client.disconnect(() => {
clearTimeout(t);
resolve();
});
});
}
if (!node.client) {
node.warn("Can't disconnect, connection not initialized.");
callback();
} else if (node.closing || !node.connected) {
// Disconnection already in progress or not connected
callback();
} else {
const subscribedQueues = Object.keys(node.subscriptionIds);
subscribedQueues.forEach(function(queue) {
node.unsubscribe(queue);
});
node.log('Disconnecting from STOMP server...');
waitDisconnect(node.client, 2000).then(() => {
node.log(`Disconnected from STOMP server, sessionId: ${node.sessionId}, url: ${node.options.address}:${node.options.port}, protocolVersion: ${node.options.protocolVersion}`)
}).catch(() => {
node.log("Disconnect timeout closing node...");
}).finally(() => {
node.sessionId = null;
node.subscribtionIndex = 1;
node.subscriptionIds = {};
node.connected = false;
node.connecting = false;
setStatusDisconnected(node, true);
callback();
});
}
}
/**
* Subscribe to a given STOMP queue.
* @param { String} queue The queue to subscribe to
* @param { "auto" | "client" | "client-individual" } clientAck Can be `auto`, `client` or `client-individual` (the latter only starting from STOMP v1.1)
* @param { Function } callback
*/
node.subscribe = function(queue, acknowledgment, callback) {
node.log(`Subscribe to: ${queue}`);
if (node.connected && !node.closing) {
if (!node.subscriptionIds[queue]) {
node.subscriptionIds[queue] = node.subscribtionIndex++;
}
const headers = {
id: node.subscriptionIds[queue],
// Only set client-individual if not v1.0
ack: acknowledgment === "client-individual" && node.options.protocolVersion === "1.0" ? "client" : acknowledgment
}
node.client.subscribe(queue, headers, function(body, responseHeaders) {
let msg = { headers: responseHeaders, topic: queue };
try {
msg.payload = JSON.parse(body);
} catch {
msg.payload = body;
}
callback(msg);
});
} else {
node.error("Can't subscribe, not connected");
}
}
/**
* Unsubscribe from a STOMP queue.
* @param {String} queue The STOMP queue to unsubscribe from
* @param {Object} headers Headers to add to the unsubscribe message
*/
node.unsubscribe = function(queue, headers = {}) {
delete node.subscriptionIds[queue];
if (node.connected && !node.closing) {
node.client.unsubscribe(queue, headers);
node.log(`Unsubscribed from ${queue}, headers: ${JSON.stringify(headers)}`);
}
}
/**
* Publish a STOMP message on a queue.
* @param {String} queue The STOMP queue to publish to
* @param {any} message The message to send
* @param {Object} headers STOMP headers to add to the SEND command
*/
node.publish = function(queue, message, headers = {}) {
if (node.connected && !node.closing) {
node.client.publish(queue, message, headers);
} else {
node.error("Can't publish, not connected");
}
}
/**
* Acknowledge (a) message(s) that was received from the specified queue.
* @param {String} queue The queue/topic to send an acknowledgment for
* @param {String} messageId ID of the message that was received from the server, which can be found in the reponse header as `message-id`
* @param {String} transaction Optional transaction name
*/
node.ack = function(queue, messageId, transaction = undefined) {
if (node.connected && !node.closing) {
node.client.ack(messageId, node.subscriptionIds[queue], transaction);
} else {
node.error("Can't send acknowledgment, not connected");
}
}
node.on("close", function(done) {
node.disconnect(function() { done (); });
});
}
RED.nodes.registerType("stomp-server", StompServerNode, {
credentials: {
user: { type: "text" },
password: { type: "password" }
}
});
function StompInNode(n) {
RED.nodes.createNode(this,n);
/** @type { StompInNode } */
const node = this;
node.server = n.server;
/** @type { StompServerNode } */
node.serverConnection = RED.nodes.getNode(node.server);
node.topic = n.topic;
node.ack = n.ack;
if (node.serverConnection) {
setStatusDisconnected(node);
if (node.topic) {
node.serverConnection.register(node, function() {
node.serverConnection.subscribe(node.topic, node.ack, function(msg) {
node.send(msg);
});
});
if (node.serverConnection.connected) {
setStatusConnected(node);
}
}
} else {
node.error("Missing server config");
}
node.on("close", function(removed, done) {
if (node.serverConnection) {
node.serverConnection.unsubscribe(node.topic);
node.serverConnection.deregister(node, true, done);
node.serverConnection = null;
} else {
done();
}
});
}
RED.nodes.registerType("stomp in",StompInNode);
function StompOutNode(n) {
RED.nodes.createNode(this,n);
/** @type { StompOutNode } */
const node = this;
node.server = n.server;
/** @type { StompServerNode } */
node.serverConnection = RED.nodes.getNode(node.server);
node.topic = n.topic;
if (node.serverConnection) {
setStatusDisconnected(node);
node.on("input", function(msg, send, done) {
const topic = node.topic || msg.topic;
if (topic.length > 0 && msg.payload) {
try {
msg.payload = JSON.stringify(msg.payload);
} catch {
msg.payload = `${msg.payload}`;
}
node.serverConnection.publish(topic, msg.payload, msg.headers || {});
} else if (!topic.length > 0) {
node.warn('No valid publish topic');
} else {
node.warn('Payload or topic is undefined/null')
}
done();
});
node.serverConnection.register(node);
if (node.serverConnection.connected) {
setStatusConnected(node);
}
} else {
node.error("Missing server config");
}
node.on("close", function(removed, done) {
if (node.serverConnection) {
node.serverConnection.deregister(node, true, done);
node.serverConnection = null;
} else {
done();
}
});
}
RED.nodes.registerType("stomp out",StompOutNode);
function StompAckNode(n) {
RED.nodes.createNode(this,n);
/** @type { StompOutNode } */
const node = this;
node.server = n.server;
/** @type { StompServerNode } */
node.serverConnection = RED.nodes.getNode(node.server);
node.topic = n.topic;
if (node.serverConnection) {
setStatusDisconnected(node);
node.on("input", function(msg, send, done) {
const topic = node.topic || msg.topic;
if (topic.length > 0) {
node.serverConnection.ack(topic, msg.messageId, msg.transaction);
} else if (!topic.length > 0) {
node.warn('No valid publish topic');
} else {
node.warn('Payload or topic is undefined/null')
}
done();
});
node.serverConnection.register(node);
if (node.serverConnection.connected) {
setStatusConnected(node);
}
} else {
node.error("Missing server config");
}
node.on("close", function(removed, done) {
if (node.serverConnection) {
node.serverConnection.deregister(node, true, done);
node.serverConnection = null;
} else {
done();
}
});
}
RED.nodes.registerType("stomp ack",StompAckNode);
};
// ----------------------------------------------
// ----------------- Helpers --------------------
// ----------------------------------------------
/**
* Helper function for applying changes to an objects properties ONLY when the src object actually has the property.
* This avoids setting a `dst` property null/undefined when the `src` object doesnt have the named property.
* @param {object} src Source object containing properties
* @param {object} dst Destination object to set property
* @param {string} propName The property name to set in the Destination object
* @param {boolean} force force the dst property to be updated/created even if src property is empty
*/
function setIfHasProperty(src, dst, propName, force) {
if (src && dst && propName) {
const ok = force || hasProperty(src, propName);
if (ok) {
dst[propName] = src[propName];
}
}
}
/**
* Helper function to test an object has a property
* @param {object} obj Object to test
* @param {string} propName Name of property to find
* @returns true if object has property `propName`
*/
function hasProperty(obj, propName) {
//JavaScript does not protect the property name hasOwnProperty
//Object.prototype.hasOwnProperty.call is the recommended/safer test
return Object.prototype.hasOwnProperty.call(obj, propName);
}