mirror of
https://github.com/node-red/node-red.git
synced 2023-10-10 13:36:53 +02:00
Delay node: add option to send intermediate messages on separate output
This commit is contained in:
parent
1419729458
commit
1c18641699
@ -137,10 +137,10 @@
|
|||||||
padding: 0;
|
padding: 0;
|
||||||
border: 1px solid $form-input-border-color;
|
border: 1px solid $form-input-border-color;
|
||||||
}
|
}
|
||||||
.ui-spinner input {
|
.ui-spinner input[type=text] {
|
||||||
background: $form-input-background;
|
background: $form-input-background;
|
||||||
margin: 0 17px 0 0;
|
margin: 0 17px 0 0;
|
||||||
padding: 6px;
|
padding: 8px;
|
||||||
border: none;
|
border: none;
|
||||||
border-top-right-radius: 0px;
|
border-top-right-radius: 0px;
|
||||||
border-bottom-right-radius: 0px;
|
border-bottom-right-radius: 0px;
|
||||||
|
@ -78,11 +78,17 @@
|
|||||||
<option value="day" data-i18n="delay.label.units.day.singular"></option>
|
<option value="day" data-i18n="delay.label.units.day.singular"></option>
|
||||||
</select>
|
</select>
|
||||||
</div>
|
</div>
|
||||||
<div class="form-row" id="rate-override">
|
<div class="form-row" id="rate-override" style="display: flex; align-items: center">
|
||||||
<label></label><input style="width:30px; vertical-align:baseline;" type="checkbox" id="node-input-allowrate"><label style="width: 250px;" for="node-input-allowrate" data-i18n="delay.allowrate"></label>
|
<label></label><input style="width:30px; margin:0" type="checkbox" id="node-input-allowrate"><label style="margin:0;width: auto;" for="node-input-allowrate" data-i18n="delay.allowrate"></label>
|
||||||
</div>
|
</div>
|
||||||
<div class="form-row" id="rate-details-drop">
|
<div class="form-row" id="rate-details-drop">
|
||||||
<label></label><input style="width:30px;; vertical-align:baseline;" type="checkbox" id="node-input-drop"><label style="width: 250px;" for="node-input-drop" data-i18n="delay.dropmsg"></label>
|
<input type="hidden" id="node-input-outputs" value="1">
|
||||||
|
<label></label>
|
||||||
|
<select id="node-input-drop-select" style="width: 70%">
|
||||||
|
<option id="node-input-drop-select-queue" value="queue" data-i18n="delay.queuemsg"></option>
|
||||||
|
<option value="drop" data-i18n="delay.dropmsg"></option>
|
||||||
|
<option value="emit" data-i18n="delay.sendmsg"></option>
|
||||||
|
</select>
|
||||||
</div>
|
</div>
|
||||||
<div class="form-row" id="rate-details-per-topic">
|
<div class="form-row" id="rate-details-per-topic">
|
||||||
<label></label>
|
<label></label>
|
||||||
@ -114,7 +120,8 @@
|
|||||||
randomLast: {value:"5", required:true, validate:function(v) { return RED.validators.number(v) && (v >= 0); }},
|
randomLast: {value:"5", required:true, validate:function(v) { return RED.validators.number(v) && (v >= 0); }},
|
||||||
randomUnits: {value: "seconds"},
|
randomUnits: {value: "seconds"},
|
||||||
drop: {value:false},
|
drop: {value:false},
|
||||||
allowrate: {value:false}
|
allowrate: {value:false},
|
||||||
|
outputs: { value: 1},
|
||||||
},
|
},
|
||||||
inputs:1,
|
inputs:1,
|
||||||
outputs:1,
|
outputs:1,
|
||||||
@ -234,12 +241,23 @@
|
|||||||
}
|
}
|
||||||
}).trigger("change");
|
}).trigger("change");
|
||||||
|
|
||||||
|
if (this.outputs === 2) {
|
||||||
|
$("#node-input-drop-select").val("emit");
|
||||||
|
} else if (this.drop) {
|
||||||
|
$("#node-input-drop-select").val("drop");
|
||||||
|
} else {
|
||||||
|
$("#node-input-drop-select").val("queue");
|
||||||
|
}
|
||||||
|
|
||||||
$("#node-input-rate-type").on("change", function() {
|
$("#node-input-rate-type").on("change", function() {
|
||||||
if (this.value === "all") {
|
if (this.value === "all") {
|
||||||
$("#node-input-drop").attr('disabled',false).next().css("opacity",1)
|
|
||||||
$("#rate-details-per-topic").hide();
|
$("#rate-details-per-topic").hide();
|
||||||
|
$("#node-input-drop-select-queue").attr('disabled', false);
|
||||||
} else if (this.value === "topic") {
|
} else if (this.value === "topic") {
|
||||||
$("#node-input-drop").prop('checked',true).attr('disabled',true).next().css("opacity",0.5)
|
if ($("#node-input-drop-select").val() === "queue") {
|
||||||
|
$("#node-input-drop-select").val("drop");
|
||||||
|
}
|
||||||
|
$("#node-input-drop-select-queue").attr('disabled', true);
|
||||||
$("#rate-details-per-topic").show();
|
$("#rate-details-per-topic").show();
|
||||||
}
|
}
|
||||||
}).trigger("change");
|
}).trigger("change");
|
||||||
@ -248,6 +266,7 @@
|
|||||||
var action = $("#node-input-delay-action").val();
|
var action = $("#node-input-delay-action").val();
|
||||||
if (action === "delay") {
|
if (action === "delay") {
|
||||||
this.pauseType = $("#node-input-delay-type").val();
|
this.pauseType = $("#node-input-delay-type").val();
|
||||||
|
$("#node-input-outputs").val(1);
|
||||||
} else if (action === "rate") {
|
} else if (action === "rate") {
|
||||||
action = $("#node-input-rate-type").val();
|
action = $("#node-input-rate-type").val();
|
||||||
if (action === "all") {
|
if (action === "all") {
|
||||||
@ -255,6 +274,9 @@
|
|||||||
} else {
|
} else {
|
||||||
this.pauseType = $("#node-input-rate-topic-type").val();
|
this.pauseType = $("#node-input-rate-topic-type").val();
|
||||||
}
|
}
|
||||||
|
var dropType = $("#node-input-drop-select").val();
|
||||||
|
this.drop = dropType !== "queue";
|
||||||
|
$("#node-input-outputs").val(dropType === "emit"?2:1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -95,6 +95,7 @@ module.exports = function(RED) {
|
|||||||
this.droppedMsgs = 0;
|
this.droppedMsgs = 0;
|
||||||
this.allowrate = n.allowrate|| false;
|
this.allowrate = n.allowrate|| false;
|
||||||
this.fixedrate = this.rate;
|
this.fixedrate = this.rate;
|
||||||
|
this.outputs = n.outputs;
|
||||||
var node = this;
|
var node = this;
|
||||||
|
|
||||||
function ourTimeout(handler, delay, clearHandler) {
|
function ourTimeout(handler, delay, clearHandler) {
|
||||||
@ -324,6 +325,8 @@ module.exports = function(RED) {
|
|||||||
else if ( ( (timeSinceLast[0] * SECONDS_TO_NANOS) + timeSinceLast[1] ) > (node.rate * MILLIS_TO_NANOS) ) {
|
else if ( ( (timeSinceLast[0] * SECONDS_TO_NANOS) + timeSinceLast[1] ) > (node.rate * MILLIS_TO_NANOS) ) {
|
||||||
node.lastSent = process.hrtime();
|
node.lastSent = process.hrtime();
|
||||||
send(msg);
|
send(msg);
|
||||||
|
} else if (node.outputs === 2) {
|
||||||
|
send([null,msg])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
done();
|
done();
|
||||||
|
@ -290,7 +290,9 @@
|
|||||||
"and": "&",
|
"and": "&",
|
||||||
"rate": "Rate",
|
"rate": "Rate",
|
||||||
"msgper": "msg(s) per",
|
"msgper": "msg(s) per",
|
||||||
"dropmsg": "drop intermediate messages",
|
"queuemsg": "Queue intermediate messages",
|
||||||
|
"dropmsg": "Drop intermediate messages",
|
||||||
|
"sendmsg": "Send intermediate messages on 2nd output",
|
||||||
"allowrate": "allow msg.rate (in ms) to override rate",
|
"allowrate": "allow msg.rate (in ms) to override rate",
|
||||||
"label": {
|
"label": {
|
||||||
"delay": "delay",
|
"delay": "delay",
|
||||||
|
@ -251,13 +251,28 @@ describe('delay Node', function() {
|
|||||||
* @param nbUnit - the multiple of the unit, aLimit Message for nbUnit Seconds
|
* @param nbUnit - the multiple of the unit, aLimit Message for nbUnit Seconds
|
||||||
* @param runtimeInMillis - when to terminate run and count messages received
|
* @param runtimeInMillis - when to terminate run and count messages received
|
||||||
*/
|
*/
|
||||||
function dropRateLimitSECONDSTest(aLimit, nbUnit, runtimeInMillis, rateValue, done) {
|
function dropRateLimitSECONDSTest(aLimit, nbUnit, runtimeInMillis, rateValue, sendIntermediate, done) {
|
||||||
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"nbRateUnits":nbUnit,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,"wires":[["helperNode1"]]},
|
if (!done) {
|
||||||
{id:"helperNode1", type:"helper", wires:[]}];
|
done = sendIntermediate;
|
||||||
|
sendIntermediate = false;
|
||||||
|
}
|
||||||
|
var outputs = 1;
|
||||||
|
if (sendIntermediate) {
|
||||||
|
outputs = 2;
|
||||||
|
}
|
||||||
|
var flow = [
|
||||||
|
{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"nbRateUnits":nbUnit,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,outputs:outputs,"wires":[["helperNode1"],["helperNode2"]]},
|
||||||
|
{id:"helperNode1", type:"helper", wires:[]},
|
||||||
|
{id:"helperNode2", type:"helper", wires:[]}
|
||||||
|
]
|
||||||
|
|
||||||
|
;
|
||||||
helper.load(delayNode, flow, function() {
|
helper.load(delayNode, flow, function() {
|
||||||
var delayNode1 = helper.getNode("delayNode1");
|
var delayNode1 = helper.getNode("delayNode1");
|
||||||
var helperNode1 = helper.getNode("helperNode1");
|
var helperNode1 = helper.getNode("helperNode1");
|
||||||
|
var helperNode2 = helper.getNode("helperNode2");
|
||||||
var receivedMessagesStack = [];
|
var receivedMessagesStack = [];
|
||||||
|
var receivedIntermediateMessagesStack = [];
|
||||||
|
|
||||||
// Add a small grace to the calculated delay
|
// Add a small grace to the calculated delay
|
||||||
var rate = 1000/aLimit + 10;
|
var rate = 1000/aLimit + 10;
|
||||||
@ -273,6 +288,9 @@ describe('delay Node', function() {
|
|||||||
receiveTimestamp = process.hrtime();
|
receiveTimestamp = process.hrtime();
|
||||||
receivedMessagesStack.push(msg);
|
receivedMessagesStack.push(msg);
|
||||||
});
|
});
|
||||||
|
helperNode2.on("input", function(msg) {
|
||||||
|
receivedIntermediateMessagesStack.push(msg);
|
||||||
|
});
|
||||||
|
|
||||||
var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst
|
var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst
|
||||||
|
|
||||||
@ -304,6 +322,11 @@ describe('delay Node', function() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
foundAtLeastOneDrop.should.be.true();
|
foundAtLeastOneDrop.should.be.true();
|
||||||
|
if (sendIntermediate) {
|
||||||
|
receivedIntermediateMessagesStack.length.should.be.greaterThan(0);
|
||||||
|
} else {
|
||||||
|
receivedIntermediateMessagesStack.length.should.be.exactly(0);
|
||||||
|
}
|
||||||
done();
|
done();
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
done(err);
|
done(err);
|
||||||
@ -327,6 +350,11 @@ describe('delay Node', function() {
|
|||||||
dropRateLimitSECONDSTest(2, 1, 5000, null, done);
|
dropRateLimitSECONDSTest(2, 1, 5000, null, done);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('limits the message rate to 2 per second, 5 seconds, with drop, 2nd output', function(done) {
|
||||||
|
this.timeout(6000);
|
||||||
|
dropRateLimitSECONDSTest(2, 1, 5000, null, true, done);
|
||||||
|
});
|
||||||
|
|
||||||
it('limits the message rate with drop using msg.rate', function (done) {
|
it('limits the message rate with drop using msg.rate', function (done) {
|
||||||
this.timeout(6000);
|
this.timeout(6000);
|
||||||
RED.settings.nodeMessageBufferMaxLength = 3;
|
RED.settings.nodeMessageBufferMaxLength = 3;
|
||||||
|
Loading…
Reference in New Issue
Block a user