diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.js b/packages/node_modules/@node-red/nodes/core/function/89-delay.js index 6524aa040..6d4843caf 100644 --- a/packages/node_modules/@node-red/nodes/core/function/89-delay.js +++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.js @@ -201,6 +201,7 @@ module.exports = function(RED) { }); node.on("close", function() { clearDelayList(); }); } + else if (node.pauseType === "random") { node.on("input", function(msg, send, done) { var wait = node.randomFirst + (node.diff * Math.random()); @@ -226,34 +227,19 @@ module.exports = function(RED) { // The rate limit/queue type modes else if (node.pauseType === "rate") { node.on("input", function(msg, send, done) { - if (msg.hasOwnProperty("reset")) { - if (node.intervalID !== -1 ) { - clearInterval(node.intervalID); - node.intervalID = -1; - } - delete node.lastSent; - node.buffer = []; - node.rate = node.fixedrate; - node.status({fill:"blue",shape:"ring",text:0}); - done(); - return; - } - if (!node.drop) { var m = RED.util.cloneMessage(msg); - delete m.flush; - delete m.lifo; if (Object.keys(m).length > 1) { if (node.intervalID !== -1) { - if (node.allowrate && msg.hasOwnProperty("rate") && !isNaN(parseFloat(msg.rate)) && node.rate !== msg.rate) { - node.rate = msg.rate; + if (node.allowrate && m.hasOwnProperty("rate") && !isNaN(parseFloat(m.rate)) && node.rate !== m.rate) { + node.rate = m.rate; clearInterval(node.intervalID); node.intervalID = setInterval(sendMsgFromBuffer, node.rate); } var max_msgs = maxKeptMsgsCount(node); if ((max_msgs > 0) && (node.buffer.length >= max_msgs)) { node.buffer = []; - node.error(RED._("delay.errors.too-many"), msg); + node.error(RED._("delay.errors.too-many"), m); } else if (msg.toFront === true) { node.buffer.unshift({msg: m, send: send, done: done}); node.reportDepth(); @@ -263,8 +249,8 @@ module.exports = function(RED) { } } else { - if (node.allowrate && msg.hasOwnProperty("rate") && !isNaN(parseFloat(msg.rate))) { - node.rate = msg.rate; + if (node.allowrate && m.hasOwnProperty("rate") && !isNaN(parseFloat(m.rate))) { + node.rate = m.rate; } send(m); node.reportDepth(); @@ -282,6 +268,8 @@ module.exports = function(RED) { else { while (len > 0) { const msgInfo = node.buffer.shift(); + delete msgInfo.msg.flush; + delete msgInfo.msg.reset; if (Object.keys(msgInfo.msg).length > 1) { node.send(msgInfo.msg); msgInfo.done(); @@ -335,6 +323,21 @@ module.exports = function(RED) { } done(); } + + if (msg.hasOwnProperty("reset")) { + if (msg.flush === undefined) { + if (node.intervalID !== -1 ) { + clearInterval(node.intervalID); + node.intervalID = -1; + } + delete node.lastSent; + } + node.buffer = []; + node.rate = node.fixedrate; + node.status({fill:"blue",shape:"ring",text:0}); + done(); + return; + } }); node.on("close", function() { clearInterval(node.intervalID); @@ -387,6 +390,22 @@ module.exports = function(RED) { node.buffer.push({msg, send, done}); // if not add to end of queue node.reportDepth(); } + if (msg.hasOwnProperty("flush")) { + var len = node.buffer.length; + if (typeof(msg.flush) == 'number') { len = Math.min(Math.floor(msg.flush,len)); } + while (len > 0) { + const msgInfo = node.buffer.shift(); + delete msgInfo.msg.flush; + delete msgInfo.msg.reset; + if (Object.keys(msgInfo.msg).length > 2) { + node.send(msgInfo.msg); + msgInfo.done(); + } + len = len - 1; + } + node.status({}); + done(); + } if (msg.hasOwnProperty("reset")) { while (node.buffer.length > 0) { const msgInfo = node.buffer.shift(); @@ -397,21 +416,6 @@ module.exports = function(RED) { node.status({text:"reset"}); done(); } - if (msg.hasOwnProperty("flush")) { - var len = node.buffer.length; - if (typeof(msg.flush) == 'number') { len = Math.min(Math.floor(msg.flush,len)); } - while (len > 0) { - const msgInfo = node.buffer.shift(); - delete msgInfo.msg.flush; - if (Object.keys(msgInfo.msg).length > 2) { - node.send(msgInfo.msg); - msgInfo.done(); - } - len = len - 1; - } - node.status({}); - done(); - } }); node.on("close", function() { clearInterval(node.intervalID); diff --git a/test/nodes/core/function/89-delay_spec.js b/test/nodes/core/function/89-delay_spec.js index 17958b95b..46b0037bc 100644 --- a/test/nodes/core/function/89-delay_spec.js +++ b/test/nodes/core/function/89-delay_spec.js @@ -817,6 +817,105 @@ describe('delay Node', function() { }); }); + it('can part flush and reset rate limit queue', function(done) { + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":1,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + // console.log("GOT",Date.now() - t,msg) + msg.should.have.a.property('payload'); + msg.should.have.a.property('topic'); + try { + if (msg.topic === "foo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(0,50); + c = c + 1; + } + else if (msg.topic === "bar") { + msg.payload.should.equal(2); + (Date.now() - t).should.be.approximately(200,100); + c = c + 1; + } + else if (msg.topic === "fob") { + msg.payload.should.equal(5); + (Date.now() - t).should.be.approximately(400,100); + c = 5; + } + if (c === 5) { done(); } + } catch(e) { + done(e); + } + }); + + // send test messages + // delayNode1.receive({payload:1,topic:"foo"}); + setImmediate( function() { delayNode1.receive({payload:1,topic:"foo"}); } ); + setTimeout( function() { delayNode1.receive({payload:2,topic:"far"}); }, 10 ); + setTimeout( function() { delayNode1.receive({payload:3,topic:"boo"}); }, 20 ); + setTimeout( function() { delayNode1.receive({payload:4,topic:"bar"}); }, 30 ); + setTimeout( function() { delayNode1.receive({flush:2,reset:true}); }, 200); + setTimeout( function() { delayNode1.receive({payload:5,topic:"fob"}); }, 300 ); + setTimeout( function() { delayNode1.receive({flush:1,reset:true}); }, 400); + }); + }); + + it('can full flush and reset rate limit queue', function(done) { + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":1,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + // console.log("GOT",Date.now() - t,msg) + msg.should.have.a.property('payload'); + msg.should.have.a.property('topic'); + try { + if (msg.topic === "foo") { + msg.payload.should.equal(1); + (Date.now() - t).should.be.approximately(0,50); + c = c + 1; + } + else if (msg.topic === "bar") { + msg.payload.should.equal(4); + (Date.now() - t).should.be.approximately(200,100); + c = c + 1; + } + else if (msg.topic === "all") { + msg.payload.should.equal(5); + (Date.now() - t).should.be.approximately(200,100); + c = c + 1; + } + else if (msg.topic === "fob") { + msg.payload.should.equal(6); + (Date.now() - t).should.be.approximately(400,100); + c = 5; + } + if (c === 5) { done(); } + } catch(e) { + done(e); + } + }); + + // send test messages + // delayNode1.receive({payload:1,topic:"foo"}); + setImmediate( function() { delayNode1.receive({payload:1,topic:"foo"}); } ); + setTimeout( function() { delayNode1.receive({payload:2,topic:"far"}); }, 10 ); + setTimeout( function() { delayNode1.receive({payload:3,topic:"boo"}); }, 20 ); + setTimeout( function() { delayNode1.receive({payload:4,topic:"bar"}); }, 30 ); + setTimeout( function() { delayNode1.receive({payload:5,topic:"last",flush:true,reset:true}); }, 200); + setTimeout( function() { delayNode1.receive({payload:6,topic:"fob"}); }, 300 ); + setTimeout( function() { delayNode1.receive({flush:1,reset:true}); }, 400); + }); + }); + it('can part push to front of rate limit queue', function(done) { this.timeout(2000); var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":1,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},