From 1c186416995add363c89466c3263b24c786b0bea Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Mon, 4 Oct 2021 13:00:24 +0100 Subject: [PATCH] Delay node: add option to send intermediate messages on separate output --- .../editor-client/src/sass/jquery.scss | 4 +-- .../nodes/core/function/89-delay.html | 34 +++++++++++++++---- .../@node-red/nodes/core/function/89-delay.js | 3 ++ .../nodes/locales/en-US/messages.json | 4 ++- test/nodes/core/function/89-delay_spec.js | 34 +++++++++++++++++-- 5 files changed, 67 insertions(+), 12 deletions(-) diff --git a/packages/node_modules/@node-red/editor-client/src/sass/jquery.scss b/packages/node_modules/@node-red/editor-client/src/sass/jquery.scss index b5b96162d..406d10258 100644 --- a/packages/node_modules/@node-red/editor-client/src/sass/jquery.scss +++ b/packages/node_modules/@node-red/editor-client/src/sass/jquery.scss @@ -137,10 +137,10 @@ padding: 0; border: 1px solid $form-input-border-color; } -.ui-spinner input { +.ui-spinner input[type=text] { background: $form-input-background; margin: 0 17px 0 0; - padding: 6px; + padding: 8px; border: none; border-top-right-radius: 0px; border-bottom-right-radius: 0px; diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.html b/packages/node_modules/@node-red/nodes/core/function/89-delay.html index cd8fef360..3ae4637d7 100644 --- a/packages/node_modules/@node-red/nodes/core/function/89-delay.html +++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.html @@ -78,11 +78,17 @@ -
- +
+
- + + +
@@ -114,7 +120,8 @@ randomLast: {value:"5", required:true, validate:function(v) { return RED.validators.number(v) && (v >= 0); }}, randomUnits: {value: "seconds"}, drop: {value:false}, - allowrate: {value:false} + allowrate: {value:false}, + outputs: { value: 1}, }, inputs:1, outputs:1, @@ -234,12 +241,23 @@ } }).trigger("change"); + if (this.outputs === 2) { + $("#node-input-drop-select").val("emit"); + } else if (this.drop) { + $("#node-input-drop-select").val("drop"); + } else { + $("#node-input-drop-select").val("queue"); + } + $("#node-input-rate-type").on("change", function() { if (this.value === "all") { - $("#node-input-drop").attr('disabled',false).next().css("opacity",1) $("#rate-details-per-topic").hide(); + $("#node-input-drop-select-queue").attr('disabled', false); } else if (this.value === "topic") { - $("#node-input-drop").prop('checked',true).attr('disabled',true).next().css("opacity",0.5) + if ($("#node-input-drop-select").val() === "queue") { + $("#node-input-drop-select").val("drop"); + } + $("#node-input-drop-select-queue").attr('disabled', true); $("#rate-details-per-topic").show(); } }).trigger("change"); @@ -248,6 +266,7 @@ var action = $("#node-input-delay-action").val(); if (action === "delay") { this.pauseType = $("#node-input-delay-type").val(); + $("#node-input-outputs").val(1); } else if (action === "rate") { action = $("#node-input-rate-type").val(); if (action === "all") { @@ -255,6 +274,9 @@ } else { this.pauseType = $("#node-input-rate-topic-type").val(); } + var dropType = $("#node-input-drop-select").val(); + this.drop = dropType !== "queue"; + $("#node-input-outputs").val(dropType === "emit"?2:1); } } }); diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.js b/packages/node_modules/@node-red/nodes/core/function/89-delay.js index 999e6847b..c4895e0e4 100644 --- a/packages/node_modules/@node-red/nodes/core/function/89-delay.js +++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.js @@ -95,6 +95,7 @@ module.exports = function(RED) { this.droppedMsgs = 0; this.allowrate = n.allowrate|| false; this.fixedrate = this.rate; + this.outputs = n.outputs; var node = this; function ourTimeout(handler, delay, clearHandler) { @@ -324,6 +325,8 @@ module.exports = function(RED) { else if ( ( (timeSinceLast[0] * SECONDS_TO_NANOS) + timeSinceLast[1] ) > (node.rate * MILLIS_TO_NANOS) ) { node.lastSent = process.hrtime(); send(msg); + } else if (node.outputs === 2) { + send([null,msg]) } } done(); diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/messages.json b/packages/node_modules/@node-red/nodes/locales/en-US/messages.json index 59d8f5c1a..adceaf134 100755 --- a/packages/node_modules/@node-red/nodes/locales/en-US/messages.json +++ b/packages/node_modules/@node-red/nodes/locales/en-US/messages.json @@ -290,7 +290,9 @@ "and": "&", "rate": "Rate", "msgper": "msg(s) per", - "dropmsg": "drop intermediate messages", + "queuemsg": "Queue intermediate messages", + "dropmsg": "Drop intermediate messages", + "sendmsg": "Send intermediate messages on 2nd output", "allowrate": "allow msg.rate (in ms) to override rate", "label": { "delay": "delay", diff --git a/test/nodes/core/function/89-delay_spec.js b/test/nodes/core/function/89-delay_spec.js index 4620458f8..17958b95b 100644 --- a/test/nodes/core/function/89-delay_spec.js +++ b/test/nodes/core/function/89-delay_spec.js @@ -251,13 +251,28 @@ describe('delay Node', function() { * @param nbUnit - the multiple of the unit, aLimit Message for nbUnit Seconds * @param runtimeInMillis - when to terminate run and count messages received */ - function dropRateLimitSECONDSTest(aLimit, nbUnit, runtimeInMillis, rateValue, done) { - var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"nbRateUnits":nbUnit,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,"wires":[["helperNode1"]]}, - {id:"helperNode1", type:"helper", wires:[]}]; + function dropRateLimitSECONDSTest(aLimit, nbUnit, runtimeInMillis, rateValue, sendIntermediate, done) { + if (!done) { + done = sendIntermediate; + sendIntermediate = false; + } + var outputs = 1; + if (sendIntermediate) { + outputs = 2; + } + var flow = [ + {"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"nbRateUnits":nbUnit,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,outputs:outputs,"wires":[["helperNode1"],["helperNode2"]]}, + {id:"helperNode1", type:"helper", wires:[]}, + {id:"helperNode2", type:"helper", wires:[]} + ] + + ; helper.load(delayNode, flow, function() { var delayNode1 = helper.getNode("delayNode1"); var helperNode1 = helper.getNode("helperNode1"); + var helperNode2 = helper.getNode("helperNode2"); var receivedMessagesStack = []; + var receivedIntermediateMessagesStack = []; // Add a small grace to the calculated delay var rate = 1000/aLimit + 10; @@ -273,6 +288,9 @@ describe('delay Node', function() { receiveTimestamp = process.hrtime(); receivedMessagesStack.push(msg); }); + helperNode2.on("input", function(msg) { + receivedIntermediateMessagesStack.push(msg); + }); var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst @@ -304,6 +322,11 @@ describe('delay Node', function() { } } foundAtLeastOneDrop.should.be.true(); + if (sendIntermediate) { + receivedIntermediateMessagesStack.length.should.be.greaterThan(0); + } else { + receivedIntermediateMessagesStack.length.should.be.exactly(0); + } done(); } catch (err) { done(err); @@ -327,6 +350,11 @@ describe('delay Node', function() { dropRateLimitSECONDSTest(2, 1, 5000, null, done); }); + it('limits the message rate to 2 per second, 5 seconds, with drop, 2nd output', function(done) { + this.timeout(6000); + dropRateLimitSECONDSTest(2, 1, 5000, null, true, done); + }); + it('limits the message rate with drop using msg.rate', function (done) { this.timeout(6000); RED.settings.nodeMessageBufferMaxLength = 3;