mirror of
https://github.com/node-red/node-red.git
synced 2023-10-10 13:36:53 +02:00
parent
961c9f3fa9
commit
9876570189
@ -1,5 +1,5 @@
|
|||||||
/**
|
/**
|
||||||
* Copyright 2013 IBM Corp.
|
* Copyright 2013, 2014 IBM Corp.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@ -17,6 +17,10 @@
|
|||||||
//Simple node to introduce a pause into a flow
|
//Simple node to introduce a pause into a flow
|
||||||
module.exports = function(RED) {
|
module.exports = function(RED) {
|
||||||
"use strict";
|
"use strict";
|
||||||
|
|
||||||
|
var MILLIS_TO_NANOS = 1000000;
|
||||||
|
var SECONDS_TO_NANOS = 1000000000;
|
||||||
|
|
||||||
function random(n) {
|
function random(n) {
|
||||||
var wait = n.randomFirst + (n.diff * Math.random());
|
var wait = n.randomFirst + (n.diff * Math.random());
|
||||||
if (n.buffer.length > 0) {
|
if (n.buffer.length > 0) {
|
||||||
@ -80,7 +84,7 @@ module.exports = function(RED) {
|
|||||||
this.buffer = [];
|
this.buffer = [];
|
||||||
this.intervalID = -1;
|
this.intervalID = -1;
|
||||||
this.randomID = -1;
|
this.randomID = -1;
|
||||||
this.lastSent = 0;
|
this.lastSent;
|
||||||
this.drop = n.drop;
|
this.drop = n.drop;
|
||||||
var node = this;
|
var node = this;
|
||||||
|
|
||||||
@ -128,8 +132,14 @@ module.exports = function(RED) {
|
|||||||
},node.rate);
|
},node.rate);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
var now = Date.now();
|
var now;
|
||||||
if (now-node.lastSent > node.rate) {
|
if(node.lastSent) {
|
||||||
|
now = process.hrtime(node.lastSent);
|
||||||
|
}
|
||||||
|
if(!node.lastSent) { // ensuring that we always send the first message
|
||||||
|
node.lastSent = process.hrtime();
|
||||||
|
node.send(msg);
|
||||||
|
} else if ( ( (now[0] * SECONDS_TO_NANOS) + now[1] ) > (node.rate * MILLIS_TO_NANOS) ) {
|
||||||
node.lastSent = now;
|
node.lastSent = now;
|
||||||
node.send(msg);
|
node.send(msg);
|
||||||
}
|
}
|
||||||
|
361
test/nodes/core/core/89-delay_spec.js
Normal file
361
test/nodes/core/core/89-delay_spec.js
Normal file
@ -0,0 +1,361 @@
|
|||||||
|
/**
|
||||||
|
* Copyright 2014 IBM Corp.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
**/
|
||||||
|
|
||||||
|
var should = require("should");
|
||||||
|
|
||||||
|
var delayNode = require("../../../../nodes/core/core/89-delay.js");
|
||||||
|
var helper = require("../../helper.js");
|
||||||
|
|
||||||
|
var GRACE_PERCENTAGE=10;
|
||||||
|
|
||||||
|
var nanosToSeconds = 1000000000;
|
||||||
|
var millisToSeconds = 1000;
|
||||||
|
|
||||||
|
var secondsToMinutes = 60;
|
||||||
|
var secondsToHours = 3600;
|
||||||
|
var secondsToDays = 86400;
|
||||||
|
|
||||||
|
|
||||||
|
describe('delayNode', function() {
|
||||||
|
|
||||||
|
beforeEach(function(done) {
|
||||||
|
helper.startServer(done);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(function(done) {
|
||||||
|
helper.unload();
|
||||||
|
helper.stopServer(done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should be loaded', function(done) {
|
||||||
|
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[[]]}];
|
||||||
|
helper.load(delayNode, flow, function() {
|
||||||
|
var delayNode1 = helper.getNode("delayNode1");
|
||||||
|
delayNode1.should.have.property('name', 'delayNode');
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
var TimeUnitEnum = {
|
||||||
|
MILLIS : "milliseconds",
|
||||||
|
SECONDS : "seconds",
|
||||||
|
MINUTES : "minutes",
|
||||||
|
HOURS : "hours",
|
||||||
|
DAYS : "days"
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tells whether two numeric values are close enough to each other
|
||||||
|
* @param actualValue - the value we're testing
|
||||||
|
* @param expectedValue - the value we're matching the test value against
|
||||||
|
* @param tolerancePercent - the percentage of tolerated deviation (0 means equals)
|
||||||
|
*/
|
||||||
|
function closeEnough(actualValue, expectedValue, tolerancePercent) {
|
||||||
|
var toReturn;
|
||||||
|
var toleranceFraction = expectedValue * (tolerancePercent/100);
|
||||||
|
var minExpected = expectedValue - toleranceFraction;
|
||||||
|
var maxExpected = expectedValue + toleranceFraction;
|
||||||
|
|
||||||
|
if(actualValue >= minExpected && actualValue <= maxExpected) {
|
||||||
|
toReturn = true;
|
||||||
|
} else {
|
||||||
|
toReturn = false;
|
||||||
|
}
|
||||||
|
return toReturn;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs a delay test
|
||||||
|
* @param aTimeout - the timeout quantity
|
||||||
|
* @param aTimeoutUnit - the unit of the timeout: milliseconds, seconds, minutes, hours, days
|
||||||
|
*/
|
||||||
|
function genericDelayTest(aTimeout, aTimeoutUnit, done) {
|
||||||
|
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":aTimeout,"timeoutUnits":aTimeoutUnit,"rate":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
|
||||||
|
{id:"helperNode1", type:"helper", wires:[]}];
|
||||||
|
helper.load(delayNode, flow, function() {
|
||||||
|
var delayNode1 = helper.getNode("delayNode1");
|
||||||
|
var helperNode1 = helper.getNode("helperNode1");
|
||||||
|
helperNode1.on("input", function(msg) {
|
||||||
|
try {
|
||||||
|
var endTime = process.hrtime(startTime);
|
||||||
|
var runtimeNanos = ( (endTime[0] * nanosToSeconds) + endTime[1] );
|
||||||
|
var runtimeSeconds = runtimeNanos / nanosToSeconds;
|
||||||
|
var aTimeoutUnifiedToSeconds;
|
||||||
|
|
||||||
|
// calculating the timeout in seconds
|
||||||
|
if(aTimeoutUnit == TimeUnitEnum.MILLIS) {
|
||||||
|
aTimeoutUnifiedToSeconds = aTimeout / millisToSeconds;
|
||||||
|
} else if(aTimeoutUnit == TimeUnitEnum.SECONDS) {
|
||||||
|
aTimeoutUnifiedToSeconds = aTimeout;
|
||||||
|
} else if(aTimeoutUnit == TimeUnitEnum.MINUTES) {
|
||||||
|
aTimeoutUnifiedToSeconds = aTimeout * secondsToMinutes;
|
||||||
|
} else if(aTimeoutUnit == TimeUnitEnum.HOURS) {
|
||||||
|
aTimeoutUnifiedToSeconds = aTimeout * secondsToHours;
|
||||||
|
} else if(aTimeoutUnit == TimeUnitEnum.DAYS) {
|
||||||
|
aTimeoutUnifiedToSeconds = aTimeout * secondsToDays;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(closeEnough(runtimeSeconds, aTimeoutUnifiedToSeconds, GRACE_PERCENTAGE)) {
|
||||||
|
done();
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
should.fail(null, null, "Delayed runtime seconds " + runtimeSeconds + " was not close enough to exlected timeout seconds: " + aTimeoutUnifiedToSeconds);
|
||||||
|
} catch (err) {
|
||||||
|
done(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch(err) {
|
||||||
|
done(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
var startTime = process.hrtime();
|
||||||
|
delayNode1.receive({payload:"delayMe"});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We send a message, take a timestamp then when the message is received by the helper node, we take another timestamp.
|
||||||
|
* Then check if the message has been delayed by the expected amount.
|
||||||
|
*/
|
||||||
|
it('delays the message in seconds', function(done) {
|
||||||
|
genericDelayTest(0.5, "seconds", done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('delays the message in milliseconds', function(done) {
|
||||||
|
genericDelayTest(500, "milliseconds", done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('delays the message in minutes', function(done) { // this is also 0.5 seconds
|
||||||
|
genericDelayTest(0.00833, "minutes", done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('delays the message in hours', function(done) { // this is also 0.5 seconds
|
||||||
|
genericDelayTest(0.0001388, "hours", done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('delays the message in days', function(done) { // this is also 0.5 seconds
|
||||||
|
genericDelayTest(0.000005787, "days", done);
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs a rate limit test - only testing seconds!
|
||||||
|
* @param aLimit - the message limit count
|
||||||
|
* @param aDrop - whether to drop the messages that are over the limit
|
||||||
|
* @param runtimeInMillis - when to terminate run and count messages received
|
||||||
|
*/
|
||||||
|
function genericRateLimitSECONDSTest(aLimit, aDrop, runtimeInMillis, done) {
|
||||||
|
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":aDrop,"wires":[["helperNode1"]]},
|
||||||
|
{id:"helperNode1", type:"helper", wires:[]}];
|
||||||
|
helper.load(delayNode, flow, function() {
|
||||||
|
var delayNode1 = helper.getNode("delayNode1");
|
||||||
|
var helperNode1 = helper.getNode("helperNode1");
|
||||||
|
var receivedMessagesStack = [];
|
||||||
|
helperNode1.on("input", function(msg) {
|
||||||
|
receivedMessagesStack.push(msg);
|
||||||
|
});
|
||||||
|
|
||||||
|
var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst
|
||||||
|
|
||||||
|
var startTime = process.hrtime();
|
||||||
|
|
||||||
|
var i = 0;
|
||||||
|
for(; i < possibleMaxMessageCount + 1; i++) {
|
||||||
|
delayNode1.receive({payload:i});
|
||||||
|
}
|
||||||
|
|
||||||
|
// if drop is enabled then we need to send a few messages delayed so that they don't get dropped
|
||||||
|
if(aDrop === true) {
|
||||||
|
setTimeout(function() {
|
||||||
|
delayNode1.receive({payload:++i});
|
||||||
|
}, runtimeInMillis - 300); // should give enough time to squeeze another message in
|
||||||
|
}
|
||||||
|
|
||||||
|
setTimeout(function() {
|
||||||
|
if(aDrop === true) { // messages should be dropped in the middle, receive 1st and last one
|
||||||
|
try {
|
||||||
|
receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount + 1);
|
||||||
|
receivedMessagesStack[0].payload.should.be.exactly(0); // means we received the last message injected just before test termination
|
||||||
|
receivedMessagesStack.pop().payload.should.be.exactly(i); // means we received the last message injected just before test termination
|
||||||
|
done();
|
||||||
|
} catch (err) {
|
||||||
|
done(err);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount);
|
||||||
|
for(var j = 0; j < receivedMessagesStack.length; j++) {
|
||||||
|
if(receivedMessagesStack[j].payload === j) {
|
||||||
|
if(j === (receivedMessagesStack.length -1)) { // last message, all matched so far
|
||||||
|
done();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
should.fail(null, null, "Received messages were not received in order. Message was " + receivedMessagesStack[i].payload + " on count " + i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
done(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, runtimeInMillis);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
it('limits the message rate to 1 per second', function(done) {
|
||||||
|
genericRateLimitSECONDSTest(1, false, 1500, done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('limits the message rate to 2 per second, 2 seconds', function(done) {
|
||||||
|
genericRateLimitSECONDSTest(2, false, 2100, done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('limits the message rate to 1 per second, 2 seconds, with drop', function(done) {
|
||||||
|
genericRateLimitSECONDSTest(1, true, 2300, done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('limits the message rate to 2 per second, 2 seconds, with drop', function(done) {
|
||||||
|
genericRateLimitSECONDSTest(2, true, 2300, done);
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if the actualTimeout is gracefully in between the timeoutFrom and timeoutTo
|
||||||
|
* values. Gracefully means that inBetween could actually mean smaller/greater values
|
||||||
|
* than the timeout range so long as it's within an actual grace percentage.
|
||||||
|
* @param timeoutFrom - The expected timeout range (low number)
|
||||||
|
* @param timeoutTo - The expected timeout range (high number)
|
||||||
|
* @param actualTimeout - The actual measured timeout value of test
|
||||||
|
* @param allowedGracePercent - The percentage of grace allowed
|
||||||
|
*/
|
||||||
|
function inBetweenDelays(timeoutFrom, timeoutTo, actualTimeout, allowedGracePercent) {
|
||||||
|
if(closeEnough(actualTimeout, timeoutFrom, allowedGracePercent)) {
|
||||||
|
return true;
|
||||||
|
} else if(closeEnough(actualTimeout, timeoutTo, allowedGracePercent)) {
|
||||||
|
return true;
|
||||||
|
} else if(timeoutFrom < actualTimeout && timeoutTo > actualTimeout) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs a RANDOM DELAY test, checks if the delay is in between the given timeout values
|
||||||
|
* @param aTimeoutFrom - the timeout quantity which is the minimal acceptable wait period
|
||||||
|
* @param aTimeoutTo - the timeout quantity which is the maximum acceptable wait period
|
||||||
|
* @param aTimeoutUnit - the unit of the timeout: milliseconds, seconds, minutes, hours, days
|
||||||
|
*/
|
||||||
|
function randomDelayTest(aTimeoutFrom, aTimeoutTo, aTimeoutUnit, done) {
|
||||||
|
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"random","timeout":5,"timeoutUnits":"seconds","rate":"1","rateUnits":"second","randomFirst":aTimeoutFrom,"randomLast":aTimeoutTo,"randomUnits":aTimeoutUnit,"drop":false,"wires":[["helperNode1"]]},
|
||||||
|
{id:"helperNode1", type:"helper", wires:[]}];
|
||||||
|
helper.load(delayNode, flow, function() {
|
||||||
|
var delayNode1 = helper.getNode("delayNode1");
|
||||||
|
var helperNode1 = helper.getNode("helperNode1");
|
||||||
|
helperNode1.on("input", function(msg) {
|
||||||
|
try {
|
||||||
|
var endTime = process.hrtime(startTime);
|
||||||
|
var runtimeNanos = ( (endTime[0] * nanosToSeconds) + endTime[1] );
|
||||||
|
var runtimeSeconds = runtimeNanos / nanosToSeconds;
|
||||||
|
var aTimeoutFromUnifiedToSeconds;
|
||||||
|
var aTimeoutToUnifiedToSeconds;
|
||||||
|
|
||||||
|
// calculating the timeout in seconds
|
||||||
|
if(aTimeoutUnit == TimeUnitEnum.MILLIS) {
|
||||||
|
aTimeoutFromUnifiedToSeconds = aTimeoutFrom / millisToSeconds;
|
||||||
|
aTimeoutToUnifiedToSeconds = aTimeoutTo / millisToSeconds;
|
||||||
|
} else if(aTimeoutUnit == TimeUnitEnum.SECONDS) {
|
||||||
|
aTimeoutFromUnifiedToSeconds = aTimeoutFrom;
|
||||||
|
aTimeoutToUnifiedToSeconds = aTimeoutTo;
|
||||||
|
} else if(aTimeoutUnit == TimeUnitEnum.MINUTES) {
|
||||||
|
aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToMinutes;
|
||||||
|
aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToMinutes;
|
||||||
|
} else if(aTimeoutUnit == TimeUnitEnum.HOURS) {
|
||||||
|
aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToHours;
|
||||||
|
aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToHours;
|
||||||
|
} else if(aTimeoutUnit == TimeUnitEnum.DAYS) {
|
||||||
|
aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToDays;
|
||||||
|
aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToDays;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(inBetweenDelays(aTimeoutFromUnifiedToSeconds, aTimeoutToUnifiedToSeconds, runtimeSeconds, GRACE_PERCENTAGE)) {
|
||||||
|
done();
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
should.fail(null, null, "Delayed runtime seconds " + runtimeSeconds + " was not \"in between enough\" enough to expected values of: " + aTimeoutFromUnifiedToSeconds + " and " + aTimeoutToUnifiedToSeconds);
|
||||||
|
} catch (err) {
|
||||||
|
done(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch(err) {
|
||||||
|
done(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
var startTime = process.hrtime();
|
||||||
|
delayNode1.receive({payload:"delayMe"});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
it('randomly delays the message in seconds', function(done) {
|
||||||
|
randomDelayTest(0.4, 0.8, "seconds", done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it(' randomly delays the message in milliseconds', function(done) {
|
||||||
|
randomDelayTest(400, 800, "milliseconds", done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('randomly delays the message in minutes', function(done) {
|
||||||
|
randomDelayTest(0.0066, 0.0133, "minutes", done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('delays the message in hours', function(done) {
|
||||||
|
randomDelayTest(0.000111111, 0.000222222, "hours", done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('delays the message in days', function(done) {
|
||||||
|
randomDelayTest(0.0000046296, 0.0000092593, "days", done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('handles bursts using a buffer', function(done) {
|
||||||
|
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":1000,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
|
||||||
|
{id:"helperNode1", type:"helper", wires:[]}];
|
||||||
|
helper.load(delayNode, flow, function() {
|
||||||
|
var delayNode1 = helper.getNode("delayNode1");
|
||||||
|
var helperNode1 = helper.getNode("helperNode1");
|
||||||
|
|
||||||
|
var sinon = require('sinon');
|
||||||
|
|
||||||
|
var receivedWarning = false;
|
||||||
|
var messageBurstSize = 1500;
|
||||||
|
|
||||||
|
// we ensure that we note that a warning is received for buffer growth
|
||||||
|
sinon.stub(delayNode1, 'warn', function(warning){
|
||||||
|
if(warning.indexOf("buffer exceeded 1000 messages" > -1)) {
|
||||||
|
receivedWarning = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// we ensure that the warning is received for buffer size and that we get the last message
|
||||||
|
helperNode1.on("input", function(msg) {
|
||||||
|
if(msg.payload === (messageBurstSize - 1) && receivedWarning === true) {
|
||||||
|
done(); // it will timeout if we don't receive the last message
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// send 1500 messages as quickly as possible
|
||||||
|
for(var i = 0; i < messageBurstSize; i++) {
|
||||||
|
delayNode1.receive({payload:i});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
});
|
Loading…
Reference in New Issue
Block a user