Add "topic based fair queue" option to delay node

This commit is contained in:
Dave C-J 2014-10-24 20:00:08 +01:00
parent cf81de415a
commit b54e9edfa6
2 changed files with 61 additions and 10 deletions

View File

@ -21,8 +21,9 @@
<label for="node-input-pauseType"><i class="fa fa-tasks"></i> Action</label>
<select id="node-input-pauseType" style="width:270px !important">
<option value="delay">Delay message</option>
<option value="rate">Limit rate to</option>
<option value="random">Random delay</option>
<option value="rate">Limit rate to</option>
<option value="queue">Topic based fair queue</option>
</select>
</div>
<div id="delay-details" class="form-row">
@ -38,9 +39,9 @@
</div>
<div id="rate-details" class="form-row">
<label for="node-input-rate"><i class="fa fa-clock-o"></i> To</label>
<label for="node-input-rate"><i class="fa fa-clock-o"></i> Rate</label>
<input type="text" id="node-input-rate" placeholder="1" style="direction:rtl; width:30px !important">
<label for="node-input-reateUnits">msg(s) per</label>
<label for="node-input-rateUnits">msg(s) per</label>
<select id="node-input-rateUnits" style="width:140px !important">
<option value="second">Second</option>
<option value="minute">Minute</option>
@ -48,9 +49,10 @@
<option value="day">Day</option>
</select>
<br/>
<input style="margin: 20px 0 20px 100px; width: 30px;" type="checkbox" id="node-input-drop"><label style="width: 250px;" for="node-input-drop">drop intermediate messages</label>
<div id="node-input-dr"><input style="margin: 20px 0 20px 100px; width: 30px;" type="checkbox" id="node-input-drop"><label style="width: 250px;" for="node-input-drop">drop intermediate messages</label></div>
</div>
<div id="random-details" class="form-row">
<label for="node-input-randomFirst"><i class="fa fa-clock-o"></i> Between</label>
<input type="text" id="node-input-randomFirst" placeholder="" style="directon:rtl; width:30px !important">
@ -74,8 +76,13 @@
<!-- Next, some simple help text is provided for the node. -->
<script type="text/x-red" data-help-name="delay">
<p>Introduces a delay into a flow or rate limts messges</p>
<p>Default delay is 5 seconds and rate limit of 1 msg/second, but both can be configured</p>
<p>Introduces a delay into a flow or rate limits messages.</p>
<p>Default delay is 5 seconds and rate limit of 1 msg/second, but both can be configured.</p>
<p>If you select a rate limit you may optionally discard any intermediate messages that arrive.</p>
<p>The "topic based fair queue" adds messages to a release queue tagged by their <b>msg.topic</b> property.
At each "tick", derived from the rate, the next "topic" is released.
Any messages arriving on the same topic before release replace those in that position in the queue.
So each "topic" gets a turn - but the most recent value is always the one sent.</p>
</script>
<!-- Finally, the node type is registered along with all of its properties -->
@ -125,14 +132,22 @@
$("#delay-details").show();
$("#rate-details").hide();
$("#random-details").hide();
$("#node-input-dr").hide();
} else if (this.pauseType == "rate") {
$("#delay-details").hide();
$("#rate-details").show();
$("#random-details").hide();
$("#node-input-dr").show();
} else if (this.pauseType == "random") {
$("#delay-details").hide();
$("#rate-details").hide();
$("#random-details").show();
$("#node-input-dr").hide();
} else if (this.pauseType == "queue") {
$("#delay-details").hide();
$("#rate-details").show();
$("#random-details").hide();
$("#node-input-dr").hide();
}
if (!this.timeoutUnits) {
@ -152,14 +167,22 @@
$("#delay-details").show();
$("#rate-details").hide();
$("#random-details").hide();
$("#node-input-dr").hide();
} else if (this.value == "rate") {
$("#delay-details").hide();
$("#rate-details").show();
$("#random-details").hide();
} else if (this.value == "random") {
$("#node-input-dr").show();
} else if (this.value == "random") {
$("#delay-details").hide();
$("#rate-details").hide();
$("#random-details").show();
$("#node-input-dr").hide();
} else if (this.value == "queue") {
$("#delay-details").hide();
$("#rate-details").show();
$("#random-details").hide();
$("#node-input-dr").hide();
}
});
}

View File

@ -17,10 +17,10 @@
//Simple node to introduce a pause into a flow
module.exports = function(RED) {
"use strict";
var MILLIS_TO_NANOS = 1000000;
var SECONDS_TO_NANOS = 1000000000;
function random(n) {
var wait = n.randomFirst + (n.diff * Math.random());
if (n.buffer.length > 0) {
@ -136,7 +136,7 @@ module.exports = function(RED) {
if (node.lastSent) {
timeSinceLast = process.hrtime(node.lastSent);
}
if (!node.lastSent) { // ensuring that we always send the first message
if (!node.lastSent) { // ensuring that we always send the first message
node.lastSent = process.hrtime();
node.send(msg);
} else if ( ( (timeSinceLast[0] * SECONDS_TO_NANOS) + timeSinceLast[1] ) > (node.rate * MILLIS_TO_NANOS) ) {
@ -151,6 +151,34 @@ module.exports = function(RED) {
this.buffer = [];
});
} else if (this.pauseType === "queue") {
this.intervalID = setInterval(function() {
if (node.buffer.length > 0) {
node.send(node.buffer.shift()); // send the first on the queue
}
node.status({text:node.buffer.length});
//console.log(node.buffer);
},node.rate);
this.on("input", function(msg) {
if (!msg.hasOwnProperty("topic")) { msg.topic = "_none_"; }
var hit = false;
for (var b in node.buffer) { // check if already in queue
if (msg.topic === node.buffer[b].topic) {
node.buffer[b] = msg; // if so - replace existing entry
hit = true;
}
}
if (!hit) { node.buffer.push(msg); } // if not add to end of queue
node.status({text:node.buffer.length});
});
this.on("close", function() {
clearInterval(this.intervalID);
this.buffer = [];
node.status({text:node.buffer.length});
});
} else if (this.pauseType === "random") {
this.on("input",function(msg){
node.buffer.push(msg);