From 4706b814ddb1841e4605b1cf707d132259bf1d8e Mon Sep 17 00:00:00 2001 From: zobalogh Date: Mon, 8 Sep 2014 14:26:35 +0100 Subject: [PATCH] Adding drop check to Delay tests --- test/nodes/core/core/89-delay_spec.js | 134 ++++++++++++++++++-------- 1 file changed, 94 insertions(+), 40 deletions(-) diff --git a/test/nodes/core/core/89-delay_spec.js b/test/nodes/core/core/89-delay_spec.js index aa2bd1164..22c3173a5 100644 --- a/test/nodes/core/core/89-delay_spec.js +++ b/test/nodes/core/core/89-delay_spec.js @@ -153,83 +153,137 @@ describe('delayNode', function() { /** * Runs a rate limit test - only testing seconds! * @param aLimit - the message limit count - * @param aDrop - whether to drop the messages that are over the limit * @param runtimeInMillis - when to terminate run and count messages received */ - function genericRateLimitSECONDSTest(aLimit, aDrop, runtimeInMillis, done) { - var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":aDrop,"wires":[["helperNode1"]]}, + function genericRateLimitSECONDSTest(aLimit, runtimeInMillis, done) { + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, {id:"helperNode1", type:"helper", wires:[]}]; helper.load(delayNode, flow, function() { var delayNode1 = helper.getNode("delayNode1"); var helperNode1 = helper.getNode("helperNode1"); var receivedMessagesStack = []; + var rate = 1000/aLimit; + + var receiveTimestamp; + helperNode1.on("input", function(msg) { + if(receiveTimestamp) { + var elapse = process.hrtime(receiveTimestamp); + var receiveInterval = (elapse[0] * 1000) + ((elapse[1] / nanosToSeconds) * 1000); + receiveInterval.should.be.above(rate * 0.9); + } + receiveTimestamp = process.hrtime(); receivedMessagesStack.push(msg); }); var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst - var startTime = process.hrtime(); - var i = 0; for(; i < possibleMaxMessageCount + 1; i++) { delayNode1.receive({payload:i}); } - // if drop is enabled then we need to send a few messages delayed so that they don't get dropped - if(aDrop === true) { - setTimeout(function() { - delayNode1.receive({payload:++i}); - }, runtimeInMillis - 300); // should give enough time to squeeze another message in - } - setTimeout(function() { - if(aDrop === true) { // messages should be dropped in the middle, receive 1st and last one - try { - receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount + 1); - receivedMessagesStack[0].payload.should.be.exactly(0); // means we received the last message injected just before test termination - receivedMessagesStack.pop().payload.should.be.exactly(i); // means we received the last message injected just before test termination - done(); - } catch (err) { - done(err); - } - } else { - try { - receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount); - for(var j = 0; j < receivedMessagesStack.length; j++) { - if(receivedMessagesStack[j].payload === j) { - if(j === (receivedMessagesStack.length -1)) { // last message, all matched so far - done(); - } - } else { - should.fail(null, null, "Received messages were not received in order. Message was " + receivedMessagesStack[i].payload + " on count " + i); + try { + receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount); + for(var j = 0; j < receivedMessagesStack.length; j++) { + if(receivedMessagesStack[j].payload === j) { + if(j === (receivedMessagesStack.length -1)) { // last message, all matched so far + done(); } - } - } catch (err) { - done(err); - } + } else { + should.fail(null, null, "Received messages were not received in order. Message was " + receivedMessagesStack[i].payload + " on count " + i); + } + } + } catch (err) { + done(err); } }, runtimeInMillis); }); } it('limits the message rate to 1 per second', function(done) { - genericRateLimitSECONDSTest(1, false, 1500, done); + genericRateLimitSECONDSTest(1, 1500, done); }); it('limits the message rate to 2 per second, 2 seconds', function(done) { this.timeout(6000); - genericRateLimitSECONDSTest(2, false, 2100, done); + genericRateLimitSECONDSTest(2, 2100, done); }); - it('limits the message rate to 1 per second, 2 seconds, with drop', function(done) { + /** + * Runs a rate limit test with drop support - only testing seconds! + * @param aLimit - the message limit count + * @param runtimeInMillis - when to terminate run and count messages received + */ + function dropRateLimitSECONDSTest(aLimit, runtimeInMillis, done) { + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var receivedMessagesStack = []; + + var rate = 1000/aLimit; + + var receiveTimestamp; + + helperNode1.on("input", function(msg) { + if(receiveTimestamp) { + var elapse = process.hrtime(receiveTimestamp); + var receiveInterval = (elapse[0] * 1000) + ((elapse[1] / nanosToSeconds) * 1000); + receiveInterval.should.be.above(rate * 0.9); + } + receiveTimestamp = process.hrtime(); + receivedMessagesStack.push(msg); + }); + + var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst + + var i = 0; + delayNode1.receive({payload:i}); + i++; + for(; i < possibleMaxMessageCount + 1; i++) { + setTimeout(function() { + delayNode1.receive({payload:i}); + }, 2 * ((rate * i) / possibleMaxMessageCount) ); + } + + //we need to send a message delayed so that it doesn't get dropped + setTimeout(function() { + delayNode1.receive({payload:++i}); + }, runtimeInMillis - 300); // should give enough time to squeeze another message in + + setTimeout(function() { + try { + receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount + 1); + receivedMessagesStack.length.should.be.greaterThan(2); // ensure that we receive more than 1st and last message + receivedMessagesStack[0].payload.should.be.exactly(0); // means we received the last message injected just before test termination + var foundAtLeastOneDrop = false; + for(var i = 0; i < receivedMessagesStack.length; i++) { + if(i > 0) { + if(receivedMessagesStack[i].payload - receivedMessagesStack[i - 1].payload > 1) { + foundAtLeastOneDrop = true; + } + } + } + foundAtLeastOneDrop.should.be.true; + done(); + } catch (err) { + done(err); + } + }, runtimeInMillis); + }); + } + + it('limits the message rate to 1 per second, 4 seconds, with drop', function(done) { this.timeout(6000); - genericRateLimitSECONDSTest(1, true, 2300, done); + dropRateLimitSECONDSTest(1, 4000, done); }); it('limits the message rate to 2 per second, 5 seconds, with drop', function(done) { this.timeout(6000); - genericRateLimitSECONDSTest(2, true, 5000, done); + dropRateLimitSECONDSTest(2, 5000, done); }); /**