Complete refactor based on MQTT nodes to be able to share server connection between nodes

This commit is contained in:
Olivier Verhaegen 2023-04-14 14:16:30 +02:00 committed by GitHub
parent 9b07b185aa
commit 8ab9aff3de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 223 additions and 289 deletions

View File

@ -9,12 +9,16 @@
<input type="text" id="node-input-topic" placeholder="topic or queue">
</div>
<div class="form-row">
<label for="node-input-enable_subscriptionid" style="width: 110px;"><i class="fa fa-envelope"></i> Enable subscription ID</label>
<input type="checkbox" id="node-input-enable_subscriptionid">
<label for="node-config-input-ack"><i class="fa fa-check"></i> Enable client acknowledgement</label>
<select type="text" id="node-config-input-ack" style="display: inline-block; width: 250px; vertical-align: top;">
<option value="auto">Auto</option>
<option value="client">Client</option>
<option value="client-individual">Client individual (only v1.1)</option>
</select>
</div>
<div class="form-row">
<label for="node-input-subscriptionid" style="width: 110px;"><i class="fa fa-envelope"></i> Subscription ID</label>
<input type="number" id="node-input-subscriptionid" placeholder="Default: 0">
<div class="form-tips">
Enabling the ACK (acknowledgement) will set the <code>ack</code> header to <code>client</code> while subscribing to topics.
This means the items on the broker queue will not be dequeue'd unless an ACK message is sent using the <code>ack</code> node.
</div>
<div class="form-row">
<label for="node-input-name" style="width: 110px;"><i class="fa fa-tag"></i> Name</label>
@ -41,8 +45,7 @@
defaults: {
name: {value:""},
server: {type:"stomp-server",required:true},
enable_subscriptionid: {value: false},
subscriptionid: {value: 0},
ack: {value: "auto"},
topic: {value:"",required:true}
},
inputs:0,
@ -132,14 +135,6 @@
<option value="1.1">v1.1</option>
</select>
</div>
<div class="form-row">
<label for="node-config-input-clientAcknowledgement"><i class="fa fa-check"></i> Enable client acknowledgement</label>
<input type="checkbox" id="node-config-input-clientAcknowledgement" />
</div>
<div class="form-tips">
Enabling the ACK (acknowledgement) will set the <code>ack</code> header to <code>client</code> while subscribing to topics.
This means the items on the broker queue will not be dequeue'd unless an ACK message is sent using the <code>ack</code> node.
</div>
<div class="form-row">
<label for="node-config-input-vhost"><i class="fa fa-server"></i> vhost</label>
<input type="text" id="node-config-input-vhost" placeholder="Default is null" />
@ -165,7 +160,6 @@
address: {required:true},
port: {value:61613,required:true,validate:RED.validators.number()},
protocolVersion: {value:"1.0",required:true},
clientAcknowledgement: {value: false},
vhost: {},
reconnectRetries: {value:0,required:true,validate:RED.validators.number()},
reconnectDelay: {value:1,required:true,validate:RED.validators.number()},
@ -186,6 +180,10 @@
<label for="node-input-server" style="width: 110px;"><i class="fa fa-bookmark"></i> Server</label>
<input type="text" id="node-input-server">
</div>
<div class="form-row">
<label for="node-input-topic" style="width: 110px;"><i class="fa fa-envelope"></i> Destination</label>
<input type="text" id="node-input-topic" placeholder="topic or queue">
</div>
<div class="form-row">
<label for="node-input-name" style="width: 110px;"><i class="fa fa-tag"></i> Name</label>
<input type="text" id="node-input-name" placeholder="Name">
@ -200,7 +198,6 @@
The node allows for following inputs:
<ul>
<li><code>msg.messageId</code>: The id of the message to acknowledge</li>
<li><code>msg.subscription</code>: The id of the subscription made using the stomp in node</li>
<li><code>msg.transaction</code>: Optional transaction name</li>
</ul>
</p>
@ -215,7 +212,8 @@
color:"#e8cfe8",
defaults: {
name: {value:""},
server: {type:"stomp-server",required:true}
server: {type:"stomp-server",required:true},
topic: {value:""}
},
inputs:1,
outputs:0,

View File

@ -2,7 +2,6 @@
module.exports = function(RED) {
"use strict";
var StompClient = require('stomp-client');
var querystring = require('querystring');
// ----------------------------------------------
// ------------------- State --------------------
@ -68,40 +67,8 @@ module.exports = function(RED) {
}
// ----------------------------------------------
// ---------------- Connection ------------------
// ------------------- Nodes --------------------
// ----------------------------------------------
function handleConnectAction(node, msg, done) {
const clientOptions = typeof msg.clientOptions === 'object' ? msg.clientOptions : null;
if (clientOptions) {
if (!node.client) {
// Client has not been initialized - initialize client and connect
node.client = new StompClient(clientOptions);
node.client.connect(function(sessionId) {
done(sessionId);
});
} else {
// Already connected/connecting/reconnecting
if (clientOptions.forceReconnect) {
// The force flag tells us to cycle the connection
node.client.disconnect(function() {
node.client = new StompClient(clientOptions);
node.client.connect(function(sessionId) {
done(sessionId)
})
});
}
}
} else {
done(new Error("No connection options provided"));
}
}
function handleDisconnectAction(node, done) {
node.client.disconnect(function() {
done();
});
}
function StompServerNode(n) {
RED.nodes.createNode(this,n);
const node = this;
@ -112,9 +79,10 @@ module.exports = function(RED) {
node.connecting = false;
node.closing = false;
node.options = {};
// node.subscriptions = {};
node.sessionId = null;
/** @type {StompClient} */
node.subscribtionIndex = 1;
node.subscriptionIds = {};
/** @type { StompClient } */
node.client;
node.setOptions = function(options, init) {
if (!options || typeof options !== "object") {
@ -125,7 +93,6 @@ module.exports = function(RED) {
setIfHasProperty(options, node, "address", init);
setIfHasProperty(options, node, "port", init);
setIfHasProperty(options, node, "protocolVersion", init);
setIfHasProperty(options, node, "clientAcknowledgement", init);
setIfHasProperty(options, node, "vhost", init);
setIfHasProperty(options, node, "reconnectRetries", init);
setIfHasProperty(options, node, "reconnectDelay", init);
@ -151,36 +118,52 @@ module.exports = function(RED) {
reconnectOpts: {
retries: node.reconnectRetries * 1,
delay: node.reconnectDelay * 1
}
},
vhost: node.vhost
};
}
node.setOptions(n, true);
// Define functions called by STOMP processing nodes
/**
* Register a STOMP processing node to the connection.
* @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to register
*/
node.register = function(stompNode) {
node.users[stompNode.id] = stompNode;
// Auto connect when first STOMP processing node is added
if (Object.keys(node.users).length === 1) {
node.connect();
// Update nodes status
setTimeout(function() { updateStatus(node, true) }, 1);
}
}
node.deregister = function(stompNode, done, autoDisconnect) {
/**
* Remove registered STOMP processing nodes from the connection.
* @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to unregister
* @param { Boolean } autoDisconnect Automatically disconnect from the STOM server when no processing nodes registered to the connection
* @param { Function } callback
*/
node.deregister = function(stompNode, autoDisconnect, callback) {
delete node.users[stompNode.id];
if (autoDisconnect && !node.closing && node.connected && Object.keys(node.users).length === 0) {
node.disconnect(done);
node.disconnect(callback);
} else {
done();
callback();
}
}
/**
* Wether a new connection can be made.
* @returns `true` or `false`
*/
node.canConnect = function() {
return !node.connected && !node.connecting;
}
/**
* Connect to the STOMP server.
* @param {Function} callback
*/
node.connect = function(callback) {
if (node.canConnect()) {
node.closing = false;
@ -195,9 +178,6 @@ module.exports = function(RED) {
}
node.client = new StompClient(node.options);
node.client.connect(function(sessionId) {
node.sessionId = sessionId;
});
node.client.on("connect", function() {
node.closing = false;
@ -234,119 +214,156 @@ module.exports = function(RED) {
setStatusError(node, true);
});
node.client.connect(function(sessionId) {
node.sessionId = sessionId;
});
} catch (err) {
node.error(err);
}
}
}
/**
* Disconnect from the STOMP server.
* @param {Function} callback
*/
node.disconnect = function(callback) {
if (!node.client) {
node.warn("Can't disconnect, connection not initialized.");
callback();
} else {
node.client.disconnect(function() {
node.log("Disconnected from STOMP server", {sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion})
node.sessionId = null;
node.subscribtionIndex = 1;
node.subscriptionIds = {};
node.connected = false;
node.connecting = false;
setStatusDisconnected(node, true);
callback();
});
}
}
/**
* Subscribe to a given STOMP queue.
* @param { String} queue The queue to subscribe to
* @param { "auto" | "client" | "client-individual" } clientAck Can be `auto`, `client` or `client-individual` (the latter only starting from STOMP v1.1)
* @param { Function } callback
*/
node.subscribe = function(queue, acknowledgement, callback) {
node.log(`Subscribing to: ${queue}`);
if (node.connected) {
if (!node.subscriptionIds[queue]) {
node.subscriptionIds[queue] = node.subscribtionIndex++;
}
const headers = {
id: node.subscriptionIds[queue],
// Only set client-individual if not v1.0
ack: acknowledgement === "client-individual" && node.options.protocolVersion === "1.0" ? "client" : acknowledgement
}
node.client.subscribe(queue, headers, function(body, responseHeaders) {
let msg = { headers: responseHeaders, topic: queue };
try {
msg.payload = JSON.parse(body);
} catch {
msg.payload = body;
}
callback(msg);
});
} else {
node.error("Can't subscribe, not connected");
}
}
/**
* Unsubscribe from a STOMP queue.
* @param {String} queue The STOMP queue to unsubscribe from
* @param {Object} headers Headers to add to the unsubscribe message
*/
node.unsubscribe = function(queue, headers = {}) {
delete node.subscriptionIds[queue];
if (node.connected) {
node.client.unsubscribe(queue, headers);
node.log(`Unsubscribed from ${queue}`, headers);
}
}
/**
* Publish a STOMP message on a queue.
* @param {String} queue The STOMP queue to publish to
* @param {any} message The message to send
* @param {Object} headers STOMP headers to add to the SEND command
*/
node.publish = function(queue, message, headers = {}) {
if (node.connected) {
node.client.publish(queue, message, headers);
} else {
node.error("Can't publish, not connected");
}
}
node.ack = function(queue, messageId, transaction = undefined) {
if (node.connected) {
node.client.ack(messageId, subscriptionIds[queue], transaction);
} else {
node.error("Can't publish, not connected");
}
}
node.on("close", function(done) {
node.disconnect(function() { done (); });
});
}
RED.nodes.registerType("stomp-server",StompServerNode,{
RED.nodes.registerType("stomp-server", StompServerNode, {
credentials: {
user: {type:"text"},
password: {type: "password"}
user: { type: "text" },
password: { type: "password" }
}
});
function StompInNode(n) {
RED.nodes.createNode(this,n);
this.server = n.server;
this.topic = n.topic;
this.enableSubscriptionId = n.enable_subscriptionid;
this.subscriptionid = n.subscriptionid;
/** @type { StompInNode } */
const node = this;
node.server = n.server;
/** @type { StompServerNode } */
node.serverConnection = RED.nodes.getNode(node.server);
node.topic = n.topic;
node.ack = n.ack;
this.serverConfig = RED.nodes.getNode(this.server);
this.stompClientOpts = {
address: this.serverConfig.server,
port: this.serverConfig.port * 1,
user: this.serverConfig.username,
pass: this.serverConfig.password,
protocolVersion: this.serverConfig.protocolversion,
reconnectOpts: {
retries: this.serverConfig.reconnectretries * 1,
delay: this.serverConfig.reconnectdelay * 1
if (node.serverConnection) {
setStatusDisconnected(node);
if (node.topic) {
node.serverConnection.register(node);
node.serverConnection.subscribe(topic, ack, function(msg) {
node.send(msg);
});
if (node.serverConnection.connected) {
setStatusConnected(node);
}
}
};
if (this.serverConfig.vhost) {
this.stompClientOpts.vhost = this.serverConfig.vhost;
}
this.subscribeHeaders = {};
if (this.enableSubscriptionId) {
this.subscribeHeaders.id = this.subscriptionid;
}
if (this.serverConfig.ack) {
this.subscribeHeaders.ack = "client";
}
var node = this;
var msg = {topic:this.topic};
// Save the client connection to the shared server instance if needed
if (!node.serverConfig.clientConnection) {
node.serverConfig.clientConnection = new StompClient(node.stompClientOpts);
}
node.client = node.serverConfig.clientConnection;
node.client.on("connect", function() {
node.status({fill:"green",shape:"dot",text:"connected"});
node.serverConfig.connected = true;
});
node.client.on("reconnecting", function() {
node.status({fill:"red",shape:"ring",text:"reconnecting"});
node.warn("reconnecting");
node.serverConfig.connected = false;
});
node.client.on("reconnect", function() {
node.status({fill:"green",shape:"dot",text:"connected"});
node.serverConfig.connected = true;
});
node.client.on("error", function(error) {
node.status({fill:"grey",shape:"dot",text:"error"});
node.warn(error);
});
// Connect to server if needed and subscribe
const subscribe = () => {
node.log('subscribing to: '+node.topic);
node.client.subscribe(node.topic, node.subscribeHeaders, function(body, headers) {
var newmsg={"headers":headers,"topic":node.topic}
try {
newmsg.payload = JSON.parse(body);
}
catch(e) {
newmsg.payload = body;
}
node.send(newmsg);
});
}
if (!node.serverConfig.connected) {
node.status({fill:"grey",shape:"ring",text:"connecting"});
node.client.connect(function(sessionId) {
node.serverConfig.connected = true;
subscribe();
}, function(error) {
node.serverConfig.connected = false;
node.status({fill:"grey",shape:"dot",text:"error"});
node.warn(error);
});
} else {
subscribe();
node.error("Missing server config");
}
node.on("close", function(done) {
if (node.client) {
// disconnect can accept a callback - but it is not always called.
node.client.disconnect();
node.on("close", function(removed, done) {
if (node.serverConnection) {
if (node.serverConnection.connected) {
node.serverConnection.unsubscribe(node.topic);
}
node.serverConnection.deregister(node, true, done);
node.serverConnection = null;
} else {
done();
}
done();
});
}
RED.nodes.registerType("stomp in",StompInNode);
@ -354,156 +371,75 @@ module.exports = function(RED) {
function StompOutNode(n) {
RED.nodes.createNode(this,n);
this.server = n.server;
this.topic = n.topic;
/** @type { StompOutNode } */
const node = this;
node.server = n.server;
/** @type { StompServerNode } */
node.serverConnection = RED.nodes.getNode(node.server);
node.topic = n.topic;
this.serverConfig = RED.nodes.getNode(this.server);
this.stompClientOpts = {
address: this.serverConfig.server,
port: this.serverConfig.port * 1,
user: this.serverConfig.username,
pass: this.serverConfig.password,
protocolVersion: this.serverConfig.protocolversion,
reconnectOpts: {
retries: this.serverConfig.reconnectretries * 1,
delay: this.serverConfig.reconnectdelay * 1
}
};
if (this.serverConfig.vhost) {
this.stompClientOpts.vhost = this.serverConfig.vhost;
}
if (node.serverConnection) {
setStatusDisconnected(node);
var node = this;
// Save the client connection to the shared server instance if needed
if (!node.serverConfig.clientConnection) {
node.serverConfig.clientConnection = new StompClient(node.stompClientOpts);
}
node.client = node.serverConfig.clientConnection;
node.client.on("connect", function() {
node.status({fill:"green",shape:"dot",text:"connected"});
node.serverConfig.connected = true;
});
node.client.on("reconnecting", function() {
node.status({fill:"red",shape:"ring",text:"reconnecting"});
node.warn("reconnecting");
node.serverConfig.connected = false;
});
node.client.on("reconnect", function() {
node.status({fill:"green",shape:"dot",text:"connected"});
node.serverConfig.connected = true;
});
node.client.on("error", function(error) {
node.status({fill:"grey",shape:"dot",text:"error"});
node.warn(error);
});
// Connect to server if needed
if(!node.serverConfig.connected) {
node.status({fill:"grey",shape:"ring",text:"connecting"});
node.client.connect(function(sessionId) {
node.serverConfig.connected = true;
}, function(error) {
node.serverConfig.connected = false;
node.status({fill:"grey",shape:"dot",text:"error"});
node.warn(error);
node.on("input", function(msg, send, done) {
if (node.topic && msg.payload) {
node.serverConnection.publish(topic, msg.payload, msg.headers || {});
done();
}
});
node.serverConnection.register(node);
if (node.serverConnection.connected) {
setStatusConnected(node);
}
} else {
node.error("Missing server config");
}
node.on("input", function(msg) {
node.client.publish(node.topic || msg.topic, msg.payload, msg.headers);
});
node.on("close", function(done) {
if (node.client) {
// disconnect can accept a callback - but it is not always called.
node.client.disconnect();
node.on("close", function(removed, done) {
if (node.serverConnection) {
node.serverConnection.deregister(node, true, done);
node.serverConnection = null;
} else {
done();
}
done();
});
}
RED.nodes.registerType("stomp out",StompOutNode);
function StompAckNode(n) {
RED.nodes.createNode(this,n);
this.server = n.server;
/** @type { StompOutNode } */
const node = this;
node.server = n.server;
/** @type { StompServerNode } */
node.serverConnection = RED.nodes.getNode(node.server);
node.topic = n.topic;
this.serverConfig = RED.nodes.getNode(this.server);
this.stompClientOpts = {
address: this.serverConfig.server,
port: this.serverConfig.port * 1,
user: this.serverConfig.username,
pass: this.serverConfig.password,
protocolVersion: this.serverConfig.protocolversion,
reconnectOpts: {
retries: this.serverConfig.reconnectretries * 1,
delay: this.serverConfig.reconnectdelay * 1
}
};
if (this.serverConfig.vhost) {
this.stompClientOpts.vhost = this.serverConfig.vhost;
}
if (node.serverConnection) {
setStatusDisconnected(node);
var node = this;
// only start connection etc. when acknowledgements are configured to be send by client
if (node.serverConfig.ack) {
// Save the client connection to the shared server instance if needed
if (!node.serverConfig.clientConnection) {
node.serverConfig.clientConnection = new StompClient(node.stompClientOpts);
}
node.client = node.serverConfig.clientConnection;
node.client.on("connect", function() {
node.status({fill:"green",shape:"dot",text:"connected"});
node.serverConfig.connected = true;
});
node.client.on("reconnecting", function() {
node.status({fill:"red",shape:"ring",text:"reconnecting"});
node.warn("reconnecting");
node.serverConfig.connected = false;
});
node.client.on("reconnect", function() {
node.status({fill:"green",shape:"dot",text:"connected"});
node.serverConfig.connected = true;
});
node.client.on("error", function(error) {
node.status({fill:"grey",shape:"dot",text:"error"});
node.warn(error);
});
// Connect to server if needed
if(!node.serverConfig.connected) {
node.status({fill:"grey",shape:"ring",text:"connecting"});
node.client.connect(function(sessionId) {
node.serverConfig.connected = true;
}, function(error) {
node.serverConfig.connected = false;
node.status({fill:"grey",shape:"dot",text:"error"});
node.warn(error);
});
}
node.on("input", function(msg) {
node.client.ack(msg.messageId, msg.subsriptionId, msg.transaction);
});
node.on("close", function(done) {
if (node.client) {
// disconnect can accept a callback - but it is not always called.
node.client.disconnect();
}
node.on("input", function(msg, send, done) {
node.client.ack(msg.messageId, n.topic, msg.transaction);
done();
});
node.serverConnection.register(node);
if (node.serverConnection.connected) {
setStatusConnected(node);
}
} else {
node.error("ACK not configured in server (config node)");
node.error("Missing server config");
}
node.on("close", function(removed, done) {
if (node.serverConnection) {
node.serverConnection.deregister(node, true, done);
node.serverConnection = null;
} else {
done();
}
});
}
RED.nodes.registerType("stomp ack",StompAckNode);