Merge pull request #3069 from node-red/delay-push-to-front

Add push to front of rate limit queue.
This commit is contained in:
Nick O'Leary 2021-09-30 10:44:14 +01:00 committed by GitHub
commit 7fffc1a36d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 94 additions and 22 deletions

View File

@ -154,6 +154,7 @@ module.exports = function(RED) {
}, 15 * 1000);
node.on("close", function() { clearInterval(loggerId); });
// The delay type modes
if (node.pauseType === "delay") {
node.on("input", function(msg, send, done) {
var id = ourTimeout(function() {
@ -199,6 +200,29 @@ module.exports = function(RED) {
});
node.on("close", function() { clearDelayList(); });
}
else if (node.pauseType === "random") {
node.on("input", function(msg, send, done) {
var wait = node.randomFirst + (node.diff * Math.random());
var id = ourTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1);
send(msg);
if (node.timeout >= 1000) {
node.status({fill:"blue",shape:"dot",text:node.idList.length});
}
done();
}, wait, () => done());
if (Object.keys(msg).length === 2 && msg.hasOwnProperty("flush")) { id.clear(); }
else { node.idList.push(id); }
if (msg.hasOwnProperty("reset")) { clearDelayList(true); }
if (msg.hasOwnProperty("flush")) { flushDelayList(msg.flush); done(); }
if (node.timeout >= 1000) {
node.status({fill:"blue",shape:"dot",text:node.idList.length});
}
});
node.on("close", function() { clearDelayList(); });
}
// The rate limit/queue type modes
else if (node.pauseType === "rate") {
node.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("reset")) {
@ -217,6 +241,7 @@ module.exports = function(RED) {
if (!node.drop) {
var m = RED.util.cloneMessage(msg);
delete m.flush;
delete m.lifo;
if (Object.keys(m).length > 1) {
if (node.intervalID !== -1) {
if (node.allowrate && msg.hasOwnProperty("rate") && !isNaN(parseFloat(msg.rate)) && node.rate !== msg.rate) {
@ -228,6 +253,9 @@ module.exports = function(RED) {
if ((max_msgs > 0) && (node.buffer.length >= max_msgs)) {
node.buffer = [];
node.error(RED._("delay.errors.too-many"), msg);
} else if (msg.toFront === true) {
node.buffer.unshift({msg: m, send: send, done: done});
node.reportDepth();
} else {
node.buffer.push({msg: m, send: send, done: done});
node.reportDepth();
@ -309,6 +337,8 @@ module.exports = function(RED) {
node.status({});
});
}
// The topic based fair queue and last arrived on all topics queue
else if ((node.pauseType === "queue") || (node.pauseType === "timed")) {
node.intervalID = setInterval(function() {
if (node.pauseType === "queue") {
@ -385,27 +415,6 @@ module.exports = function(RED) {
node.status({});
});
}
else if (node.pauseType === "random") {
node.on("input", function(msg, send, done) {
var wait = node.randomFirst + (node.diff * Math.random());
var id = ourTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1);
send(msg);
if (node.timeout >= 1000) {
node.status({fill:"blue",shape:"dot",text:node.idList.length});
}
done();
}, wait, () => done());
if (Object.keys(msg).length === 2 && msg.hasOwnProperty("flush")) { id.clear(); }
else { node.idList.push(id); }
if (msg.hasOwnProperty("reset")) { clearDelayList(true); }
if (msg.hasOwnProperty("flush")) { flushDelayList(msg.flush); done(); }
if (node.timeout >= 1000) {
node.status({fill:"blue",shape:"dot",text:node.idList.length});
}
});
node.on("close", function() { clearDelayList(); });
}
}
RED.nodes.registerType("delay",DelayNode);
}

View File

@ -35,6 +35,11 @@
<dd>If the received message has this property set to a numeric value then that many messages
will be released immediately. If set to any other type (e.g. boolean), then all
outstanding messages held by the node are sent immediately.</dd>
<dt class="optional">toFront</dt>
<dd>When in rate limit mode, if the received message has this property set to boolean <code>true</code>,
then the message is pushed to the front of the queue and will be released next.
This can be used in combination with <code>msg.flush=1</code> to resend immediately.
</dd>
</dl>
<h3>Details</h3>
<p>When configured to delay messages, the delay interval can be a fixed value,
@ -46,7 +51,7 @@
the configured time period. The status shows the number of messages currently in the queue.
It can optionally discard intermediate messages as they arrive.</p>
</p>
<p> If set to allow override of the rate, the new rate will be applied immediately,
<p>If set to allow override of the rate, the new rate will be applied immediately,
and will remain in effect until changed again, the node is reset, or the flow is restarted.</p>
<p>The rate limiting can be applied to all messages, or group them according to
their <code>msg.topic</code> value. When grouping, intermediate messages are
@ -54,4 +59,6 @@
the most recent message for all topics, or release the most recent message
for the next topic.
</p>
<p><b>Note</b>: In rate limit mode the maximum queue depth can be set by a property in your
<i>settings.js</i> file. For example <code>nodeMessageBufferMaxLength: 1000,</code>
</script>

View File

@ -789,6 +789,62 @@ describe('delay Node', function() {
});
});
it('can part push to front of rate limit queue', function(done) {
this.timeout(2000);
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":1,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
var t = Date.now();
var c = 0;
helperNode1.on("input", function(msg) {
msg.should.have.a.property('payload');
msg.should.have.a.property('topic');
try {
if (msg.topic === "aoo") {
msg.payload.should.equal(1);
(Date.now() - t).should.be.approximately(2,50);
c = c + 1;
}
else if (msg.topic === "eoo") {
msg.payload.should.equal(1);
(Date.now() - t).should.be.approximately(4,50);
c = c + 1;
}
else if (msg.topic === "coo") {
msg.payload.should.equal(1);
(Date.now() - t).should.be.approximately(202,50);
c = c + 1;
}
else if (msg.topic === "boo") {
msg.payload.should.equal(1);
(Date.now() - t).should.be.approximately(406,50);
c = c + 1;
}
else if (msg.topic === "doo") {
msg.payload.should.equal(1);
(Date.now() - t).should.be.approximately(4,50);
c = c + 1;
}
if (c === 5) { done(); }
} catch(e) {
done(e);
}
});
// send test messages
delayNode1.receive({payload:1,topic:"aoo"});
setImmediate( function() { delayNode1.receive({payload:1,topic:"boo"}); } );
setImmediate( function() { delayNode1.receive({payload:1,topic:"coo",toFront:true}); } );
setImmediate( function() { delayNode1.receive({payload:1,topic:"doo",toFront:true,flush:1}); } );
setImmediate( function() { delayNode1.receive({payload:1,topic:"eoo",toFront:true}); } );
setImmediate( function() { delayNode1.receive({flush:1}); });
setTimeout( function() { delayNode1.receive({flush:1}); }, 200);
setTimeout( function() { delayNode1.receive({flush:4}); }, 400);
});
});
it('can reset rate limit queue', function(done) {
this.timeout(2000);
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},