Adding drop check to Delay tests

This commit is contained in:
zobalogh 2014-09-08 14:26:35 +01:00
parent a03861cb6d
commit 4706b814dd
1 changed files with 94 additions and 40 deletions

View File

@ -153,83 +153,137 @@ describe('delayNode', function() {
/**
* Runs a rate limit test - only testing seconds!
* @param aLimit - the message limit count
* @param aDrop - whether to drop the messages that are over the limit
* @param runtimeInMillis - when to terminate run and count messages received
*/
function genericRateLimitSECONDSTest(aLimit, aDrop, runtimeInMillis, done) {
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":aDrop,"wires":[["helperNode1"]]},
function genericRateLimitSECONDSTest(aLimit, runtimeInMillis, done) {
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
var receivedMessagesStack = [];
var rate = 1000/aLimit;
var receiveTimestamp;
helperNode1.on("input", function(msg) {
if(receiveTimestamp) {
var elapse = process.hrtime(receiveTimestamp);
var receiveInterval = (elapse[0] * 1000) + ((elapse[1] / nanosToSeconds) * 1000);
receiveInterval.should.be.above(rate * 0.9);
}
receiveTimestamp = process.hrtime();
receivedMessagesStack.push(msg);
});
var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst
var startTime = process.hrtime();
var i = 0;
for(; i < possibleMaxMessageCount + 1; i++) {
delayNode1.receive({payload:i});
}
// if drop is enabled then we need to send a few messages delayed so that they don't get dropped
if(aDrop === true) {
setTimeout(function() {
delayNode1.receive({payload:++i});
}, runtimeInMillis - 300); // should give enough time to squeeze another message in
}
setTimeout(function() {
if(aDrop === true) { // messages should be dropped in the middle, receive 1st and last one
try {
receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount + 1);
receivedMessagesStack[0].payload.should.be.exactly(0); // means we received the last message injected just before test termination
receivedMessagesStack.pop().payload.should.be.exactly(i); // means we received the last message injected just before test termination
done();
} catch (err) {
done(err);
}
} else {
try {
receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount);
for(var j = 0; j < receivedMessagesStack.length; j++) {
if(receivedMessagesStack[j].payload === j) {
if(j === (receivedMessagesStack.length -1)) { // last message, all matched so far
done();
}
} else {
should.fail(null, null, "Received messages were not received in order. Message was " + receivedMessagesStack[i].payload + " on count " + i);
try {
receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount);
for(var j = 0; j < receivedMessagesStack.length; j++) {
if(receivedMessagesStack[j].payload === j) {
if(j === (receivedMessagesStack.length -1)) { // last message, all matched so far
done();
}
}
} catch (err) {
done(err);
}
} else {
should.fail(null, null, "Received messages were not received in order. Message was " + receivedMessagesStack[i].payload + " on count " + i);
}
}
} catch (err) {
done(err);
}
}, runtimeInMillis);
});
}
it('limits the message rate to 1 per second', function(done) {
genericRateLimitSECONDSTest(1, false, 1500, done);
genericRateLimitSECONDSTest(1, 1500, done);
});
it('limits the message rate to 2 per second, 2 seconds', function(done) {
this.timeout(6000);
genericRateLimitSECONDSTest(2, false, 2100, done);
genericRateLimitSECONDSTest(2, 2100, done);
});
it('limits the message rate to 1 per second, 2 seconds, with drop', function(done) {
/**
* Runs a rate limit test with drop support - only testing seconds!
* @param aLimit - the message limit count
* @param runtimeInMillis - when to terminate run and count messages received
*/
function dropRateLimitSECONDSTest(aLimit, runtimeInMillis, done) {
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
var receivedMessagesStack = [];
var rate = 1000/aLimit;
var receiveTimestamp;
helperNode1.on("input", function(msg) {
if(receiveTimestamp) {
var elapse = process.hrtime(receiveTimestamp);
var receiveInterval = (elapse[0] * 1000) + ((elapse[1] / nanosToSeconds) * 1000);
receiveInterval.should.be.above(rate * 0.9);
}
receiveTimestamp = process.hrtime();
receivedMessagesStack.push(msg);
});
var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst
var i = 0;
delayNode1.receive({payload:i});
i++;
for(; i < possibleMaxMessageCount + 1; i++) {
setTimeout(function() {
delayNode1.receive({payload:i});
}, 2 * ((rate * i) / possibleMaxMessageCount) );
}
//we need to send a message delayed so that it doesn't get dropped
setTimeout(function() {
delayNode1.receive({payload:++i});
}, runtimeInMillis - 300); // should give enough time to squeeze another message in
setTimeout(function() {
try {
receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount + 1);
receivedMessagesStack.length.should.be.greaterThan(2); // ensure that we receive more than 1st and last message
receivedMessagesStack[0].payload.should.be.exactly(0); // means we received the last message injected just before test termination
var foundAtLeastOneDrop = false;
for(var i = 0; i < receivedMessagesStack.length; i++) {
if(i > 0) {
if(receivedMessagesStack[i].payload - receivedMessagesStack[i - 1].payload > 1) {
foundAtLeastOneDrop = true;
}
}
}
foundAtLeastOneDrop.should.be.true;
done();
} catch (err) {
done(err);
}
}, runtimeInMillis);
});
}
it('limits the message rate to 1 per second, 4 seconds, with drop', function(done) {
this.timeout(6000);
genericRateLimitSECONDSTest(1, true, 2300, done);
dropRateLimitSECONDSTest(1, 4000, done);
});
it('limits the message rate to 2 per second, 5 seconds, with drop', function(done) {
this.timeout(6000);
genericRateLimitSECONDSTest(2, true, 5000, done);
dropRateLimitSECONDSTest(2, 5000, done);
});
/**