Add timed release mode to delay node

This commit is contained in:
Dave Conway-Jones 2016-03-20 17:46:12 +00:00
parent 0cd4a2b4ec
commit 906703db5f
4 changed files with 75 additions and 8 deletions

View File

@ -19,10 +19,11 @@
<div class="form-row"> <div class="form-row">
<label for="node-input-pauseType"><i class="fa fa-tasks"></i> <span data-i18n="delay.action"></span></label> <label for="node-input-pauseType"><i class="fa fa-tasks"></i> <span data-i18n="delay.action"></span></label>
<select id="node-input-pauseType" style="width:270px !important"> <select id="node-input-pauseType" style="width:270px !important">
<option value="delay" data-i18n="delay.delaymsg"></option> <option value="delay" data-i18n="delay.delaymsg"></option>
<option value="random" data-i18n="delay.randomdelay"></option> <option value="random" data-i18n="delay.randomdelay"></option>
<option value="rate" data-i18n="delay.limitrate"></option> <option value="rate" data-i18n="delay.limitrate"></option>
<option value="queue" data-i18n="delay.fairqueue"></option> <option value="queue" data-i18n="delay.fairqueue"></option>
<option value="timed" data-i18n="delay.timedqueue"></option>
</select> </select>
</div> </div>
<div id="delay-details" class="form-row"> <div id="delay-details" class="form-row">
@ -82,6 +83,9 @@
At each "tick", derived from the rate, the next "topic" is released. At each "tick", derived from the rate, the next "topic" is released.
Any messages arriving on the same topic before release replace those in that position in the queue. Any messages arriving on the same topic before release replace those in that position in the queue.
So each "topic" gets a turn - but the most recent value is always the one sent.</p> So each "topic" gets a turn - but the most recent value is always the one sent.</p>
<p>The "timed release queue" adds messages to an array based on their <code>msg.topic</code> property.
At each "tick", all the latest messages are released. Any new messages arriving before release will
replace those of the same topic.</p>
</script> </script>
<script type="text/javascript"> <script type="text/javascript">
@ -114,6 +118,9 @@
} else if (this.pauseType == "random") { } else if (this.pauseType == "random") {
return this.name || this._("delay.label.random"); return this.name || this._("delay.label.random");
} }
else if (this.pauseType == "timed") {
return this.name || this.rate+" "+this._("delay.label.timed")+" "+this.rateUnits;
}
else { else {
var units = this.rateUnits ? this.rateUnits.charAt(0) : "s"; var units = this.rateUnits ? this.rateUnits.charAt(0) : "s";
return this.name || this._("delay.label.queue")+" "+this.rate+" msg/"+units; return this.name || this._("delay.label.queue")+" "+this.rate+" msg/"+units;
@ -149,6 +156,11 @@
$("#rate-details").show(); $("#rate-details").show();
$("#random-details").hide(); $("#random-details").hide();
$("#node-input-dr").hide(); $("#node-input-dr").hide();
} else if (this.pauseType == "timed") {
$("#delay-details").hide();
$("#rate-details").show();
$("#random-details").hide();
$("#node-input-dr").hide();
} }
if (!this.timeoutUnits) { if (!this.timeoutUnits) {
@ -184,6 +196,11 @@
$("#rate-details").show(); $("#rate-details").show();
$("#random-details").hide(); $("#random-details").hide();
$("#node-input-dr").hide(); $("#node-input-dr").hide();
} else if (this.value == "timed") {
$("#delay-details").hide();
$("#rate-details").show();
$("#random-details").hide();
$("#node-input-dr").hide();
} }
}); });
} }

View File

@ -147,10 +147,17 @@ module.exports = function(RED) {
node.status({}); node.status({});
}); });
} else if (this.pauseType === "queue") { } else if ((this.pauseType === "queue") || (this.pauseType === "timed")) {
this.intervalID = setInterval(function() { this.intervalID = setInterval(function() {
if (node.buffer.length > 0) { if (this.pauseType === "queue") {
node.send(node.buffer.shift()); // send the first on the queue if (node.buffer.length > 0) {
node.send(node.buffer.shift()); // send the first on the queue
}
}
else {
while (node.buffer.length > 0) { // send the whole queue
node.send(node.buffer.shift());
}
} }
node.status({text:node.buffer.length}); node.status({text:node.buffer.length});
},node.rate); },node.rate);

View File

@ -152,6 +152,7 @@
"randomdelay": "Random delay", "randomdelay": "Random delay",
"limitrate": "Limit rate to", "limitrate": "Limit rate to",
"fairqueue": "Topic based fair queue", "fairqueue": "Topic based fair queue",
"timedqueue": "Timed release queue",
"milisecs": "Miliseconds", "milisecs": "Miliseconds",
"secs": "Seconds", "secs": "Seconds",
"sec": "Second", "sec": "Second",
@ -169,7 +170,8 @@
"delay": "delay", "delay": "delay",
"limit": "limit", "limit": "limit",
"random": "random", "random": "random",
"queue": "queue" "queue": "queue",
"timed": "releases per"
}, },
"error": { "error": {
"buffer": "buffer exceeded 1000 messages" "buffer": "buffer exceeded 1000 messages"

View File

@ -477,4 +477,45 @@ describe('delay Node', function() {
}); });
}); });
it('handles timed queue', function(done) {
this.timeout(6000);
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"timed","timeout":5,"timeoutUnits":"seconds","rate":1000,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
var messages = 2;
var c = 0;
helperNode1.on("input", function(msg) {
c += 1;
if (c === 1) {
msg.should.have.property("topic","A");
msg.should.have.property("payload",3);
}
else if (c === 2) {
msg.should.have.property("topic","_none_");
msg.should.have.property("payload",2);
}
else if (c === 3) {
msg.should.have.property("topic","_none_");
msg.should.have.property("payload","Biscuit");
done();
}
});
// send test messages
delayNode1.receive({payload:1,topic:"A"});
delayNode1.receive({payload:1});
delayNode1.receive({payload:2,topic:"A"});
delayNode1.receive({payload:3,topic:"A"}); // should get this
delayNode1.receive({payload:2}); // and this
setTimeout( function() {
delayNode1.receive({payload:"Biscuit"}); // and then this
},2000);
});
});
}); });