From d820f55358b29f408dc062da377baaf2bd63a9ee Mon Sep 17 00:00:00 2001 From: Dave Conway-Jones Date: Thu, 15 Jul 2021 22:15:46 +0100 Subject: [PATCH 1/4] Add push to front of rate limit queue. (moved random delay to top to group with other delay types. Tests and docs to follow --- .../@node-red/nodes/core/function/89-delay.js | 49 +++++++++++-------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.js b/packages/node_modules/@node-red/nodes/core/function/89-delay.js index 10f30796b..850497e22 100644 --- a/packages/node_modules/@node-red/nodes/core/function/89-delay.js +++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.js @@ -154,6 +154,7 @@ module.exports = function(RED) { }, 15 * 1000); node.on("close", function() { clearInterval(loggerId); }); + // The delay type modes if (node.pauseType === "delay") { node.on("input", function(msg, send, done) { var id = ourTimeout(function() { @@ -199,6 +200,29 @@ module.exports = function(RED) { }); node.on("close", function() { clearDelayList(); }); } + else if (node.pauseType === "random") { + node.on("input", function(msg, send, done) { + var wait = node.randomFirst + (node.diff * Math.random()); + var id = ourTimeout(function() { + node.idList.splice(node.idList.indexOf(id),1); + send(msg); + if (node.timeout >= 1000) { + node.status({fill:"blue",shape:"dot",text:node.idList.length}); + } + done(); + }, wait, () => done()); + if (Object.keys(msg).length === 2 && msg.hasOwnProperty("flush")) { id.clear(); } + else { node.idList.push(id); } + if (msg.hasOwnProperty("reset")) { clearDelayList(true); } + if (msg.hasOwnProperty("flush")) { flushDelayList(msg.flush); done(); } + if (node.timeout >= 1000) { + node.status({fill:"blue",shape:"dot",text:node.idList.length}); + } + }); + node.on("close", function() { clearDelayList(); }); + } + + // The rate limit/queue type modes else if (node.pauseType === "rate") { node.on("input", function(msg, send, done) { if (msg.hasOwnProperty("reset")) { @@ -217,6 +241,7 @@ module.exports = function(RED) { if (!node.drop) { var m = RED.util.cloneMessage(msg); delete m.flush; + delete m.lifo; if (Object.keys(m).length > 1) { if (node.intervalID !== -1) { if (node.allowrate && msg.hasOwnProperty("rate") && !isNaN(parseFloat(msg.rate)) && node.rate !== msg.rate) { @@ -228,6 +253,9 @@ module.exports = function(RED) { if ((max_msgs > 0) && (node.buffer.length >= max_msgs)) { node.buffer = []; node.error(RED._("delay.errors.too-many"), msg); + } else if (msg.lifo) { + node.buffer.unshift({msg: m, send: send, done: done}); + node.reportDepth(); } else { node.buffer.push({msg: m, send: send, done: done}); node.reportDepth(); @@ -385,27 +413,6 @@ module.exports = function(RED) { node.status({}); }); } - else if (node.pauseType === "random") { - node.on("input", function(msg, send, done) { - var wait = node.randomFirst + (node.diff * Math.random()); - var id = ourTimeout(function() { - node.idList.splice(node.idList.indexOf(id),1); - send(msg); - if (node.timeout >= 1000) { - node.status({fill:"blue",shape:"dot",text:node.idList.length}); - } - done(); - }, wait, () => done()); - if (Object.keys(msg).length === 2 && msg.hasOwnProperty("flush")) { id.clear(); } - else { node.idList.push(id); } - if (msg.hasOwnProperty("reset")) { clearDelayList(true); } - if (msg.hasOwnProperty("flush")) { flushDelayList(msg.flush); done(); } - if (node.timeout >= 1000) { - node.status({fill:"blue",shape:"dot",text:node.idList.length}); - } - }); - node.on("close", function() { clearDelayList(); }); - } } RED.nodes.registerType("delay",DelayNode); } From a2b95dbb397ef656641a4a97d123b27a16436e2a Mon Sep 17 00:00:00 2001 From: Dave Conway-Jones Date: Fri, 16 Jul 2021 11:31:21 +0100 Subject: [PATCH 2/4] delay node - change lifo property to toFront add info to sidebar add tests --- .../@node-red/nodes/core/function/89-delay.js | 2 +- .../locales/en-US/function/89-delay.html | 5 ++ test/nodes/core/function/89-delay_spec.js | 56 +++++++++++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.js b/packages/node_modules/@node-red/nodes/core/function/89-delay.js index 850497e22..5e90edaac 100644 --- a/packages/node_modules/@node-red/nodes/core/function/89-delay.js +++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.js @@ -253,7 +253,7 @@ module.exports = function(RED) { if ((max_msgs > 0) && (node.buffer.length >= max_msgs)) { node.buffer = []; node.error(RED._("delay.errors.too-many"), msg); - } else if (msg.lifo) { + } else if (msg.toFront === true) { node.buffer.unshift({msg: m, send: send, done: done}); node.reportDepth(); } else { diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html b/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html index 1e801698a..574afd24f 100644 --- a/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html +++ b/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html @@ -35,6 +35,11 @@
If the received message has this property set to a numeric value then that many messages will be released immediately. If set to any other type (e.g. boolean), then all outstanding messages held by the node are sent immediately.
+
toFront
+
When in rate limit mode, if the received message has this property set to boolean true, + then the message is pushed to the front of the queue and will be released next. + This can be used in combination with msg.flush=1 to resend immediately. +

Details

When configured to delay messages, the delay interval can be a fixed value, diff --git a/test/nodes/core/function/89-delay_spec.js b/test/nodes/core/function/89-delay_spec.js index 87ca9c434..4c1ac9dbc 100644 --- a/test/nodes/core/function/89-delay_spec.js +++ b/test/nodes/core/function/89-delay_spec.js @@ -789,6 +789,62 @@ describe('delay Node', function() { }); }); + it('can part push to front of rate limit queue', function(done) { + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":1,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + msg.should.have.a.property('payload'); + msg.should.have.a.property('topic'); + try { + if (msg.topic === "aoo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(2,50); + c = c + 1; + } + else if (msg.topic === "eoo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(4,50); + c = c + 1; + } + else if (msg.topic === "coo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(202,50); + c = c + 1; + } + else if (msg.topic === "boo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(406,50); + c = c + 1; + } + else if (msg.topic === "doo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(404,50); + c = c + 1; + } + if (c === 5) { done(); } + } catch(e) { + done(e); + } + }); + + // send test messages + delayNode1.receive({payload:1,topic:"aoo"}); + setImmediate( function() { delayNode1.receive({payload:1,topic:"boo"}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"coo",toFront:true}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"doo"}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"eoo",toFront:true}); } ); + setImmediate( function() { delayNode1.receive({flush:1}); }); + setTimeout( function() { delayNode1.receive({flush:1}); }, 200); + setTimeout( function() { delayNode1.receive({flush:4}); }, 400); + }); + }); + it('can reset rate limit queue', function(done) { this.timeout(2000); var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, From 4d26b806ddb077886294d1488b94bead609b2d56 Mon Sep 17 00:00:00 2001 From: Dave Conway-Jones Date: Fri, 16 Jul 2021 11:38:06 +0100 Subject: [PATCH 3/4] delay - add test for push toFront and flush=1 --- test/nodes/core/function/89-delay_spec.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/nodes/core/function/89-delay_spec.js b/test/nodes/core/function/89-delay_spec.js index 4c1ac9dbc..4620458f8 100644 --- a/test/nodes/core/function/89-delay_spec.js +++ b/test/nodes/core/function/89-delay_spec.js @@ -824,7 +824,7 @@ describe('delay Node', function() { } else if (msg.topic === "doo") { msg.payload.should.equal(1); - (Date.now() - t).should.be.approximately(404,50); + (Date.now() - t).should.be.approximately(4,50); c = c + 1; } if (c === 5) { done(); } @@ -837,7 +837,7 @@ describe('delay Node', function() { delayNode1.receive({payload:1,topic:"aoo"}); setImmediate( function() { delayNode1.receive({payload:1,topic:"boo"}); } ); setImmediate( function() { delayNode1.receive({payload:1,topic:"coo",toFront:true}); } ); - setImmediate( function() { delayNode1.receive({payload:1,topic:"doo"}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"doo",toFront:true,flush:1}); } ); setImmediate( function() { delayNode1.receive({payload:1,topic:"eoo",toFront:true}); } ); setImmediate( function() { delayNode1.receive({flush:1}); }); setTimeout( function() { delayNode1.receive({flush:1}); }, 200); From ef1b3aa7f5fb5179411c30cf1916005a51c86b04 Mon Sep 17 00:00:00 2001 From: Dave Conway-Jones Date: Wed, 18 Aug 2021 14:05:42 +0100 Subject: [PATCH 4/4] Add comment to info docs re queue depth limit setting --- .../node_modules/@node-red/nodes/core/function/89-delay.js | 2 ++ .../@node-red/nodes/locales/en-US/function/89-delay.html | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.js b/packages/node_modules/@node-red/nodes/core/function/89-delay.js index 5e90edaac..5a7d504a3 100644 --- a/packages/node_modules/@node-red/nodes/core/function/89-delay.js +++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.js @@ -337,6 +337,8 @@ module.exports = function(RED) { node.status({}); }); } + + // The topic based fair queue and last arrived on all topics queue else if ((node.pauseType === "queue") || (node.pauseType === "timed")) { node.intervalID = setInterval(function() { if (node.pauseType === "queue") { diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html b/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html index 574afd24f..67086b266 100644 --- a/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html +++ b/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html @@ -51,7 +51,7 @@ the configured time period. The status shows the number of messages currently in the queue. It can optionally discard intermediate messages as they arrive.

-

If set to allow override of the rate, the new rate will be applied immediately, +

If set to allow override of the rate, the new rate will be applied immediately, and will remain in effect until changed again, the node is reset, or the flow is restarted.

The rate limiting can be applied to all messages, or group them according to their msg.topic value. When grouping, intermediate messages are @@ -59,4 +59,6 @@ the most recent message for all topics, or release the most recent message for the next topic.

+

Note: In rate limit mode the maximum queue depth can be set by a property in your + settings.js file. For example nodeMessageBufferMaxLength: 1000,