From b54e9edfa605a4c580d0b5fff2abc3cf6c2c3c66 Mon Sep 17 00:00:00 2001 From: Dave C-J Date: Fri, 24 Oct 2014 20:00:08 +0100 Subject: [PATCH] Add "topic based fair queue" option to delay node --- nodes/core/core/89-delay.html | 37 ++++++++++++++++++++++++++++------- nodes/core/core/89-delay.js | 34 +++++++++++++++++++++++++++++--- 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/nodes/core/core/89-delay.html b/nodes/core/core/89-delay.html index dcb0a5b9b..997a0b4e3 100644 --- a/nodes/core/core/89-delay.html +++ b/nodes/core/core/89-delay.html @@ -21,8 +21,9 @@
@@ -38,9 +39,9 @@
- + - +
- +
+
@@ -74,8 +76,13 @@ @@ -125,14 +132,22 @@ $("#delay-details").show(); $("#rate-details").hide(); $("#random-details").hide(); + $("#node-input-dr").hide(); } else if (this.pauseType == "rate") { $("#delay-details").hide(); $("#rate-details").show(); $("#random-details").hide(); + $("#node-input-dr").show(); } else if (this.pauseType == "random") { $("#delay-details").hide(); $("#rate-details").hide(); $("#random-details").show(); + $("#node-input-dr").hide(); + } else if (this.pauseType == "queue") { + $("#delay-details").hide(); + $("#rate-details").show(); + $("#random-details").hide(); + $("#node-input-dr").hide(); } if (!this.timeoutUnits) { @@ -152,14 +167,22 @@ $("#delay-details").show(); $("#rate-details").hide(); $("#random-details").hide(); + $("#node-input-dr").hide(); } else if (this.value == "rate") { $("#delay-details").hide(); $("#rate-details").show(); $("#random-details").hide(); - } else if (this.value == "random") { + $("#node-input-dr").show(); + } else if (this.value == "random") { $("#delay-details").hide(); $("#rate-details").hide(); $("#random-details").show(); + $("#node-input-dr").hide(); + } else if (this.value == "queue") { + $("#delay-details").hide(); + $("#rate-details").show(); + $("#random-details").hide(); + $("#node-input-dr").hide(); } }); } diff --git a/nodes/core/core/89-delay.js b/nodes/core/core/89-delay.js index 3c4e1c014..dfc5616ae 100644 --- a/nodes/core/core/89-delay.js +++ b/nodes/core/core/89-delay.js @@ -17,10 +17,10 @@ //Simple node to introduce a pause into a flow module.exports = function(RED) { "use strict"; - + var MILLIS_TO_NANOS = 1000000; var SECONDS_TO_NANOS = 1000000000; - + function random(n) { var wait = n.randomFirst + (n.diff * Math.random()); if (n.buffer.length > 0) { @@ -136,7 +136,7 @@ module.exports = function(RED) { if (node.lastSent) { timeSinceLast = process.hrtime(node.lastSent); } - if (!node.lastSent) { // ensuring that we always send the first message + if (!node.lastSent) { // ensuring that we always send the first message node.lastSent = process.hrtime(); node.send(msg); } else if ( ( (timeSinceLast[0] * SECONDS_TO_NANOS) + timeSinceLast[1] ) > (node.rate * MILLIS_TO_NANOS) ) { @@ -151,6 +151,34 @@ module.exports = function(RED) { this.buffer = []; }); + } else if (this.pauseType === "queue") { + this.intervalID = setInterval(function() { + if (node.buffer.length > 0) { + node.send(node.buffer.shift()); // send the first on the queue + } + node.status({text:node.buffer.length}); + //console.log(node.buffer); + },node.rate); + + this.on("input", function(msg) { + if (!msg.hasOwnProperty("topic")) { msg.topic = "_none_"; } + var hit = false; + for (var b in node.buffer) { // check if already in queue + if (msg.topic === node.buffer[b].topic) { + node.buffer[b] = msg; // if so - replace existing entry + hit = true; + } + } + if (!hit) { node.buffer.push(msg); } // if not add to end of queue + node.status({text:node.buffer.length}); + }); + + this.on("close", function() { + clearInterval(this.intervalID); + this.buffer = []; + node.status({text:node.buffer.length}); + }); + } else if (this.pauseType === "random") { this.on("input",function(msg){ node.buffer.push(msg);