Add new msg.delay option to delay node

and msg.reset to clear/fluch existing delays
This commit is contained in:
Dave Conway-Jones 2017-05-11 16:12:38 +01:00
parent c75dc3cc36
commit 49389d6f06
No known key found for this signature in database
GPG Key ID: 81B04231572A9A2D
4 changed files with 149 additions and 41 deletions

View File

@ -15,17 +15,18 @@
--> -->
<script type="text/x-red" data-template-name="delay"> <script type="text/x-red" data-template-name="delay">
<div class="form-row"> <div class="form-row">
<label for="node-input-pauseType"><i class="fa fa-tasks"></i> <span data-i18n="delay.action"></span></label> <label for="node-input-pauseType"><i class="fa fa-tasks"></i> <span data-i18n="delay.action"></span></label>
<select id="node-input-pauseType" style="width:270px !important"> <select id="node-input-pauseType" style="width:270px !important">
<option value="delay" data-i18n="delay.delaymsg"></option> <option value="delay" data-i18n="delay.delaymsg"></option>
<option value="delayv" data-i18n="delay.delayvarmsg"></option>
<option value="random" data-i18n="delay.randomdelay"></option> <option value="random" data-i18n="delay.randomdelay"></option>
<option value="rate" data-i18n="delay.limitrate"></option> <option value="rate" data-i18n="delay.limitrate"></option>
<option value="queue" data-i18n="delay.fairqueue"></option> <option value="queue" data-i18n="delay.fairqueue"></option>
<option value="timed" data-i18n="delay.timedqueue"></option> <option value="timed" data-i18n="delay.timedqueue"></option>
</select> </select>
</div> </div>
<div id="delay-details" class="form-row"> <div id="delay-details" class="form-row">
<label for="node-input-timeout"><i class="fa fa-clock-o"></i> <span data-i18n="delay.for"></span></label> <label for="node-input-timeout"><i class="fa fa-clock-o"></i> <span data-i18n="delay.for"></span></label>
<input type="text" id="node-input-timeout" placeholder="Time" style="text-align:end; width:50px !important"> <input type="text" id="node-input-timeout" placeholder="Time" style="text-align:end; width:50px !important">
@ -53,7 +54,6 @@
<div id="node-input-dr"><input style="margin: 20px 0 20px 100px; width: 30px;" type="checkbox" id="node-input-drop"><label style="width: 250px;" for="node-input-drop"><span data-i18n="delay.dropmsg"></span></label></div> <div id="node-input-dr"><input style="margin: 20px 0 20px 100px; width: 30px;" type="checkbox" id="node-input-drop"><label style="width: 250px;" for="node-input-drop"><span data-i18n="delay.dropmsg"></span></label></div>
</div> </div>
<div id="random-details" class="form-row"> <div id="random-details" class="form-row">
<label for="node-input-randomFirst"><i class="fa fa-clock-o"></i> <span data-i18n="delay.between"></span></label> <label for="node-input-randomFirst"><i class="fa fa-clock-o"></i> <span data-i18n="delay.between"></span></label>
<input type="text" id="node-input-randomFirst" placeholder="" style="text-align:end; width:30px !important"> <input type="text" id="node-input-randomFirst" placeholder="" style="text-align:end; width:30px !important">
@ -72,7 +72,6 @@
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label> <label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label>
<input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name"> <input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name">
</div> </div>
</script> </script>
<script type="text/x-red" data-help-name="delay"> <script type="text/x-red" data-help-name="delay">
@ -80,6 +79,7 @@
<p>The default delay is 5 seconds and rate limit of 1 msg/second, but both can be configured.</p> <p>The default delay is 5 seconds and rate limit of 1 msg/second, but both can be configured.</p>
<p>When set to rate limit messages, they are spread across the configured time period. It can <p>When set to rate limit messages, they are spread across the configured time period. It can
also be set to discard any intermediate messages that arrive.</p> also be set to discard any intermediate messages that arrive.</p>
<p>If configured to use <code>msg.delay</code> the delay is specified in milliseconds.</p>
<p>The "topic based fair queue" adds messages to a release queue tagged by their <code>msg.topic</code> property. <p>The "topic based fair queue" adds messages to a release queue tagged by their <code>msg.topic</code> property.
At each "tick", derived from the rate, the next "topic" is released. At each "tick", derived from the rate, the next "topic" is released.
Any messages arriving on the same topic before release replace those in that position in the queue. Any messages arriving on the same topic before release replace those in that position in the queue.
@ -110,15 +110,17 @@
outputs:1, outputs:1,
icon: "timer.png", icon: "timer.png",
label: function() { label: function() {
if (this.pauseType == "delay") { if (this.pauseType == "delayv") {
var units = this.timeoutUnits ? this.timeoutUnits.charAt(0) : "s"; return this.name||this._("delay.label.variable");
if (this.timeoutUnits == "milliseconds") { units = "ms"; } } else if (this.pauseType == "delay") {
return this.name||this._("delay.label.delay")+" "+this.timeout+" "+units; var units = this.timeoutUnits ? this.timeoutUnits.charAt(0) : "s";
if (this.timeoutUnits == "milliseconds") { units = "ms"; }
return this.name||this._("delay.label.delay")+" "+this.timeout+" "+units;
} else if (this.pauseType == "rate") { } else if (this.pauseType == "rate") {
var units = this.rateUnits ? (this.nbRateUnits > 1 ? this.nbRateUnits : '') + this.rateUnits.charAt(0) : "s"; var units = this.rateUnits ? (this.nbRateUnits > 1 ? this.nbRateUnits : '') + this.rateUnits.charAt(0) : "s";
return this.name||this._("delay.label.limit")+" "+this.rate+" msg/"+units; return this.name||this._("delay.label.limit")+" "+this.rate+" msg/"+units;
} else if (this.pauseType == "random") { } else if (this.pauseType == "random") {
return this.name || this._("delay.label.random"); return this.name || this._("delay.label.random");
} }
else if (this.pauseType == "timed") { else if (this.pauseType == "timed") {
var units = ''; var units = '';
@ -154,10 +156,10 @@
var $this = $(this); var $this = $(this);
var val = parseInt($this.val()); var val = parseInt($this.val());
var type = "singular"; var type = "singular";
if(val > 1) { if (val > 1) {
type = "plural"; type = "plural";
} }
if($this.attr("data-type") == type) { if ($this.attr("data-type") == type) {
return; return;
} }
$this.attr("data-type", type); $this.attr("data-type", type);
@ -166,16 +168,19 @@
var key = "delay.label.units." + $option.val() + "." + type; var key = "delay.label.units." + $option.val() + "." + type;
$option.attr('data-i18n', 'node-red:' + key); $option.attr('data-i18n', 'node-red:' + key);
$option.html(node._(key)); $option.html(node._(key));
}) });
}); });
if (this.pauseType == "delay") { if (this.pauseType == "delay") {
$("#delay-details").show(); $("#delay-details").show();
$("#rate-details").hide(); $("#rate-details").hide();
$("#random-details").hide(); $("#random-details").hide();
$("#node-input-dr").hide(); $("#node-input-dr").hide();
} else if (this.pauseType == "delayv") {
$("#delay-details").hide();
$("#rate-details").hide();
$("#random-details").hide();
$("#node-input-dr").hide();
} else if (this.pauseType == "rate") { } else if (this.pauseType == "rate") {
$("#delay-details").hide(); $("#delay-details").hide();
$("#rate-details").show(); $("#rate-details").show();
@ -216,12 +221,17 @@
$("#rate-details").hide(); $("#rate-details").hide();
$("#random-details").hide(); $("#random-details").hide();
$("#node-input-dr").hide(); $("#node-input-dr").hide();
} else if (this.value == "delayv") {
$("#delay-details").hide();
$("#rate-details").hide();
$("#random-details").hide();
$("#node-input-dr").hide();
} else if (this.value == "rate") { } else if (this.value == "rate") {
$("#delay-details").hide(); $("#delay-details").hide();
$("#rate-details").show(); $("#rate-details").show();
$("#random-details").hide(); $("#random-details").hide();
$("#node-input-dr").show(); $("#node-input-dr").show();
} else if (this.value == "random") { } else if (this.value == "random") {
$("#delay-details").hide(); $("#delay-details").hide();
$("#rate-details").hide(); $("#rate-details").hide();
$("#random-details").show(); $("#random-details").show();

View File

@ -80,10 +80,17 @@ module.exports = function(RED) {
this.drop = n.drop; this.drop = n.drop;
var node = this; var node = this;
var clearDelayList = function() {
for (var i=0; i<node.idList.length; i++ ) {
clearTimeout(node.idList[i]);
}
node.idList = [];
node.status({text:"reset"});
}
if (node.pauseType === "delay") { if (node.pauseType === "delay") {
node.on("input", function(msg) { node.on("input", function(msg) {
var id; var id = setTimeout(function() {
id = setTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1); node.idList.splice(node.idList.indexOf(id),1);
if (node.idList.length === 0) { node.status({}); } if (node.idList.length === 0) { node.status({}); }
node.send(msg); node.send(msg);
@ -92,17 +99,28 @@ module.exports = function(RED) {
if ((node.timeout > 1000) && (node.idList.length !== 0)) { if ((node.timeout > 1000) && (node.idList.length !== 0)) {
node.status({fill:"blue",shape:"dot",text:" "}); node.status({fill:"blue",shape:"dot",text:" "});
} }
if (msg.hasOwnProperty("reset")) { clearDelayList(); }
}); });
node.on("close", function() { clearDelayList(); });
node.on("close", function() { }
for (var i=0; i<node.idList.length; i++ ) { else if (node.pauseType === "delayv") {
clearTimeout(node.idList[i]); node.on("input", function(msg) {
var delayvar = Number(msg.delay || 0);
if (delayvar < 0) { delayvar = 0; }
var id = setTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1);
if (node.idList.length === 0) { node.status({}); }
node.send(msg);
}, delayvar);
node.idList.push(id);
if ((delayvar >= 1) && (node.idList.length !== 0)) {
node.status({fill:"blue",shape:"dot",text:delayvar/1000+"s"});
} }
node.idList = []; if (msg.hasOwnProperty("reset")) { clearDelayList(); }
node.status({});
}); });
node.on("close", function() { clearDelayList(); });
} else if (node.pauseType === "rate") { }
else if (node.pauseType === "rate") {
var olddepth = 0; var olddepth = 0;
node.on("input", function(msg) { node.on("input", function(msg) {
if (!node.drop) { if (!node.drop) {
@ -119,7 +137,8 @@ module.exports = function(RED) {
olddepth = 10000; olddepth = 10000;
node.warn(node.name + " " + RED._("delay.error.buffer1")); node.warn(node.name + " " + RED._("delay.error.buffer1"));
} }
} else { }
else {
node.send(msg); node.send(msg);
node.intervalID = setInterval(function() { node.intervalID = setInterval(function() {
if (node.buffer.length === 0) { if (node.buffer.length === 0) {
@ -134,7 +153,8 @@ module.exports = function(RED) {
} }
},node.rate); },node.rate);
} }
} else { }
else {
var timeSinceLast; var timeSinceLast;
if (node.lastSent) { if (node.lastSent) {
timeSinceLast = process.hrtime(node.lastSent); timeSinceLast = process.hrtime(node.lastSent);
@ -142,20 +162,25 @@ module.exports = function(RED) {
if (!node.lastSent) { // ensuring that we always send the first message if (!node.lastSent) { // ensuring that we always send the first message
node.lastSent = process.hrtime(); node.lastSent = process.hrtime();
node.send(msg); node.send(msg);
} else if ( ( (timeSinceLast[0] * SECONDS_TO_NANOS) + timeSinceLast[1] ) > (node.rate * MILLIS_TO_NANOS) ) { }
else if ( ( (timeSinceLast[0] * SECONDS_TO_NANOS) + timeSinceLast[1] ) > (node.rate * MILLIS_TO_NANOS) ) {
node.lastSent = process.hrtime(); node.lastSent = process.hrtime();
node.send(msg); node.send(msg);
} }
} }
if (msg.hasOwnProperty("reset")) {
clearInterval(node.intervalID);
node.buffer = [];
node.status({text:"reset"});
}
}); });
node.on("close", function() { node.on("close", function() {
clearInterval(node.intervalID); clearInterval(node.intervalID);
node.buffer = []; node.buffer = [];
node.status({}); node.status({});
}); });
}
} else if ((node.pauseType === "queue") || (node.pauseType === "timed")) { else if ((node.pauseType === "queue") || (node.pauseType === "timed")) {
node.intervalID = setInterval(function() { node.intervalID = setInterval(function() {
if (node.pauseType === "queue") { if (node.pauseType === "queue") {
if (node.buffer.length > 0) { if (node.buffer.length > 0) {
@ -181,30 +206,32 @@ module.exports = function(RED) {
} }
if (!hit) { node.buffer.push(msg); } // if not add to end of queue if (!hit) { node.buffer.push(msg); } // if not add to end of queue
node.status({text:node.buffer.length}); node.status({text:node.buffer.length});
if (msg.hasOwnProperty("reset")) {
node.buffer = [];
node.status({text:"reset"});
}
}); });
node.on("close", function() { node.on("close", function() {
clearInterval(node.intervalID); clearInterval(node.intervalID);
node.buffer = []; node.buffer = [];
node.status({}); node.status({});
}); });
}
} else if (node.pauseType === "random") { else if (node.pauseType === "random") {
node.on("input", function(msg) { node.on("input", function(msg) {
var wait = node.randomFirst + (node.diff * Math.random()); var wait = node.randomFirst + (node.diff * Math.random());
var id = setTimeout(function() { var id = setTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1); node.idList.splice(node.idList.indexOf(id),1);
node.send(msg); node.send(msg);
node.status({});
}, wait); }, wait);
node.idList.push(id); node.idList.push(id);
}); if ((node.timeout >= 1000) && (node.idList.length !== 0)) {
node.status({fill:"blue",shape:"dot",text:parseInt(wait/10)/100+"s"});
node.on("close", function() {
for (var i=0; i<node.idList.length; i++ ) {
clearTimeout(node.idList[i]);
} }
node.idList = []; if (msg.hasOwnProperty("reset")) { clearDelayList(); }
}); });
node.on("close", function() { clearDelayList(); });
} }
} }
RED.nodes.registerType("delay",DelayNode); RED.nodes.registerType("delay",DelayNode);

View File

@ -186,6 +186,7 @@
"action": "Action", "action": "Action",
"for": "For", "for": "For",
"delaymsg": "Delay message", "delaymsg": "Delay message",
"delayvarmsg": "Set delay with msg.delay",
"randomdelay": "Random delay", "randomdelay": "Random delay",
"limitrate": "Limit rate to", "limitrate": "Limit rate to",
"fairqueue": "Topic based fair queue", "fairqueue": "Topic based fair queue",
@ -205,6 +206,7 @@
"dropmsg": "drop intermediate messages", "dropmsg": "drop intermediate messages",
"label": { "label": {
"delay": "delay", "delay": "delay",
"variable": "variable",
"limit": "limit", "limit": "limit",
"random": "random", "random": "random",
"queue": "queue", "queue": "queue",

View File

@ -339,6 +339,75 @@ describe('delay Node', function() {
} }
} }
/**
* Runs a VARIABLE DELAY test, checks if the delay is in between the given timeout values
* @param aTimeoutFrom - the timeout quantity which is the minimal acceptable wait period
* @param aTimeoutTo - the timeout quantity which is the maximum acceptable wait period
* @param aTimeoutUnit - the unit of the timeout: milliseconds, seconds, minutes, hours, days
* @param delay - the variable delay: milliseconds
*/
function variableDelayTest(aTimeoutFrom, aTimeoutTo, aTimeoutUnit, delay, done) {
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delayv","timeout":5,"timeoutUnits":"seconds","rate":"1","rateUnits":"second","randomFirst":aTimeoutFrom,"randomLast":aTimeoutTo,"randomUnits":aTimeoutUnit,"drop":false,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
helperNode1.on("input", function(msg) {
try {
var endTime = process.hrtime(startTime);
var runtimeNanos = ( (endTime[0] * nanosToSeconds) + endTime[1] );
var runtimeSeconds = runtimeNanos / nanosToSeconds;
var aTimeoutFromUnifiedToSeconds;
var aTimeoutToUnifiedToSeconds;
// calculating the timeout in seconds
if (aTimeoutUnit == TimeUnitEnum.MILLIS) {
aTimeoutFromUnifiedToSeconds = aTimeoutFrom / millisToSeconds;
aTimeoutToUnifiedToSeconds = aTimeoutTo / millisToSeconds;
} else if (aTimeoutUnit == TimeUnitEnum.SECONDS) {
aTimeoutFromUnifiedToSeconds = aTimeoutFrom;
aTimeoutToUnifiedToSeconds = aTimeoutTo;
} else if (aTimeoutUnit == TimeUnitEnum.MINUTES) {
aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToMinutes;
aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToMinutes;
} else if (aTimeoutUnit == TimeUnitEnum.HOURS) {
aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToHours;
aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToHours;
} else if (aTimeoutUnit == TimeUnitEnum.DAYS) {
aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToDays;
aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToDays;
}
if (inBetweenDelays(aTimeoutFromUnifiedToSeconds, aTimeoutToUnifiedToSeconds, runtimeSeconds, GRACE_PERCENTAGE)) {
done();
} else {
try {
should.fail(null, null, "Delayed runtime seconds " + runtimeSeconds + " was not \"in between enough\" enough to expected values of: " + aTimeoutFromUnifiedToSeconds + " and " + aTimeoutToUnifiedToSeconds);
} catch (err) {
done(err);
}
}
} catch(err) {
done(err);
}
});
var startTime = process.hrtime();
delayNode1.receive({payload:"delayMe", delay:delay});
});
}
it('variable delay set by msg.delay the message in milliseconds', function(done) {
variableDelayTest("200", "300", "milliseconds", 250, done);
});
it('variable delay is zero if msg.delay not specified', function(done) {
variableDelayTest("0", "50", "milliseconds", null, done);
});
it('variable delay is zero if msg.delay is negative', function(done) {
variableDelayTest("0", "50", "milliseconds", -250, done);
});
/** /**
* Runs a RANDOM DELAY test, checks if the delay is in between the given timeout values * Runs a RANDOM DELAY test, checks if the delay is in between the given timeout values
* @param aTimeoutFrom - the timeout quantity which is the minimal acceptable wait period * @param aTimeoutFrom - the timeout quantity which is the minimal acceptable wait period