Add msg.flush mode to delay node

to spew out contents rather than dump
(as per Trello item)
This commit is contained in:
Dave Conway-Jones 2018-09-23 17:31:11 +01:00
parent 80a15089b4
commit 4630a162af
No known key found for this signature in database
GPG Key ID: 9E7F9C73F5168CD4
3 changed files with 216 additions and 37 deletions

View File

@ -103,9 +103,12 @@
<dd>Sets the delay, in milliseconds, to be applied to the message. This
option only applies if the node is configured to allow the message to
override the configured default delay interval.</dd>
<dt class="optional">reset</dt>
<dd>If the received message has this property set to any value, all
outstanding messages held by the node are cleared without being sent.</dd>
<dt class="optional">reset</dt>
<dd>If the received message has this property set to any value, all
outstanding messages held by the node are cleared without being sent.</dd>
<dt class="optional">flush</dt>
<dd>If the received message has this property set to any value, all
outstanding messages held by the node are sent immediately.</dd>
</dl>
<h3>Details</h3>
<p>When configured to delay messages, the delay interval can be a fixed value,

View File

@ -80,26 +80,52 @@ module.exports = function(RED) {
this.drop = n.drop;
var node = this;
function ourTimeout(handler, delay) {
var toutID = setTimeout(handler, delay);
return {
clear: function() { clearTimeout(toutID); },
trigger: function() { clearTimeout(toutID); return handler(); }
};
}
var clearDelayList = function() {
for (var i=0; i<node.idList.length; i++ ) {
clearTimeout(node.idList[i]);
}
for (var i=0; i<node.idList.length; i++ ) { node.idList[i].clear(); }
node.idList = [];
node.status({text:"reset"});
}
var flushDelayList = function() {
var len = node.idList.length;
for (var i=0; i<len; i++ ) { node.idList[0].trigger(); }
node.idList = [];
node.status({text:"flushed"});
}
node.reportDepth = function() {
if (!node.busy) {
node.busy = setTimeout(function() {
if (node.buffer.length > 0) { node.status({text:node.buffer.length}); }
else { node.status({}); }
node.busy = null;
}, 500);
}
}
if (node.pauseType === "delay") {
node.on("input", function(msg) {
var id = setTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1);
if (node.idList.length === 0) { node.status({}); }
node.send(msg);
}, node.timeout);
node.idList.push(id);
if ((node.timeout > 1000) && (node.idList.length !== 0)) {
node.status({fill:"blue",shape:"dot",text:" "});
if (msg.hasOwnProperty("flush")) { flushDelayList(); }
else {
var id = ourTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1);
if (node.idList.length === 0) { node.status({}); }
node.send(msg);
}, node.timeout);
node.idList.push(id);
if ((node.timeout > 1000) && (node.idList.length !== 0)) {
node.status({fill:"blue",shape:"dot",text:" "});
}
if (msg.hasOwnProperty("reset")) { clearDelayList(); }
}
if (msg.hasOwnProperty("reset")) { clearDelayList(); }
});
node.on("close", function() { clearDelayList(); });
}
@ -110,7 +136,7 @@ module.exports = function(RED) {
delayvar = parseFloat(msg.delay);
}
if (delayvar < 0) { delayvar = 0; }
var id = setTimeout(function() {
var id = ourTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1);
if (node.idList.length === 0) { node.status({}); }
node.send(msg);
@ -120,32 +146,22 @@ module.exports = function(RED) {
node.status({fill:"blue",shape:"dot",text:delayvar/1000+"s"});
}
if (msg.hasOwnProperty("reset")) { clearDelayList(); }
if (msg.hasOwnProperty("flush")) { flushDelayList(); }
});
node.on("close", function() { clearDelayList(); });
}
else if (node.pauseType === "rate") {
node.reportDepth = function() {
if (!node.busy) {
node.busy = setTimeout(function() {
if (node.buffer.length > 0) {
node.status({text:node.buffer.length});
} else {
node.status({});
}
node.busy = null;
},500);
}
}
node.on("input", function(msg) {
if (!node.drop) {
if ( node.intervalID !== -1) {
node.buffer.push(msg);
node.reportDepth();
if (node.intervalID !== -1) {
if (!msg.hasOwnProperty("flush")) {
node.buffer.push(msg);
node.reportDepth();
}
}
else {
node.send(msg);
node.reportDepth();
node.intervalID = setInterval(function() {
if (node.buffer.length === 0) {
clearInterval(node.intervalID);
@ -155,7 +171,7 @@ module.exports = function(RED) {
node.send(node.buffer.shift());
}
node.reportDepth();
},node.rate);
}, node.rate);
}
}
else {
@ -178,6 +194,12 @@ module.exports = function(RED) {
node.buffer = [];
node.status({text:"reset"});
}
if (msg.hasOwnProperty("flush")) {
while (node.buffer.length > 0) {
node.send(node.buffer.shift());
}
node.status({});
}
});
node.on("close", function() {
clearInterval(node.intervalID);
@ -198,24 +220,31 @@ module.exports = function(RED) {
node.send(node.buffer.shift());
}
}
node.status({text:node.buffer.length});
node.reportDepth();
},node.rate);
var hit;
node.on("input", function(msg) {
if (!msg.hasOwnProperty("topic")) { msg.topic = "_none_"; }
var hit = false;
hit = false;
for (var b in node.buffer) { // check if already in queue
if (msg.topic === node.buffer[b].topic) {
node.buffer[b] = msg; // if so - replace existing entry
hit = true;
break;
}
}
if (!hit) { node.buffer.push(msg); } // if not add to end of queue
node.status({text:node.buffer.length});
if (msg.hasOwnProperty("reset")) {
node.buffer = [];
node.status({text:"reset"});
}
if (msg.hasOwnProperty("flush")) {
while (node.buffer.length > 0) {
node.send(node.buffer.shift());
}
node.status({});
}
});
node.on("close", function() {
clearInterval(node.intervalID);
@ -226,7 +255,7 @@ module.exports = function(RED) {
else if (node.pauseType === "random") {
node.on("input", function(msg) {
var wait = node.randomFirst + (node.diff * Math.random());
var id = setTimeout(function() {
var id = ourTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1);
node.send(msg);
node.status({});
@ -236,6 +265,7 @@ module.exports = function(RED) {
node.status({fill:"blue",shape:"dot",text:parseInt(wait/10)/100+"s"});
}
if (msg.hasOwnProperty("reset")) { clearDelayList(); }
if (msg.hasOwnProperty("flush")) { flushDelayList(); }
});
node.on("close", function() { clearDelayList(); });
}

View File

@ -572,4 +572,150 @@ describe('delay Node', function() {
delayNode1.receive({payload:4,topic:"A"}); // and nothing on second
});
});
it('can flush delay queue', function(done) {
this.timeout(2000);
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
var t = Date.now();
var c = 0;
helperNode1.on("input", function(msg) {
msg.should.have.a.property('payload');
msg.should.have.a.property('topic');
try {
if (msg.topic === "foo") {
msg.payload.should.equal(1);
(Date.now() - t).should.be.approximately(0,100);
c = c + 1;
}
else {
if (msg.topic === "bar") {
msg.payload.should.equal(1);
(Date.now() - t).should.be.approximately(0,100);
c = c + 1;
}
}
if (c === 5) { done(); }
} catch(e) {
done(e);
}
});
// send test messages
delayNode1.receive({payload:1,topic:"foo"}); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({flush:true}); }); // reset the queue
});
});
it('can reset delay queue', function(done) {
this.timeout(2000);
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
var t = Date.now();
var c = 0;
helperNode1.on("input", function(msg) {
c = c + 1;
});
setTimeout( function() {
if (c === 0) { done(); }
}, 700);
// send test messages
delayNode1.receive({payload:1,topic:"foo"}); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({reset:true}); }); // reset the queue
});
});
it('can flush rate limit queue', function(done) {
this.timeout(2000);
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
var t = Date.now();
var c = 0;
helperNode1.on("input", function(msg) {
msg.should.have.a.property('payload');
msg.should.have.a.property('topic');
try {
if (msg.topic === "foo") {
msg.payload.should.equal(1);
(Date.now() - t).should.be.approximately(0,100);
c = c + 1;
}
else {
if (msg.topic === "bar") {
msg.payload.should.equal(1);
(Date.now() - t).should.be.approximately(0,100);
c = c + 1;
}
}
if (c === 5) { done(); }
} catch(e) {
done(e);
}
});
// send test messages
delayNode1.receive({payload:1,topic:"foo"}); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({flush:true}); }); // reset the queue
});
});
it('can reset rate limit queue', function(done) {
this.timeout(2000);
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
var t = Date.now();
var c = 0;
helperNode1.on("input", function(msg) {
msg.should.have.a.property('payload');
msg.should.have.a.property('topic');
try {
if (msg.topic === "foo") {
msg.payload.should.equal(1);
(Date.now() - t).should.be.approximately(0,100);
c = c + 1;
}
} catch(e) {
done(e);
}
});
setTimeout( function() {
if (c === 1) { done(); }
}, 700);
// send test messages
delayNode1.receive({payload:1,topic:"foo"}); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({payload:1,topic:"bar"}); } ); // send something with blank topic
setImmediate( function() { delayNode1.receive({reset:true}); }); // reset the queue
});
});
});