mirror of
https://github.com/node-red/node-red-nodes.git
synced 2025-03-01 10:37:43 +00:00
Add support for ACK messages
This commit is contained in:
parent
2df137230f
commit
54ec71f16c
@ -122,6 +122,14 @@
|
||||
<option value="1.1">v1.1</option>
|
||||
</select>
|
||||
</div>
|
||||
<div class="form-row">
|
||||
<label for="node-config-input-ack"><i class="fa fa-check"></i> Enable client acknowledgement</label>
|
||||
<input type="checkbox" id="node-config-input-ack" />
|
||||
</div>
|
||||
<div class="form-tips">
|
||||
Enabling the ACK (acknowledgement) will set the <code>ack</code> header to <code>client</code> while subscribing to topics.
|
||||
This means the items on the broker queue will not be dequeue'd unless an ACK message is sent using the <code>ack</code> node.
|
||||
</div>
|
||||
<div class="form-row">
|
||||
<label for="node-config-input-vhost"><i class="fa fa-server"></i> vhost</label>
|
||||
<input type="text" id="node-config-input-vhost" placeholder="Default is null" />
|
||||
@ -147,6 +155,7 @@
|
||||
server: {required:true},
|
||||
port: {value:61613,required:true,validate:RED.validators.number()},
|
||||
protocolversion: {value:"1.0",required:true},
|
||||
ack: {value: false},
|
||||
vhost: {},
|
||||
reconnectretries: {value:"0",required:true,validate:RED.validators.number()},
|
||||
reconnectdelay: {value:"0.5",required:true,validate:RED.validators.number()},
|
||||
@ -161,3 +170,52 @@
|
||||
}
|
||||
});
|
||||
</script>
|
||||
|
||||
<script type="text/html" data-template-name="stomp ack">
|
||||
<div class="form-row">
|
||||
<label for="node-input-server" style="width: 110px;"><i class="fa fa-bookmark"></i> Server</label>
|
||||
<input type="text" id="node-input-server">
|
||||
</div>
|
||||
<div class="form-row">
|
||||
<label for="node-input-name" style="width: 110px;"><i class="fa fa-tag"></i> Name</label>
|
||||
<input type="text" id="node-input-name" placeholder="Name">
|
||||
</div>
|
||||
</script>
|
||||
|
||||
<script type="text/html" data-help-name="stomp ack">
|
||||
<p>
|
||||
ACK is used to acknowledge consumption of a message from a subscription using client acknowledgment. When a client has issued a SUBSCRIBE frame with the ack header set to client any messages received from that destination will not be considered to have been consumed (by the server) until the message has been acknowledged via an ACK.
|
||||
</p>
|
||||
<p>
|
||||
The node allows for following inputs:
|
||||
<ul>
|
||||
<li><code>msg.messageId</code>: The id of the message to acknowledge</li>
|
||||
<li><code>msg.subscription</code>: The id of the subscription made using the stomp in node</li>
|
||||
<li><code>msg.transaction</code>: Optional transaction name</li>
|
||||
</ul>
|
||||
</p>
|
||||
<p>
|
||||
See the <a href="https://stomp.github.io/stomp-specification-1.0.html#frame-ACK" target="new">Stomp 1.0 spec</a> for more details.
|
||||
</p>
|
||||
</script>
|
||||
|
||||
<script type="text/javascript">
|
||||
RED.nodes.registerType('stomp ack',{
|
||||
category: 'output',
|
||||
color:"#e8cfe8",
|
||||
defaults: {
|
||||
name: {value:""},
|
||||
server: {type:"stomp-server",required:true}
|
||||
},
|
||||
inputs:1,
|
||||
outputs:0,
|
||||
icon: "bridge.png",
|
||||
align: "right",
|
||||
label: function() {
|
||||
return this.name||"stomp";
|
||||
},
|
||||
labelStyle: function() {
|
||||
return (this.name)?"node_label_italic":"";
|
||||
}
|
||||
});
|
||||
</script>
|
||||
|
@ -9,6 +9,7 @@ module.exports = function(RED) {
|
||||
this.server = n.server;
|
||||
this.port = n.port;
|
||||
this.protocolversion = n.protocolversion;
|
||||
this.ack = n.ack;
|
||||
this.vhost = n.vhost;
|
||||
this.reconnectretries = n.reconnectretries || 999999;
|
||||
this.reconnectdelay = (n.reconnectdelay || 15) * 1000;
|
||||
@ -43,6 +44,7 @@ module.exports = function(RED) {
|
||||
if (this.serverConfig.vhost) {
|
||||
this.stompClientOpts.vhost = this.serverConfig.vhost;
|
||||
}
|
||||
this.subscribeHeaders = this.serverConfig.ack ? { "ack": "client" } : {};
|
||||
|
||||
var node = this;
|
||||
var msg = {topic:this.topic};
|
||||
@ -69,7 +71,7 @@ module.exports = function(RED) {
|
||||
node.status({fill:"grey",shape:"ring",text:"connecting"});
|
||||
node.client.connect(function(sessionId) {
|
||||
node.log('subscribing to: '+node.topic);
|
||||
node.client.subscribe(node.topic, function(body, headers) {
|
||||
node.client.subscribe(node.topic, node.subscribeHeaders, function(body, headers) {
|
||||
var newmsg={"headers":headers,"topic":node.topic}
|
||||
try {
|
||||
newmsg.payload = JSON.parse(body);
|
||||
@ -158,4 +160,72 @@ module.exports = function(RED) {
|
||||
}
|
||||
RED.nodes.registerType("stomp out",StompOutNode);
|
||||
|
||||
function StompAckNode(n) {
|
||||
RED.nodes.createNode(this,n);
|
||||
this.server = n.server;
|
||||
|
||||
this.serverConfig = RED.nodes.getNode(this.server);
|
||||
this.stompClientOpts = {
|
||||
address: this.serverConfig.server,
|
||||
port: this.serverConfig.port * 1,
|
||||
user: this.serverConfig.username,
|
||||
pass: this.serverConfig.password,
|
||||
protocolVersion: this.serverConfig.protocolversion,
|
||||
reconnectOpts: {
|
||||
retries: this.serverConfig.reconnectretries * 1,
|
||||
delay: this.serverConfig.reconnectdelay * 1
|
||||
}
|
||||
};
|
||||
if (this.serverConfig.vhost) {
|
||||
this.stompClientOpts.vhost = this.serverConfig.vhost;
|
||||
}
|
||||
|
||||
var node = this;
|
||||
|
||||
// only start connection etc. when acknowledgements are configured to be send by client
|
||||
if (node.serverConfig.ack) {
|
||||
node.client = new StompClient(node.stompClientOpts);
|
||||
|
||||
node.client.on("connect", function() {
|
||||
node.status({fill:"green",shape:"dot",text:"connected"});
|
||||
});
|
||||
|
||||
node.client.on("reconnecting", function() {
|
||||
node.status({fill:"red",shape:"ring",text:"reconnecting"});
|
||||
node.warn("reconnecting");
|
||||
});
|
||||
|
||||
node.client.on("reconnect", function() {
|
||||
node.status({fill:"green",shape:"dot",text:"connected"});
|
||||
});
|
||||
|
||||
node.client.on("error", function(error) {
|
||||
node.status({fill:"grey",shape:"dot",text:"error"});
|
||||
node.warn(error);
|
||||
});
|
||||
|
||||
node.status({fill:"grey",shape:"ring",text:"connecting"});
|
||||
node.client.connect(function(sessionId) {
|
||||
}, function(error) {
|
||||
node.status({fill:"grey",shape:"dot",text:"error"});
|
||||
node.warn(error);
|
||||
});
|
||||
|
||||
node.on("input", function(msg) {
|
||||
node.ack(msg.messageId, msg.subsriptionId, msg.transaction);
|
||||
});
|
||||
|
||||
node.on("close", function(done) {
|
||||
if (node.client) {
|
||||
// disconnect can accept a callback - but it is not always called.
|
||||
node.client.disconnect();
|
||||
}
|
||||
done();
|
||||
});
|
||||
} else {
|
||||
node.error("ACK not configured in server (config node)");
|
||||
}
|
||||
}
|
||||
RED.nodes.registerType("stomp ack",StompAckNode);
|
||||
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user