Merge pull request #385 from zobalogh/delay-test-fix

Adding drop check to Delay tests
This commit is contained in:
Nick O'Leary 2014-09-08 14:48:06 +01:00
commit 141c8a1faf
1 changed files with 94 additions and 40 deletions

View File

@ -153,83 +153,137 @@ describe('delayNode', function() {
/** /**
* Runs a rate limit test - only testing seconds! * Runs a rate limit test - only testing seconds!
* @param aLimit - the message limit count * @param aLimit - the message limit count
* @param aDrop - whether to drop the messages that are over the limit
* @param runtimeInMillis - when to terminate run and count messages received * @param runtimeInMillis - when to terminate run and count messages received
*/ */
function genericRateLimitSECONDSTest(aLimit, aDrop, runtimeInMillis, done) { function genericRateLimitSECONDSTest(aLimit, runtimeInMillis, done) {
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":aDrop,"wires":[["helperNode1"]]}, var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}]; {id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() { helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1"); var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1"); var helperNode1 = helper.getNode("helperNode1");
var receivedMessagesStack = []; var receivedMessagesStack = [];
var rate = 1000/aLimit;
var receiveTimestamp;
helperNode1.on("input", function(msg) { helperNode1.on("input", function(msg) {
if(receiveTimestamp) {
var elapse = process.hrtime(receiveTimestamp);
var receiveInterval = (elapse[0] * 1000) + ((elapse[1] / nanosToSeconds) * 1000);
receiveInterval.should.be.above(rate * 0.9);
}
receiveTimestamp = process.hrtime();
receivedMessagesStack.push(msg); receivedMessagesStack.push(msg);
}); });
var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst
var startTime = process.hrtime();
var i = 0; var i = 0;
for(; i < possibleMaxMessageCount + 1; i++) { for(; i < possibleMaxMessageCount + 1; i++) {
delayNode1.receive({payload:i}); delayNode1.receive({payload:i});
} }
// if drop is enabled then we need to send a few messages delayed so that they don't get dropped
if(aDrop === true) {
setTimeout(function() {
delayNode1.receive({payload:++i});
}, runtimeInMillis - 300); // should give enough time to squeeze another message in
}
setTimeout(function() { setTimeout(function() {
if(aDrop === true) { // messages should be dropped in the middle, receive 1st and last one try {
try { receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount);
receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount + 1); for(var j = 0; j < receivedMessagesStack.length; j++) {
receivedMessagesStack[0].payload.should.be.exactly(0); // means we received the last message injected just before test termination if(receivedMessagesStack[j].payload === j) {
receivedMessagesStack.pop().payload.should.be.exactly(i); // means we received the last message injected just before test termination if(j === (receivedMessagesStack.length -1)) { // last message, all matched so far
done(); done();
} catch (err) {
done(err);
}
} else {
try {
receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount);
for(var j = 0; j < receivedMessagesStack.length; j++) {
if(receivedMessagesStack[j].payload === j) {
if(j === (receivedMessagesStack.length -1)) { // last message, all matched so far
done();
}
} else {
should.fail(null, null, "Received messages were not received in order. Message was " + receivedMessagesStack[i].payload + " on count " + i);
} }
} } else {
} catch (err) { should.fail(null, null, "Received messages were not received in order. Message was " + receivedMessagesStack[i].payload + " on count " + i);
done(err); }
} }
} catch (err) {
done(err);
} }
}, runtimeInMillis); }, runtimeInMillis);
}); });
} }
it('limits the message rate to 1 per second', function(done) { it('limits the message rate to 1 per second', function(done) {
genericRateLimitSECONDSTest(1, false, 1500, done); genericRateLimitSECONDSTest(1, 1500, done);
}); });
it('limits the message rate to 2 per second, 2 seconds', function(done) { it('limits the message rate to 2 per second, 2 seconds', function(done) {
this.timeout(6000); this.timeout(6000);
genericRateLimitSECONDSTest(2, false, 2100, done); genericRateLimitSECONDSTest(2, 2100, done);
}); });
it('limits the message rate to 1 per second, 2 seconds, with drop', function(done) { /**
* Runs a rate limit test with drop support - only testing seconds!
* @param aLimit - the message limit count
* @param runtimeInMillis - when to terminate run and count messages received
*/
function dropRateLimitSECONDSTest(aLimit, runtimeInMillis, done) {
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
var receivedMessagesStack = [];
var rate = 1000/aLimit;
var receiveTimestamp;
helperNode1.on("input", function(msg) {
if(receiveTimestamp) {
var elapse = process.hrtime(receiveTimestamp);
var receiveInterval = (elapse[0] * 1000) + ((elapse[1] / nanosToSeconds) * 1000);
receiveInterval.should.be.above(rate * 0.9);
}
receiveTimestamp = process.hrtime();
receivedMessagesStack.push(msg);
});
var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst
var i = 0;
delayNode1.receive({payload:i});
i++;
for(; i < possibleMaxMessageCount + 1; i++) {
setTimeout(function() {
delayNode1.receive({payload:i});
}, 2 * ((rate * i) / possibleMaxMessageCount) );
}
//we need to send a message delayed so that it doesn't get dropped
setTimeout(function() {
delayNode1.receive({payload:++i});
}, runtimeInMillis - 300); // should give enough time to squeeze another message in
setTimeout(function() {
try {
receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount + 1);
receivedMessagesStack.length.should.be.greaterThan(2); // ensure that we receive more than 1st and last message
receivedMessagesStack[0].payload.should.be.exactly(0); // means we received the last message injected just before test termination
var foundAtLeastOneDrop = false;
for(var i = 0; i < receivedMessagesStack.length; i++) {
if(i > 0) {
if(receivedMessagesStack[i].payload - receivedMessagesStack[i - 1].payload > 1) {
foundAtLeastOneDrop = true;
}
}
}
foundAtLeastOneDrop.should.be.true;
done();
} catch (err) {
done(err);
}
}, runtimeInMillis);
});
}
it('limits the message rate to 1 per second, 4 seconds, with drop', function(done) {
this.timeout(6000); this.timeout(6000);
genericRateLimitSECONDSTest(1, true, 2300, done); dropRateLimitSECONDSTest(1, 4000, done);
}); });
it('limits the message rate to 2 per second, 5 seconds, with drop', function(done) { it('limits the message rate to 2 per second, 5 seconds, with drop', function(done) {
this.timeout(6000); this.timeout(6000);
genericRateLimitSECONDSTest(2, true, 5000, done); dropRateLimitSECONDSTest(2, 5000, done);
}); });
/** /**