/** * Copyright 2014 IBM Corp. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. **/ var should = require("should"); var delayNode = require("../../../../nodes/core/core/89-delay.js"); var helper = require("../../helper.js"); var GRACE_PERCENTAGE=10; var nanosToSeconds = 1000000000; var millisToSeconds = 1000; var secondsToMinutes = 60; var secondsToHours = 3600; var secondsToDays = 86400; describe('delayNode', function() { beforeEach(function(done) { helper.startServer(done); }); afterEach(function(done) { helper.unload(); helper.stopServer(done); }); it('should be loaded', function(done) { var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[[]]}]; helper.load(delayNode, flow, function() { var delayNode1 = helper.getNode("delayNode1"); delayNode1.should.have.property('name', 'delayNode'); done(); }); }); var TimeUnitEnum = { MILLIS : "milliseconds", SECONDS : "seconds", MINUTES : "minutes", HOURS : "hours", DAYS : "days" } /** * Tells whether two numeric values are close enough to each other * @param actualValue - the value we're testing * @param expectedValue - the value we're matching the test value against * @param tolerancePercent - the percentage of tolerated deviation (0 means equals) */ function closeEnough(actualValue, expectedValue, tolerancePercent) { var toReturn; var toleranceFraction = expectedValue * (tolerancePercent/100); var minExpected = expectedValue - toleranceFraction; var maxExpected = expectedValue + toleranceFraction; if(actualValue >= minExpected && actualValue <= maxExpected) { toReturn = true; } else { toReturn = false; } return toReturn; } /** * Runs a delay test * @param aTimeout - the timeout quantity * @param aTimeoutUnit - the unit of the timeout: milliseconds, seconds, minutes, hours, days */ function genericDelayTest(aTimeout, aTimeoutUnit, done) { var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":aTimeout,"timeoutUnits":aTimeoutUnit,"rate":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, {id:"helperNode1", type:"helper", wires:[]}]; helper.load(delayNode, flow, function() { var delayNode1 = helper.getNode("delayNode1"); var helperNode1 = helper.getNode("helperNode1"); helperNode1.on("input", function(msg) { try { var endTime = process.hrtime(startTime); var runtimeNanos = ( (endTime[0] * nanosToSeconds) + endTime[1] ); var runtimeSeconds = runtimeNanos / nanosToSeconds; var aTimeoutUnifiedToSeconds; // calculating the timeout in seconds if(aTimeoutUnit == TimeUnitEnum.MILLIS) { aTimeoutUnifiedToSeconds = aTimeout / millisToSeconds; } else if(aTimeoutUnit == TimeUnitEnum.SECONDS) { aTimeoutUnifiedToSeconds = aTimeout; } else if(aTimeoutUnit == TimeUnitEnum.MINUTES) { aTimeoutUnifiedToSeconds = aTimeout * secondsToMinutes; } else if(aTimeoutUnit == TimeUnitEnum.HOURS) { aTimeoutUnifiedToSeconds = aTimeout * secondsToHours; } else if(aTimeoutUnit == TimeUnitEnum.DAYS) { aTimeoutUnifiedToSeconds = aTimeout * secondsToDays; } if(closeEnough(runtimeSeconds, aTimeoutUnifiedToSeconds, GRACE_PERCENTAGE)) { done(); } else { try { should.fail(null, null, "Delayed runtime seconds " + runtimeSeconds + " was not close enough to exlected timeout seconds: " + aTimeoutUnifiedToSeconds); } catch (err) { done(err); } } } catch(err) { done(err); } }); var startTime = process.hrtime(); delayNode1.receive({payload:"delayMe"}); }); } /** * We send a message, take a timestamp then when the message is received by the helper node, we take another timestamp. * Then check if the message has been delayed by the expected amount. */ it('delays the message in seconds', function(done) { genericDelayTest(0.5, "seconds", done); }); it('delays the message in milliseconds', function(done) { genericDelayTest(500, "milliseconds", done); }); it('delays the message in minutes', function(done) { // this is also 0.5 seconds genericDelayTest(0.00833, "minutes", done); }); it('delays the message in hours', function(done) { // this is also 0.5 seconds genericDelayTest(0.0001388, "hours", done); }); it('delays the message in days', function(done) { // this is also 0.5 seconds genericDelayTest(0.000005787, "days", done); }); /** * Runs a rate limit test - only testing seconds! * @param aLimit - the message limit count * @param runtimeInMillis - when to terminate run and count messages received */ function genericRateLimitSECONDSTest(aLimit, runtimeInMillis, done) { var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, {id:"helperNode1", type:"helper", wires:[]}]; helper.load(delayNode, flow, function() { var delayNode1 = helper.getNode("delayNode1"); var helperNode1 = helper.getNode("helperNode1"); var receivedMessagesStack = []; var rate = 1000/aLimit; var receiveTimestamp; helperNode1.on("input", function(msg) { if(receiveTimestamp) { var elapse = process.hrtime(receiveTimestamp); var receiveInterval = (elapse[0] * 1000) + ((elapse[1] / nanosToSeconds) * 1000); receiveInterval.should.be.above(rate * 0.9); } receiveTimestamp = process.hrtime(); receivedMessagesStack.push(msg); }); var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst var i = 0; for(; i < possibleMaxMessageCount + 1; i++) { delayNode1.receive({payload:i}); } setTimeout(function() { try { receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount); for(var j = 0; j < receivedMessagesStack.length; j++) { if(receivedMessagesStack[j].payload === j) { if(j === (receivedMessagesStack.length -1)) { // last message, all matched so far done(); } } else { should.fail(null, null, "Received messages were not received in order. Message was " + receivedMessagesStack[i].payload + " on count " + i); } } } catch (err) { done(err); } }, runtimeInMillis); }); } it('limits the message rate to 1 per second', function(done) { genericRateLimitSECONDSTest(1, 1500, done); }); it('limits the message rate to 2 per second, 2 seconds', function(done) { this.timeout(6000); genericRateLimitSECONDSTest(2, 2100, done); }); /** * Runs a rate limit test with drop support - only testing seconds! * @param aLimit - the message limit count * @param runtimeInMillis - when to terminate run and count messages received */ function dropRateLimitSECONDSTest(aLimit, runtimeInMillis, done) { var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,"wires":[["helperNode1"]]}, {id:"helperNode1", type:"helper", wires:[]}]; helper.load(delayNode, flow, function() { var delayNode1 = helper.getNode("delayNode1"); var helperNode1 = helper.getNode("helperNode1"); var receivedMessagesStack = []; var rate = 1000/aLimit; var receiveTimestamp; helperNode1.on("input", function(msg) { if(receiveTimestamp) { var elapse = process.hrtime(receiveTimestamp); var receiveInterval = (elapse[0] * 1000) + ((elapse[1] / nanosToSeconds) * 1000); receiveInterval.should.be.above(rate * 0.9); } receiveTimestamp = process.hrtime(); receivedMessagesStack.push(msg); }); var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst var i = 0; delayNode1.receive({payload:i}); i++; for(; i < possibleMaxMessageCount + 1; i++) { setTimeout(function() { delayNode1.receive({payload:i}); }, 2 * ((rate * i) / possibleMaxMessageCount) ); } //we need to send a message delayed so that it doesn't get dropped setTimeout(function() { delayNode1.receive({payload:++i}); }, runtimeInMillis - 300); // should give enough time to squeeze another message in setTimeout(function() { try { receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount + 1); receivedMessagesStack.length.should.be.greaterThan(2); // ensure that we receive more than 1st and last message receivedMessagesStack[0].payload.should.be.exactly(0); // means we received the last message injected just before test termination var foundAtLeastOneDrop = false; for(var i = 0; i < receivedMessagesStack.length; i++) { if(i > 0) { if(receivedMessagesStack[i].payload - receivedMessagesStack[i - 1].payload > 1) { foundAtLeastOneDrop = true; } } } foundAtLeastOneDrop.should.be.true; done(); } catch (err) { done(err); } }, runtimeInMillis); }); } it('limits the message rate to 1 per second, 4 seconds, with drop', function(done) { this.timeout(6000); dropRateLimitSECONDSTest(1, 4000, done); }); it('limits the message rate to 2 per second, 5 seconds, with drop', function(done) { this.timeout(6000); dropRateLimitSECONDSTest(2, 5000, done); }); /** * Returns true if the actualTimeout is gracefully in between the timeoutFrom and timeoutTo * values. Gracefully means that inBetween could actually mean smaller/greater values * than the timeout range so long as it's within an actual grace percentage. * @param timeoutFrom - The expected timeout range (low number) * @param timeoutTo - The expected timeout range (high number) * @param actualTimeout - The actual measured timeout value of test * @param allowedGracePercent - The percentage of grace allowed */ function inBetweenDelays(timeoutFrom, timeoutTo, actualTimeout, allowedGracePercent) { if(closeEnough(actualTimeout, timeoutFrom, allowedGracePercent)) { return true; } else if(closeEnough(actualTimeout, timeoutTo, allowedGracePercent)) { return true; } else if(timeoutFrom < actualTimeout && timeoutTo > actualTimeout) { return true; } else { return false; } } /** * Runs a RANDOM DELAY test, checks if the delay is in between the given timeout values * @param aTimeoutFrom - the timeout quantity which is the minimal acceptable wait period * @param aTimeoutTo - the timeout quantity which is the maximum acceptable wait period * @param aTimeoutUnit - the unit of the timeout: milliseconds, seconds, minutes, hours, days */ function randomDelayTest(aTimeoutFrom, aTimeoutTo, aTimeoutUnit, done) { var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"random","timeout":5,"timeoutUnits":"seconds","rate":"1","rateUnits":"second","randomFirst":aTimeoutFrom,"randomLast":aTimeoutTo,"randomUnits":aTimeoutUnit,"drop":false,"wires":[["helperNode1"]]}, {id:"helperNode1", type:"helper", wires:[]}]; helper.load(delayNode, flow, function() { var delayNode1 = helper.getNode("delayNode1"); var helperNode1 = helper.getNode("helperNode1"); helperNode1.on("input", function(msg) { try { var endTime = process.hrtime(startTime); var runtimeNanos = ( (endTime[0] * nanosToSeconds) + endTime[1] ); var runtimeSeconds = runtimeNanos / nanosToSeconds; var aTimeoutFromUnifiedToSeconds; var aTimeoutToUnifiedToSeconds; // calculating the timeout in seconds if(aTimeoutUnit == TimeUnitEnum.MILLIS) { aTimeoutFromUnifiedToSeconds = aTimeoutFrom / millisToSeconds; aTimeoutToUnifiedToSeconds = aTimeoutTo / millisToSeconds; } else if(aTimeoutUnit == TimeUnitEnum.SECONDS) { aTimeoutFromUnifiedToSeconds = aTimeoutFrom; aTimeoutToUnifiedToSeconds = aTimeoutTo; } else if(aTimeoutUnit == TimeUnitEnum.MINUTES) { aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToMinutes; aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToMinutes; } else if(aTimeoutUnit == TimeUnitEnum.HOURS) { aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToHours; aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToHours; } else if(aTimeoutUnit == TimeUnitEnum.DAYS) { aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToDays; aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToDays; } if(inBetweenDelays(aTimeoutFromUnifiedToSeconds, aTimeoutToUnifiedToSeconds, runtimeSeconds, GRACE_PERCENTAGE)) { done(); } else { try { should.fail(null, null, "Delayed runtime seconds " + runtimeSeconds + " was not \"in between enough\" enough to expected values of: " + aTimeoutFromUnifiedToSeconds + " and " + aTimeoutToUnifiedToSeconds); } catch (err) { done(err); } } } catch(err) { done(err); } }); var startTime = process.hrtime(); delayNode1.receive({payload:"delayMe"}); }); } it('randomly delays the message in seconds', function(done) { randomDelayTest(0.4, 0.8, "seconds", done); }); it(' randomly delays the message in milliseconds', function(done) { randomDelayTest("400", "800", "milliseconds", done); }); it('randomly delays the message in minutes', function(done) { randomDelayTest(0.0066, 0.0133, "minutes", done); }); it('delays the message in hours', function(done) { randomDelayTest(0.000111111, 0.000222222, "hours", done); }); it('delays the message in days', function(done) { randomDelayTest(0.0000046296, 0.0000092593, "days", done); }); it('handles bursts using a buffer', function(done) { this.timeout(6000); var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":1000,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, {id:"helperNode1", type:"helper", wires:[]}]; helper.load(delayNode, flow, function() { var delayNode1 = helper.getNode("delayNode1"); var helperNode1 = helper.getNode("helperNode1"); var sinon = require('sinon'); var receivedWarning = false; var messageBurstSize = 1500; // we ensure that we note that a warning is received for buffer growth sinon.stub(delayNode1, 'warn', function(warning){ if(warning.indexOf("buffer exceeded 1000 messages" > -1)) { receivedWarning = true; } }); // we ensure that the warning is received for buffer size and that we get the last message helperNode1.on("input", function(msg) { if(msg.payload === (messageBurstSize - 1) && receivedWarning === true) { done(); // it will timeout if we don't receive the last message } }); // send 1500 messages as quickly as possible for(var i = 0; i < messageBurstSize; i++) { delayNode1.receive({payload:i}); } }); }); });