diff --git a/nodes/core/core/89-delay.js b/nodes/core/core/89-delay.js index 0377760d6..c92ed39d1 100644 --- a/nodes/core/core/89-delay.js +++ b/nodes/core/core/89-delay.js @@ -1,5 +1,5 @@ /** - * Copyright 2013 IBM Corp. + * Copyright 2013, 2014 IBM Corp. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,10 @@ //Simple node to introduce a pause into a flow module.exports = function(RED) { "use strict"; + + var MILLIS_TO_NANOS = 1000000; + var SECONDS_TO_NANOS = 1000000000; + function random(n) { var wait = n.randomFirst + (n.diff * Math.random()); if (n.buffer.length > 0) { @@ -80,7 +84,7 @@ module.exports = function(RED) { this.buffer = []; this.intervalID = -1; this.randomID = -1; - this.lastSent = 0; + this.lastSent; this.drop = n.drop; var node = this; @@ -128,8 +132,14 @@ module.exports = function(RED) { },node.rate); } } else { - var now = Date.now(); - if (now-node.lastSent > node.rate) { + var now; + if(node.lastSent) { + now = process.hrtime(node.lastSent); + } + if(!node.lastSent) { // ensuring that we always send the first message + node.lastSent = process.hrtime(); + node.send(msg); + } else if ( ( (now[0] * SECONDS_TO_NANOS) + now[1] ) > (node.rate * MILLIS_TO_NANOS) ) { node.lastSent = now; node.send(msg); } diff --git a/test/nodes/core/core/89-delay_spec.js b/test/nodes/core/core/89-delay_spec.js new file mode 100644 index 000000000..b5f1462a6 --- /dev/null +++ b/test/nodes/core/core/89-delay_spec.js @@ -0,0 +1,361 @@ +/** + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +var should = require("should"); + +var delayNode = require("../../../../nodes/core/core/89-delay.js"); +var helper = require("../../helper.js"); + +var GRACE_PERCENTAGE=10; + +var nanosToSeconds = 1000000000; +var millisToSeconds = 1000; + +var secondsToMinutes = 60; +var secondsToHours = 3600; +var secondsToDays = 86400; + + +describe('delayNode', function() { + + beforeEach(function(done) { + helper.startServer(done); + }); + + afterEach(function(done) { + helper.unload(); + helper.stopServer(done); + }); + + it('should be loaded', function(done) { + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[[]]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + delayNode1.should.have.property('name', 'delayNode'); + done(); + }); + }); + + var TimeUnitEnum = { + MILLIS : "milliseconds", + SECONDS : "seconds", + MINUTES : "minutes", + HOURS : "hours", + DAYS : "days" + } + + /** + * Tells whether two numeric values are close enough to each other + * @param actualValue - the value we're testing + * @param expectedValue - the value we're matching the test value against + * @param tolerancePercent - the percentage of tolerated deviation (0 means equals) + */ + function closeEnough(actualValue, expectedValue, tolerancePercent) { + var toReturn; + var toleranceFraction = expectedValue * (tolerancePercent/100); + var minExpected = expectedValue - toleranceFraction; + var maxExpected = expectedValue + toleranceFraction; + + if(actualValue >= minExpected && actualValue <= maxExpected) { + toReturn = true; + } else { + toReturn = false; + } + return toReturn; + } + + /** + * Runs a delay test + * @param aTimeout - the timeout quantity + * @param aTimeoutUnit - the unit of the timeout: milliseconds, seconds, minutes, hours, days + */ + function genericDelayTest(aTimeout, aTimeoutUnit, done) { + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":aTimeout,"timeoutUnits":aTimeoutUnit,"rate":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + helperNode1.on("input", function(msg) { + try { + var endTime = process.hrtime(startTime); + var runtimeNanos = ( (endTime[0] * nanosToSeconds) + endTime[1] ); + var runtimeSeconds = runtimeNanos / nanosToSeconds; + var aTimeoutUnifiedToSeconds; + + // calculating the timeout in seconds + if(aTimeoutUnit == TimeUnitEnum.MILLIS) { + aTimeoutUnifiedToSeconds = aTimeout / millisToSeconds; + } else if(aTimeoutUnit == TimeUnitEnum.SECONDS) { + aTimeoutUnifiedToSeconds = aTimeout; + } else if(aTimeoutUnit == TimeUnitEnum.MINUTES) { + aTimeoutUnifiedToSeconds = aTimeout * secondsToMinutes; + } else if(aTimeoutUnit == TimeUnitEnum.HOURS) { + aTimeoutUnifiedToSeconds = aTimeout * secondsToHours; + } else if(aTimeoutUnit == TimeUnitEnum.DAYS) { + aTimeoutUnifiedToSeconds = aTimeout * secondsToDays; + } + + if(closeEnough(runtimeSeconds, aTimeoutUnifiedToSeconds, GRACE_PERCENTAGE)) { + done(); + } else { + try { + should.fail(null, null, "Delayed runtime seconds " + runtimeSeconds + " was not close enough to exlected timeout seconds: " + aTimeoutUnifiedToSeconds); + } catch (err) { + done(err); + } + } + } catch(err) { + done(err); + } + }); + var startTime = process.hrtime(); + delayNode1.receive({payload:"delayMe"}); + }); + } + + /** + * We send a message, take a timestamp then when the message is received by the helper node, we take another timestamp. + * Then check if the message has been delayed by the expected amount. + */ + it('delays the message in seconds', function(done) { + genericDelayTest(0.5, "seconds", done); + }); + + it('delays the message in milliseconds', function(done) { + genericDelayTest(500, "milliseconds", done); + }); + + it('delays the message in minutes', function(done) { // this is also 0.5 seconds + genericDelayTest(0.00833, "minutes", done); + }); + + it('delays the message in hours', function(done) { // this is also 0.5 seconds + genericDelayTest(0.0001388, "hours", done); + }); + + it('delays the message in days', function(done) { // this is also 0.5 seconds + genericDelayTest(0.000005787, "days", done); + }); + + /** + * Runs a rate limit test - only testing seconds! + * @param aLimit - the message limit count + * @param aDrop - whether to drop the messages that are over the limit + * @param runtimeInMillis - when to terminate run and count messages received + */ + function genericRateLimitSECONDSTest(aLimit, aDrop, runtimeInMillis, done) { + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":aDrop,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var receivedMessagesStack = []; + helperNode1.on("input", function(msg) { + receivedMessagesStack.push(msg); + }); + + var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst + + var startTime = process.hrtime(); + + var i = 0; + for(; i < possibleMaxMessageCount + 1; i++) { + delayNode1.receive({payload:i}); + } + + // if drop is enabled then we need to send a few messages delayed so that they don't get dropped + if(aDrop === true) { + setTimeout(function() { + delayNode1.receive({payload:++i}); + }, runtimeInMillis - 300); // should give enough time to squeeze another message in + } + + setTimeout(function() { + if(aDrop === true) { // messages should be dropped in the middle, receive 1st and last one + try { + receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount + 1); + receivedMessagesStack[0].payload.should.be.exactly(0); // means we received the last message injected just before test termination + receivedMessagesStack.pop().payload.should.be.exactly(i); // means we received the last message injected just before test termination + done(); + } catch (err) { + done(err); + } + } else { + try { + receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount); + for(var j = 0; j < receivedMessagesStack.length; j++) { + if(receivedMessagesStack[j].payload === j) { + if(j === (receivedMessagesStack.length -1)) { // last message, all matched so far + done(); + } + } else { + should.fail(null, null, "Received messages were not received in order. Message was " + receivedMessagesStack[i].payload + " on count " + i); + } + } + } catch (err) { + done(err); + } + } + }, runtimeInMillis); + }); + } + + it('limits the message rate to 1 per second', function(done) { + genericRateLimitSECONDSTest(1, false, 1500, done); + }); + + it('limits the message rate to 2 per second, 2 seconds', function(done) { + genericRateLimitSECONDSTest(2, false, 2100, done); + }); + + it('limits the message rate to 1 per second, 2 seconds, with drop', function(done) { + genericRateLimitSECONDSTest(1, true, 2300, done); + }); + + it('limits the message rate to 2 per second, 2 seconds, with drop', function(done) { + genericRateLimitSECONDSTest(2, true, 2300, done); + }); + + /** + * Returns true if the actualTimeout is gracefully in between the timeoutFrom and timeoutTo + * values. Gracefully means that inBetween could actually mean smaller/greater values + * than the timeout range so long as it's within an actual grace percentage. + * @param timeoutFrom - The expected timeout range (low number) + * @param timeoutTo - The expected timeout range (high number) + * @param actualTimeout - The actual measured timeout value of test + * @param allowedGracePercent - The percentage of grace allowed + */ + function inBetweenDelays(timeoutFrom, timeoutTo, actualTimeout, allowedGracePercent) { + if(closeEnough(actualTimeout, timeoutFrom, allowedGracePercent)) { + return true; + } else if(closeEnough(actualTimeout, timeoutTo, allowedGracePercent)) { + return true; + } else if(timeoutFrom < actualTimeout && timeoutTo > actualTimeout) { + return true; + } else { + return false; + } + } + + /** + * Runs a RANDOM DELAY test, checks if the delay is in between the given timeout values + * @param aTimeoutFrom - the timeout quantity which is the minimal acceptable wait period + * @param aTimeoutTo - the timeout quantity which is the maximum acceptable wait period + * @param aTimeoutUnit - the unit of the timeout: milliseconds, seconds, minutes, hours, days + */ + function randomDelayTest(aTimeoutFrom, aTimeoutTo, aTimeoutUnit, done) { + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"random","timeout":5,"timeoutUnits":"seconds","rate":"1","rateUnits":"second","randomFirst":aTimeoutFrom,"randomLast":aTimeoutTo,"randomUnits":aTimeoutUnit,"drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + helperNode1.on("input", function(msg) { + try { + var endTime = process.hrtime(startTime); + var runtimeNanos = ( (endTime[0] * nanosToSeconds) + endTime[1] ); + var runtimeSeconds = runtimeNanos / nanosToSeconds; + var aTimeoutFromUnifiedToSeconds; + var aTimeoutToUnifiedToSeconds; + + // calculating the timeout in seconds + if(aTimeoutUnit == TimeUnitEnum.MILLIS) { + aTimeoutFromUnifiedToSeconds = aTimeoutFrom / millisToSeconds; + aTimeoutToUnifiedToSeconds = aTimeoutTo / millisToSeconds; + } else if(aTimeoutUnit == TimeUnitEnum.SECONDS) { + aTimeoutFromUnifiedToSeconds = aTimeoutFrom; + aTimeoutToUnifiedToSeconds = aTimeoutTo; + } else if(aTimeoutUnit == TimeUnitEnum.MINUTES) { + aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToMinutes; + aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToMinutes; + } else if(aTimeoutUnit == TimeUnitEnum.HOURS) { + aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToHours; + aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToHours; + } else if(aTimeoutUnit == TimeUnitEnum.DAYS) { + aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToDays; + aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToDays; + } + + if(inBetweenDelays(aTimeoutFromUnifiedToSeconds, aTimeoutToUnifiedToSeconds, runtimeSeconds, GRACE_PERCENTAGE)) { + done(); + } else { + try { + should.fail(null, null, "Delayed runtime seconds " + runtimeSeconds + " was not \"in between enough\" enough to expected values of: " + aTimeoutFromUnifiedToSeconds + " and " + aTimeoutToUnifiedToSeconds); + } catch (err) { + done(err); + } + } + } catch(err) { + done(err); + } + }); + var startTime = process.hrtime(); + delayNode1.receive({payload:"delayMe"}); + }); + } + + it('randomly delays the message in seconds', function(done) { + randomDelayTest(0.4, 0.8, "seconds", done); + }); + + it(' randomly delays the message in milliseconds', function(done) { + randomDelayTest(400, 800, "milliseconds", done); + }); + + it('randomly delays the message in minutes', function(done) { + randomDelayTest(0.0066, 0.0133, "minutes", done); + }); + + it('delays the message in hours', function(done) { + randomDelayTest(0.000111111, 0.000222222, "hours", done); + }); + + it('delays the message in days', function(done) { + randomDelayTest(0.0000046296, 0.0000092593, "days", done); + }); + + it('handles bursts using a buffer', function(done) { + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":1000,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + + var sinon = require('sinon'); + + var receivedWarning = false; + var messageBurstSize = 1500; + + // we ensure that we note that a warning is received for buffer growth + sinon.stub(delayNode1, 'warn', function(warning){ + if(warning.indexOf("buffer exceeded 1000 messages" > -1)) { + receivedWarning = true; + } + }); + + // we ensure that the warning is received for buffer size and that we get the last message + helperNode1.on("input", function(msg) { + if(msg.payload === (messageBurstSize - 1) && receivedWarning === true) { + done(); // it will timeout if we don't receive the last message + } + }); + // send 1500 messages as quickly as possible + for(var i = 0; i < messageBurstSize; i++) { + delayNode1.receive({payload:i}); + } + }); + }); + +});