diff --git a/packages/node_modules/@node-red/nodes/core/core/89-delay.html b/packages/node_modules/@node-red/nodes/core/core/89-delay.html index 9b8c5cad6..d91631ce1 100644 --- a/packages/node_modules/@node-red/nodes/core/core/89-delay.html +++ b/packages/node_modules/@node-red/nodes/core/core/89-delay.html @@ -103,9 +103,12 @@
Sets the delay, in milliseconds, to be applied to the message. This option only applies if the node is configured to allow the message to override the configured default delay interval.
-
reset
-
If the received message has this property set to any value, all - outstanding messages held by the node are cleared without being sent.
+
reset
+
If the received message has this property set to any value, all + outstanding messages held by the node are cleared without being sent.
+
flush
+
If the received message has this property set to any value, all + outstanding messages held by the node are sent immediately.

Details

When configured to delay messages, the delay interval can be a fixed value, diff --git a/packages/node_modules/@node-red/nodes/core/core/89-delay.js b/packages/node_modules/@node-red/nodes/core/core/89-delay.js index a93431a33..9d9b1e740 100644 --- a/packages/node_modules/@node-red/nodes/core/core/89-delay.js +++ b/packages/node_modules/@node-red/nodes/core/core/89-delay.js @@ -80,26 +80,52 @@ module.exports = function(RED) { this.drop = n.drop; var node = this; + function ourTimeout(handler, delay) { + var toutID = setTimeout(handler, delay); + return { + clear: function() { clearTimeout(toutID); }, + trigger: function() { clearTimeout(toutID); return handler(); } + }; + } + var clearDelayList = function() { - for (var i=0; i 0) { node.status({text:node.buffer.length}); } + else { node.status({}); } + node.busy = null; + }, 500); + } + } + if (node.pauseType === "delay") { node.on("input", function(msg) { - var id = setTimeout(function() { - node.idList.splice(node.idList.indexOf(id),1); - if (node.idList.length === 0) { node.status({}); } - node.send(msg); - }, node.timeout); - node.idList.push(id); - if ((node.timeout > 1000) && (node.idList.length !== 0)) { - node.status({fill:"blue",shape:"dot",text:" "}); + if (msg.hasOwnProperty("flush")) { flushDelayList(); } + else { + var id = ourTimeout(function() { + node.idList.splice(node.idList.indexOf(id),1); + if (node.idList.length === 0) { node.status({}); } + node.send(msg); + }, node.timeout); + node.idList.push(id); + if ((node.timeout > 1000) && (node.idList.length !== 0)) { + node.status({fill:"blue",shape:"dot",text:" "}); + } + if (msg.hasOwnProperty("reset")) { clearDelayList(); } } - if (msg.hasOwnProperty("reset")) { clearDelayList(); } }); node.on("close", function() { clearDelayList(); }); } @@ -110,7 +136,7 @@ module.exports = function(RED) { delayvar = parseFloat(msg.delay); } if (delayvar < 0) { delayvar = 0; } - var id = setTimeout(function() { + var id = ourTimeout(function() { node.idList.splice(node.idList.indexOf(id),1); if (node.idList.length === 0) { node.status({}); } node.send(msg); @@ -120,32 +146,22 @@ module.exports = function(RED) { node.status({fill:"blue",shape:"dot",text:delayvar/1000+"s"}); } if (msg.hasOwnProperty("reset")) { clearDelayList(); } + if (msg.hasOwnProperty("flush")) { flushDelayList(); } }); node.on("close", function() { clearDelayList(); }); } else if (node.pauseType === "rate") { - node.reportDepth = function() { - if (!node.busy) { - node.busy = setTimeout(function() { - if (node.buffer.length > 0) { - node.status({text:node.buffer.length}); - } else { - node.status({}); - } - node.busy = null; - },500); - } - } node.on("input", function(msg) { if (!node.drop) { - if ( node.intervalID !== -1) { - node.buffer.push(msg); - node.reportDepth(); + if (node.intervalID !== -1) { + if (!msg.hasOwnProperty("flush")) { + node.buffer.push(msg); + node.reportDepth(); + } } else { node.send(msg); node.reportDepth(); - node.intervalID = setInterval(function() { if (node.buffer.length === 0) { clearInterval(node.intervalID); @@ -155,7 +171,7 @@ module.exports = function(RED) { node.send(node.buffer.shift()); } node.reportDepth(); - },node.rate); + }, node.rate); } } else { @@ -178,6 +194,12 @@ module.exports = function(RED) { node.buffer = []; node.status({text:"reset"}); } + if (msg.hasOwnProperty("flush")) { + while (node.buffer.length > 0) { + node.send(node.buffer.shift()); + } + node.status({}); + } }); node.on("close", function() { clearInterval(node.intervalID); @@ -198,24 +220,31 @@ module.exports = function(RED) { node.send(node.buffer.shift()); } } - node.status({text:node.buffer.length}); + node.reportDepth(); },node.rate); + var hit; node.on("input", function(msg) { if (!msg.hasOwnProperty("topic")) { msg.topic = "_none_"; } - var hit = false; + hit = false; for (var b in node.buffer) { // check if already in queue if (msg.topic === node.buffer[b].topic) { node.buffer[b] = msg; // if so - replace existing entry hit = true; + break; } } if (!hit) { node.buffer.push(msg); } // if not add to end of queue - node.status({text:node.buffer.length}); if (msg.hasOwnProperty("reset")) { node.buffer = []; node.status({text:"reset"}); } + if (msg.hasOwnProperty("flush")) { + while (node.buffer.length > 0) { + node.send(node.buffer.shift()); + } + node.status({}); + } }); node.on("close", function() { clearInterval(node.intervalID); @@ -226,7 +255,7 @@ module.exports = function(RED) { else if (node.pauseType === "random") { node.on("input", function(msg) { var wait = node.randomFirst + (node.diff * Math.random()); - var id = setTimeout(function() { + var id = ourTimeout(function() { node.idList.splice(node.idList.indexOf(id),1); node.send(msg); node.status({}); @@ -236,6 +265,7 @@ module.exports = function(RED) { node.status({fill:"blue",shape:"dot",text:parseInt(wait/10)/100+"s"}); } if (msg.hasOwnProperty("reset")) { clearDelayList(); } + if (msg.hasOwnProperty("flush")) { flushDelayList(); } }); node.on("close", function() { clearDelayList(); }); } diff --git a/test/nodes/core/core/89-delay_spec.js b/test/nodes/core/core/89-delay_spec.js index d678bf1c6..c516adb43 100644 --- a/test/nodes/core/core/89-delay_spec.js +++ b/test/nodes/core/core/89-delay_spec.js @@ -572,4 +572,150 @@ describe('delay Node', function() { delayNode1.receive({payload:4,topic:"A"}); // and nothing on second }); }); + + it('can flush delay queue', function(done) { + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + msg.should.have.a.property('payload'); + msg.should.have.a.property('topic'); + try { + if (msg.topic === "foo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(0,100); + c = c + 1; + } + else { + if (msg.topic === "bar") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(0,100); + c = c + 1; + } + } + if (c === 5) { done(); } + } catch(e) { + done(e); + } + }); + + // send test messages + delayNode1.receive({payload:1,topic:"foo"}); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({flush:true}); }); // reset the queue + }); + }); + + it('can reset delay queue', function(done) { + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + c = c + 1; + }); + + setTimeout( function() { + if (c === 0) { done(); } + }, 700); + + // send test messages + delayNode1.receive({payload:1,topic:"foo"}); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({reset:true}); }); // reset the queue + }); + }); + + it('can flush rate limit queue', function(done) { + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + msg.should.have.a.property('payload'); + msg.should.have.a.property('topic'); + try { + if (msg.topic === "foo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(0,100); + c = c + 1; + } + else { + if (msg.topic === "bar") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(0,100); + c = c + 1; + } + } + if (c === 5) { done(); } + } catch(e) { + done(e); + } + }); + + // send test messages + delayNode1.receive({payload:1,topic:"foo"}); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({flush:true}); }); // reset the queue + }); + }); + + it('can reset rate limit queue', function(done) { + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + msg.should.have.a.property('payload'); + msg.should.have.a.property('topic'); + try { + if (msg.topic === "foo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(0,100); + c = c + 1; + } + } catch(e) { + done(e); + } + }); + + setTimeout( function() { + if (c === 1) { done(); } + }, 700); + + // send test messages + delayNode1.receive({payload:1,topic:"foo"}); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic + setImmediate( function() { delayNode1.receive({reset:true}); }); // reset the queue + }); + }); });