diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.js b/packages/node_modules/@node-red/nodes/core/function/89-delay.js index 73129956e..999e6847b 100644 --- a/packages/node_modules/@node-red/nodes/core/function/89-delay.js +++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.js @@ -154,6 +154,7 @@ module.exports = function(RED) { }, 15 * 1000); node.on("close", function() { clearInterval(loggerId); }); + // The delay type modes if (node.pauseType === "delay") { node.on("input", function(msg, send, done) { var id = ourTimeout(function() { @@ -199,6 +200,29 @@ module.exports = function(RED) { }); node.on("close", function() { clearDelayList(); }); } + else if (node.pauseType === "random") { + node.on("input", function(msg, send, done) { + var wait = node.randomFirst + (node.diff * Math.random()); + var id = ourTimeout(function() { + node.idList.splice(node.idList.indexOf(id),1); + send(msg); + if (node.timeout >= 1000) { + node.status({fill:"blue",shape:"dot",text:node.idList.length}); + } + done(); + }, wait, () => done()); + if (Object.keys(msg).length === 2 && msg.hasOwnProperty("flush")) { id.clear(); } + else { node.idList.push(id); } + if (msg.hasOwnProperty("reset")) { clearDelayList(true); } + if (msg.hasOwnProperty("flush")) { flushDelayList(msg.flush); done(); } + if (node.timeout >= 1000) { + node.status({fill:"blue",shape:"dot",text:node.idList.length}); + } + }); + node.on("close", function() { clearDelayList(); }); + } + + // The rate limit/queue type modes else if (node.pauseType === "rate") { node.on("input", function(msg, send, done) { if (msg.hasOwnProperty("reset")) { @@ -217,6 +241,7 @@ module.exports = function(RED) { if (!node.drop) { var m = RED.util.cloneMessage(msg); delete m.flush; + delete m.lifo; if (Object.keys(m).length > 1) { if (node.intervalID !== -1) { if (node.allowrate && msg.hasOwnProperty("rate") && !isNaN(parseFloat(msg.rate)) && node.rate !== msg.rate) { @@ -228,6 +253,9 @@ module.exports = function(RED) { if ((max_msgs > 0) && (node.buffer.length >= max_msgs)) { node.buffer = []; node.error(RED._("delay.errors.too-many"), msg); + } else if (msg.toFront === true) { + node.buffer.unshift({msg: m, send: send, done: done}); + node.reportDepth(); } else { node.buffer.push({msg: m, send: send, done: done}); node.reportDepth(); @@ -309,6 +337,8 @@ module.exports = function(RED) { node.status({}); }); } + + // The topic based fair queue and last arrived on all topics queue else if ((node.pauseType === "queue") || (node.pauseType === "timed")) { node.intervalID = setInterval(function() { if (node.pauseType === "queue") { @@ -385,27 +415,6 @@ module.exports = function(RED) { node.status({}); }); } - else if (node.pauseType === "random") { - node.on("input", function(msg, send, done) { - var wait = node.randomFirst + (node.diff * Math.random()); - var id = ourTimeout(function() { - node.idList.splice(node.idList.indexOf(id),1); - send(msg); - if (node.timeout >= 1000) { - node.status({fill:"blue",shape:"dot",text:node.idList.length}); - } - done(); - }, wait, () => done()); - if (Object.keys(msg).length === 2 && msg.hasOwnProperty("flush")) { id.clear(); } - else { node.idList.push(id); } - if (msg.hasOwnProperty("reset")) { clearDelayList(true); } - if (msg.hasOwnProperty("flush")) { flushDelayList(msg.flush); done(); } - if (node.timeout >= 1000) { - node.status({fill:"blue",shape:"dot",text:node.idList.length}); - } - }); - node.on("close", function() { clearDelayList(); }); - } } RED.nodes.registerType("delay",DelayNode); } diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html b/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html index 1e801698a..67086b266 100644 --- a/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html +++ b/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html @@ -35,6 +35,11 @@
If the received message has this property set to a numeric value then that many messages will be released immediately. If set to any other type (e.g. boolean), then all outstanding messages held by the node are sent immediately.
+
toFront
+
When in rate limit mode, if the received message has this property set to boolean true, + then the message is pushed to the front of the queue and will be released next. + This can be used in combination with msg.flush=1 to resend immediately. +

Details

When configured to delay messages, the delay interval can be a fixed value, @@ -46,7 +51,7 @@ the configured time period. The status shows the number of messages currently in the queue. It can optionally discard intermediate messages as they arrive.

-

If set to allow override of the rate, the new rate will be applied immediately, +

If set to allow override of the rate, the new rate will be applied immediately, and will remain in effect until changed again, the node is reset, or the flow is restarted.

The rate limiting can be applied to all messages, or group them according to their msg.topic value. When grouping, intermediate messages are @@ -54,4 +59,6 @@ the most recent message for all topics, or release the most recent message for the next topic.

+

Note: In rate limit mode the maximum queue depth can be set by a property in your + settings.js file. For example nodeMessageBufferMaxLength: 1000, diff --git a/test/nodes/core/function/89-delay_spec.js b/test/nodes/core/function/89-delay_spec.js index 87ca9c434..4620458f8 100644 --- a/test/nodes/core/function/89-delay_spec.js +++ b/test/nodes/core/function/89-delay_spec.js @@ -789,6 +789,62 @@ describe('delay Node', function() { }); }); + it('can part push to front of rate limit queue', function(done) { + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":1,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + msg.should.have.a.property('payload'); + msg.should.have.a.property('topic'); + try { + if (msg.topic === "aoo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(2,50); + c = c + 1; + } + else if (msg.topic === "eoo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(4,50); + c = c + 1; + } + else if (msg.topic === "coo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(202,50); + c = c + 1; + } + else if (msg.topic === "boo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(406,50); + c = c + 1; + } + else if (msg.topic === "doo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(4,50); + c = c + 1; + } + if (c === 5) { done(); } + } catch(e) { + done(e); + } + }); + + // send test messages + delayNode1.receive({payload:1,topic:"aoo"}); + setImmediate( function() { delayNode1.receive({payload:1,topic:"boo"}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"coo",toFront:true}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"doo",toFront:true,flush:1}); } ); + setImmediate( function() { delayNode1.receive({payload:1,topic:"eoo",toFront:true}); } ); + setImmediate( function() { delayNode1.receive({flush:1}); }); + setTimeout( function() { delayNode1.receive({flush:1}); }, 200); + setTimeout( function() { delayNode1.receive({flush:4}); }, 400); + }); + }); + it('can reset rate limit queue', function(done) { this.timeout(2000); var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},