1
0
mirror of https://github.com/node-red/node-red-nodes.git synced 2023-10-10 13:36:58 +02:00

STOMP refactor in accordance with MQTT (shared connection using config node) + client ACK (#988)

* Bugfix: show connected state after reconnect

* Bugfix: show connected state for stomp out node after reconnect

* Add support for ACK messages

* Add optional subscription id

* Typo fix

* Subscription ID not required

* Bugfix "node.ack is not a fuction"

* Use shared client connection

* Bugfix shared connection

* Improvements & bugfixes to shared connection

* Bugfix connecting state

* Set connected state in connect callback

* Typo fix

* add server shared connection variables

* Bugfix for shared state

* WIP

* Complete refactor based on MQTT nodes to be able to share server connection between nodes

* Change address back to server for backwards compatibility

* Fixes for race conditions on node closing

* Add disconnect timeout of 2s to avoid "Error stopping node"

* If not connected, do not try to disconnect

* Fix for disconnecting log

* Styling fix for ack select form row

* Typo fixes

* Typo fix

* Bugfix: subscription before connected

* Bugfix: stringify payload before sending to be able to send numbers etc

* Bugfix: not saving ack field

* Bugfix: ack

* Bugfix: ack

* Bugfix: ack & better docs regardign ack

* BugFix: reconnect delay

* Improvements regarding cleanup on close

* Handle connect and reconnect event in the same way

* Typo fix

* Fix backwards compatibility
This commit is contained in:
Olivier Verhaegen 2023-05-30 12:05:25 +02:00 committed by GitHub
parent 490a3d8d37
commit 0d7f0cb16d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 554 additions and 127 deletions

View File

@ -8,6 +8,18 @@
<label for="node-input-topic" style="width: 110px;"><i class="fa fa-envelope"></i> Destination</label> <label for="node-input-topic" style="width: 110px;"><i class="fa fa-envelope"></i> Destination</label>
<input type="text" id="node-input-topic" placeholder="topic or queue"> <input type="text" id="node-input-topic" placeholder="topic or queue">
</div> </div>
<div class="form-row">
<label for="node-input-ack" style="width: 110px;"><i class="fa fa-check"></i> Message acknowledgment</label>
<select type="text" id="node-input-ack" style="display: inline-block; width: 250px; vertical-align: top;">
<option value="auto">Auto</option>
<option value="client">Client</option>
<option value="client-individual">Client individual (only v1.1)</option>
</select>
</div>
<div class="form-tips">
Enabling the ACK (acknowledgment) will set the <code>ack</code> header to <code>client</code> while subscribing to topics.
This means the items on the broker queue will not be dequeue'd unless an ACK message is sent using the <code>ack</code> node.
</div>
<div class="form-row"> <div class="form-row">
<label for="node-input-name" style="width: 110px;"><i class="fa fa-tag"></i> Name</label> <label for="node-input-name" style="width: 110px;"><i class="fa fa-tag"></i> Name</label>
<input type="text" id="node-input-name" placeholder="Name"> <input type="text" id="node-input-name" placeholder="Name">
@ -33,6 +45,7 @@
defaults: { defaults: {
name: {value:""}, name: {value:""},
server: {type:"stomp-server",required:true}, server: {type:"stomp-server",required:true},
ack: {value: "auto", required: true},
topic: {value:"",required:true} topic: {value:"",required:true}
}, },
inputs:0, inputs:0,
@ -148,8 +161,8 @@
port: {value:61613,required:true,validate:RED.validators.number()}, port: {value:61613,required:true,validate:RED.validators.number()},
protocolversion: {value:"1.0",required:true}, protocolversion: {value:"1.0",required:true},
vhost: {}, vhost: {},
reconnectretries: {value:"0",required:true,validate:RED.validators.number()}, reconnectretries: {value:0,required:true,validate:RED.validators.number()},
reconnectdelay: {value:"0.5",required:true,validate:RED.validators.number()}, reconnectdelay: {value:1,required:true,validate:RED.validators.number()},
name: {} name: {}
}, },
credentials: { credentials: {
@ -161,3 +174,56 @@
} }
}); });
</script> </script>
<script type="text/html" data-template-name="stomp ack">
<div class="form-row">
<label for="node-input-server" style="width: 110px;"><i class="fa fa-bookmark"></i> Server</label>
<input type="text" id="node-input-server">
</div>
<div class="form-row">
<label for="node-input-topic" style="width: 110px;"><i class="fa fa-envelope"></i> Destination</label>
<input type="text" id="node-input-topic" placeholder="topic or queue">
</div>
<div class="form-row">
<label for="node-input-name" style="width: 110px;"><i class="fa fa-tag"></i> Name</label>
<input type="text" id="node-input-name" placeholder="Name">
</div>
</script>
<script type="text/html" data-help-name="stomp ack">
<p>
ACK is used to acknowledge consumption of a message from a subscription using client acknowledgment. When a client has issued a SUBSCRIBE frame with the ack header set to client any messages received from that destination will not be considered to have been consumed (by the server) until the message has been acknowledged via an ACK.
</p>
<p>
The node allows for following inputs:
<ul>
<li><code>msg.messageId</code>: The id of the message to acknowledge</li>
<li><code>msg.transaction</code>: Optional transaction name</li>
</ul>
</p>
<p>
See the <a href="https://stomp.github.io/stomp-specification-1.0.html#frame-ACK" target="new">Stomp 1.0 spec</a> for more details.
</p>
</script>
<script type="text/javascript">
RED.nodes.registerType('stomp ack',{
category: 'output',
color:"#e8cfe8",
defaults: {
name: {value:""},
server: {type:"stomp-server",required:true},
topic: {value:""}
},
inputs:1,
outputs:0,
icon: "bridge.png",
align: "right",
label: function() {
return this.name||"stomp";
},
labelStyle: function() {
return (this.name)?"node_label_italic":"";
}
});
</script>

View File

@ -2,94 +2,406 @@
module.exports = function(RED) { module.exports = function(RED) {
"use strict"; "use strict";
var StompClient = require('stomp-client'); var StompClient = require('stomp-client');
var querystring = require('querystring');
// ----------------------------------------------
// ------------------- State --------------------
// ----------------------------------------------
function updateStatus(node, allNodes) {
let setStatus = setStatusDisconnected;
if (node.connecting) {
setStatus = setStatusConnecting;
} else if (node.connected) {
setStatus = setStatusConnected;
}
setStatus(node, allNodes);
}
function setStatusDisconnected(node, allNodes) {
if (allNodes) {
for (let id in node.users) {
if (hasProperty(node.users, id)) {
node.users[id].status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected"});
}
}
} else {
node.status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected" })
}
}
function setStatusConnecting(node, allNodes) {
if(allNodes) {
for (var id in node.users) {
if (hasProperty(node.users, id)) {
node.users[id].status({ fill: "yellow", shape: "ring", text: "node-red:common.status.connecting" });
}
}
} else {
node.status({ fill: "yellow", shape: "ring", text: "node-red:common.status.connecting" });
}
}
function setStatusConnected(node, allNodes) {
if(allNodes) {
for (var id in node.users) {
if (hasProperty(node.users, id)) {
node.users[id].status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
}
}
} else {
node.status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
}
}
function setStatusError(node, allNodes) {
if(allNodes) {
for (var id in node.users) {
if (hasProperty(node.users, id)) {
node.users[id].status({ fill: "red", shape: "dot", text: "error" });
}
}
} else {
node.status({ fill: "red", shape: "dot", text: "error" });
}
}
// ----------------------------------------------
// ------------------- Nodes --------------------
// ----------------------------------------------
function StompServerNode(n) { function StompServerNode(n) {
RED.nodes.createNode(this,n); RED.nodes.createNode(this,n);
this.server = n.server; const node = this;
this.port = n.port; // To keep track of processing nodes that use this config node for their connection
this.protocolversion = n.protocolversion; node.users = {};
this.vhost = n.vhost; // Config node state
this.reconnectretries = n.reconnectretries || 999999; node.connected = false;
this.reconnectdelay = (n.reconnectdelay || 15) * 1000; node.connecting = false;
this.name = n.name; /** Flag to avoid race conditions between `deregister` and the `close` event of the config node (ex. on redeploy) */
this.username = this.credentials.user; node.closing = false;
this.password = this.credentials.password; /** Options to pass to the stomp-client API */
node.options = {};
node.sessionId = null;
node.subscribtionIndex = 1;
node.subscriptionIds = {};
/** @type { StompClient } */
node.client;
node.setOptions = function(options, init) {
if (!options || typeof options !== "object") {
return; // Nothing to change
}
// Apply property changes (only if the property exists in the options object)
setIfHasProperty(options, node, "server", init);
setIfHasProperty(options, node, "port", init);
setIfHasProperty(options, node, "protocolversion", init);
setIfHasProperty(options, node, "vhost", init);
setIfHasProperty(options, node, "reconnectretries", init);
setIfHasProperty(options, node, "reconnectdelay", init);
if (node.credentials) {
node.username = node.credentials.username;
node.password = node.credentials.password;
}
if (!init && hasProperty(options, "username")) {
node.username = options.username;
}
if (!init && hasProperty(options, "password")) {
node.password = options.password;
}
// Build options for passing to the stomp-client API
node.options = {
address: node.server,
port: node.port * 1,
user: node.username,
pass: node.password,
protocolVersion: node.protocolversion,
reconnectOpts: {
retries: node.reconnectretries * 1,
delay: node.reconnectdelay * 1000
},
vhost: node.vhost
};
}
node.setOptions(n, true);
/**
* Register a STOMP processing node to the connection.
* @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to register
* @param { Function } callback
*/
node.register = function(stompNode, callback = () => {}) {
node.users[stompNode.id] = stompNode;
// Auto connect when first STOMP processing node is added
if (Object.keys(node.users).length === 1) {
node.connect(callback);
} else {
callback();
}
}
/**
* Remove registered STOMP processing nodes from the connection.
* @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to unregister
* @param { Boolean } autoDisconnect Automatically disconnect from the STOM server when no processing nodes registered to the connection
* @param { Function } callback
*/
node.deregister = function(stompNode, autoDisconnect, callback = () => {}) {
delete node.users[stompNode.id];
if (autoDisconnect && !node.closing && node.connected && Object.keys(node.users).length === 0) {
node.disconnect(callback);
} else {
callback();
}
}
/**
* Wether a new connection can be made.
* @returns `true` or `false`
*/
node.canConnect = function() {
return !node.connected && !node.connecting;
}
/**
* Connect to the STOMP server.
* @param {Function} callback
*/
node.connect = function(callback = () => {}) {
if (node.canConnect()) {
node.closing = false;
node.connecting = true;
setStatusConnecting(node, true);
try {
// Remove left over client if needed
if (node.client) {
node.client.disconnect();
node.client = null;
}
node.client = new StompClient(node.options);
node.client.on("connect", function() {
node.closing = false;
node.connecting = false;
node.connected = true;
callback();
node.log("Connected to STOMP server", {sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion});
setStatusConnected(node, true);
});
node.client.on("reconnect", function(sessionId, numOfRetries) {
node.closing = false;
node.connecting = false;
node.connected = true;
node.sessionId = sessionId;
callback();
node.log("Reconnected to STOMP server", {sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion, retries: numOfRetries});
setStatusConnected(node, true);
});
node.client.on("reconnecting", function() {
node.warn("reconnecting");
node.connecting = true;
node.connected = false;
node.log("Reconnecting to STOMP server...", {url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion});
setStatusConnecting(node, true);
});
node.client.on("error", function(err) {
node.error(err);
setStatusError(node, true);
});
node.client.connect(function(sessionId) {
node.sessionId = sessionId;
});
} catch (err) {
node.error(err);
}
} else {
node.log("Not connecting to STOMP server, already connected");
callback();
}
}
/**
* Disconnect from the STOMP server.
* @param {Function} callback
*/
node.disconnect = function(callback = () => {}) {
const waitDisconnect = (client, timeout) => {
return new Promise((resolve, reject) => {
// Set flag to avoid race conditions for disconnect as every node tries to call it directly or indirectly using deregister
node.closing = true;
const t = setTimeout(() => {
reject();
}, timeout);
client.disconnect(() => {
clearTimeout(t);
resolve();
});
});
}
if (!node.client) {
node.warn("Can't disconnect, connection not initialized.");
callback();
} else if (node.closing || !node.connected) {
// Disconnection already in progress or not connected
callback();
} else {
node.log("Unsubscribing from STOMP queue's...");
const subscribedQueues = Object.keys(node.subscriptionIds);
subscribedQueues.forEach(function(queue) {
node.unsubscribe(queue);
});
node.log('Disconnecting from STOMP server...');
waitDisconnect(node.client, 2000).then(() => {
node.log("Disconnected from STOMP server", {sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion})
}).catch(() => {
node.log("Disconnect timeout closing node...");
}).finally(() => {
node.sessionId = null;
node.subscribtionIndex = 1;
node.subscriptionIds = {};
node.connected = false;
node.connecting = false;
setStatusDisconnected(node, true);
callback();
});
}
}
/**
* Subscribe to a given STOMP queue.
* @param { String} queue The queue to subscribe to
* @param { "auto" | "client" | "client-individual" } clientAck Can be `auto`, `client` or `client-individual` (the latter only starting from STOMP v1.1)
* @param { Function } callback
*/
node.subscribe = function(queue, acknowledgment, callback) {
node.log(`Subscribing to: ${queue}`);
if (node.connected && !node.closing) {
if (!node.subscriptionIds[queue]) {
node.subscriptionIds[queue] = node.subscribtionIndex++;
}
const headers = {
id: node.subscriptionIds[queue],
// Only set client-individual if not v1.0
ack: acknowledgment === "client-individual" && node.options.protocolVersion === "1.0" ? "client" : acknowledgment
}
node.client.subscribe(queue, headers, function(body, responseHeaders) {
let msg = { headers: responseHeaders, topic: queue };
try {
msg.payload = JSON.parse(body);
} catch {
msg.payload = body;
}
callback(msg);
});
} else {
node.error("Can't subscribe, not connected");
}
}
/**
* Unsubscribe from a STOMP queue.
* @param {String} queue The STOMP queue to unsubscribe from
* @param {Object} headers Headers to add to the unsubscribe message
*/
node.unsubscribe = function(queue, headers = {}) {
delete node.subscriptionIds[queue];
if (node.connected && !node.closing) {
node.client.unsubscribe(queue, headers);
node.log(`Unsubscribed from ${queue}`, headers);
}
}
/**
* Publish a STOMP message on a queue.
* @param {String} queue The STOMP queue to publish to
* @param {any} message The message to send
* @param {Object} headers STOMP headers to add to the SEND command
*/
node.publish = function(queue, message, headers = {}) {
if (node.connected && !node.closing) {
node.client.publish(queue, message, headers);
} else {
node.error("Can't publish, not connected");
}
}
/**
* Acknowledge (a) message(s) that was received from the specified queue.
* @param {String} queue The queue/topic to send an acknowledgment for
* @param {String} messageId ID of the message that was received from the server, which can be found in the reponse header as `message-id`
* @param {String} transaction Optional transaction name
*/
node.ack = function(queue, messageId, transaction = undefined) {
if (node.connected && !node.closing) {
node.client.ack(messageId, node.subscriptionIds[queue], transaction);
} else {
node.error("Can't send acknowledgment, not connected");
}
}
node.on("close", function(done) {
node.disconnect(function() { done (); });
});
} }
RED.nodes.registerType("stomp-server",StompServerNode,{ RED.nodes.registerType("stomp-server", StompServerNode, {
credentials: { credentials: {
user: {type:"text"}, user: { type: "text" },
password: {type: "password"} password: { type: "password" }
} }
}); });
function StompInNode(n) { function StompInNode(n) {
RED.nodes.createNode(this,n); RED.nodes.createNode(this,n);
this.server = n.server; /** @type { StompInNode } */
this.topic = n.topic; const node = this;
node.server = n.server;
/** @type { StompServerNode } */
node.serverConnection = RED.nodes.getNode(node.server);
node.topic = n.topic;
node.ack = n.ack;
this.serverConfig = RED.nodes.getNode(this.server); if (node.serverConnection) {
this.stompClientOpts = { setStatusDisconnected(node);
address: this.serverConfig.server,
port: this.serverConfig.port * 1, if (node.topic) {
user: this.serverConfig.username, node.serverConnection.register(node, function() {
pass: this.serverConfig.password, node.serverConnection.subscribe(node.topic, node.ack, function(msg) {
protocolVersion: this.serverConfig.protocolversion, node.send(msg);
reconnectOpts: { });
retries: this.serverConfig.reconnectretries * 1, });
delay: this.serverConfig.reconnectdelay * 1
if (node.serverConnection.connected) {
setStatusConnected(node);
}
} }
}; } else {
if (this.serverConfig.vhost) { node.error("Missing server config");
this.stompClientOpts.vhost = this.serverConfig.vhost;
} }
var node = this; node.on("close", function(removed, done) {
var msg = {topic:this.topic}; if (node.serverConnection) {
node.client = new StompClient(node.stompClientOpts); node.serverConnection.unsubscribe(node.topic);
node.serverConnection.deregister(node, true, done);
node.client.on("connect", function() { node.serverConnection = null;
node.status({fill:"green",shape:"dot",text:"connected"}); } else {
}); done();
node.client.on("reconnecting", function() {
node.status({fill:"red",shape:"ring",text:"reconnecting"});
node.warn("reconnecting");
});
node.client.on("reconnect", function() {
node.status({fill:"green",shape:"dot",text:"connected"});
});
node.client.on("error", function(error) {
node.status({fill:"grey",shape:"dot",text:"error"});
node.warn(error);
});
node.status({fill:"grey",shape:"ring",text:"connecting"});
node.client.connect(function(sessionId) {
node.log('subscribing to: '+node.topic);
node.client.subscribe(node.topic, function(body, headers) {
var newmsg={"headers":headers,"topic":node.topic}
try {
newmsg.payload = JSON.parse(body);
}
catch(e) {
newmsg.payload = body;
}
node.send(newmsg);
});
}, function(error) {
node.status({fill:"grey",shape:"dot",text:"error"});
node.warn(error);
});
node.on("close", function(done) {
if (node.client) {
// disconnect can accept a callback - but it is not always called.
node.client.disconnect();
} }
done();
}); });
} }
RED.nodes.registerType("stomp in",StompInNode); RED.nodes.registerType("stomp in",StompInNode);
@ -97,65 +409,114 @@ module.exports = function(RED) {
function StompOutNode(n) { function StompOutNode(n) {
RED.nodes.createNode(this,n); RED.nodes.createNode(this,n);
this.server = n.server; /** @type { StompOutNode } */
this.topic = n.topic; const node = this;
node.server = n.server;
/** @type { StompServerNode } */
node.serverConnection = RED.nodes.getNode(node.server);
node.topic = n.topic;
this.serverConfig = RED.nodes.getNode(this.server); if (node.serverConnection) {
this.stompClientOpts = { setStatusDisconnected(node);
address: this.serverConfig.server,
port: this.serverConfig.port * 1, node.on("input", function(msg, send, done) {
user: this.serverConfig.username, if (node.topic && msg.payload) {
pass: this.serverConfig.password, try {
protocolVersion: this.serverConfig.protocolversion, msg.payload = JSON.stringify(msg.payload);
reconnectOpts: { } catch {
retries: this.serverConfig.reconnectretries * 1, msg.payload = `${msg.payload}`;
delay: this.serverConfig.reconnectdelay * 1 }
node.serverConnection.publish(node.topic, msg.payload, msg.headers || {});
done();
}
});
node.serverConnection.register(node);
if (node.serverConnection.connected) {
setStatusConnected(node);
} }
}; } else {
if (this.serverConfig.vhost) { node.error("Missing server config");
this.stompClientOpts.vhost = this.serverConfig.vhost;
} }
var node = this; node.on("close", function(removed, done) {
node.client = new StompClient(node.stompClientOpts); if (node.serverConnection) {
node.serverConnection.deregister(node, true, done);
node.client.on("connect", function() { node.serverConnection = null;
node.status({fill:"green",shape:"dot",text:"connected"}); } else {
}); done();
node.client.on("reconnecting", function() {
node.status({fill:"red",shape:"ring",text:"reconnecting"});
node.warn("reconnecting");
});
node.client.on("reconnect", function() {
node.status({fill:"green",shape:"dot",text:"connected"});
});
node.client.on("error", function(error) {
node.status({fill:"grey",shape:"dot",text:"error"});
node.warn(error);
});
node.status({fill:"grey",shape:"ring",text:"connecting"});
node.client.connect(function(sessionId) {
}, function(error) {
node.status({fill:"grey",shape:"dot",text:"error"});
node.warn(error);
});
node.on("input", function(msg) {
node.client.publish(node.topic || msg.topic, msg.payload, msg.headers);
});
node.on("close", function(done) {
if (node.client) {
// disconnect can accept a callback - but it is not always called.
node.client.disconnect();
} }
done();
}); });
} }
RED.nodes.registerType("stomp out",StompOutNode); RED.nodes.registerType("stomp out",StompOutNode);
function StompAckNode(n) {
RED.nodes.createNode(this,n);
/** @type { StompOutNode } */
const node = this;
node.server = n.server;
/** @type { StompServerNode } */
node.serverConnection = RED.nodes.getNode(node.server);
node.topic = n.topic;
if (node.serverConnection) {
setStatusDisconnected(node);
node.on("input", function(msg, send, done) {
node.serverConnection.ack(node.topic, msg.messageId, msg.transaction);
done();
});
node.serverConnection.register(node);
if (node.serverConnection.connected) {
setStatusConnected(node);
}
} else {
node.error("Missing server config");
}
node.on("close", function(removed, done) {
if (node.serverConnection) {
node.serverConnection.deregister(node, true, done);
node.serverConnection = null;
} else {
done();
}
});
}
RED.nodes.registerType("stomp ack",StompAckNode);
}; };
// ----------------------------------------------
// ----------------- Helpers --------------------
// ----------------------------------------------
/**
* Helper function for applying changes to an objects properties ONLY when the src object actually has the property.
* This avoids setting a `dst` property null/undefined when the `src` object doesnt have the named property.
* @param {object} src Source object containing properties
* @param {object} dst Destination object to set property
* @param {string} propName The property name to set in the Destination object
* @param {boolean} force force the dst property to be updated/created even if src property is empty
*/
function setIfHasProperty(src, dst, propName, force) {
if (src && dst && propName) {
const ok = force || hasProperty(src, propName);
if (ok) {
dst[propName] = src[propName];
}
}
}
/**
* Helper function to test an object has a property
* @param {object} obj Object to test
* @param {string} propName Name of property to find
* @returns true if object has property `propName`
*/
function hasProperty(obj, propName) {
//JavaScript does not protect the property name hasOwnProperty
//Object.prototype.hasOwnProperty.call is the recommended/safer test
return Object.prototype.hasOwnProperty.call(obj, propName);
}