Timeout a node that fails to close - default 15s timeout

This commit is contained in:
Nick O'Leary 2017-05-15 13:05:33 +01:00
parent a5ade39d7c
commit d4135e80a6
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9
4 changed files with 68 additions and 7 deletions

View File

@ -99,6 +99,7 @@
"stopped-modified-flows": "Stopped modified flows",
"stopped-flows": "Stopped flows",
"stopped": "Stopped",
"stopping-error": "Error stopping node: __message__",
"added-flow": "Adding flow: __label__",
"updated-flow": "Updated flow: __label__",
"removed-flow": "Removed flow: __label__",

View File

@ -21,6 +21,8 @@ var Log = require("../../log");
var redUtil = require("../../util");
var flowUtil = require("./util");
var nodeCloseTimeout = 15000;
function Flow(global,flow) {
if (typeof flow === 'undefined') {
flow = global;
@ -151,16 +153,35 @@ function Flow(global,flow) {
delete subflowInstanceNodes[stopList[i]];
}
try {
var p = node.close(removedMap[stopList[i]]);
if (p) {
promises.push(p);
}
var removed = removedMap[stopList[i]];
promises.push(
when.promise(function(resolve, reject) {
var start;
var nt = node.type;
var nid = node.id;
var n = node;
when.promise(function(resolve) {
Log.trace("Stopping node "+nt+":"+nid+(removed?" removed":""));
start = Date.now();
resolve(n.close(removed));
}).timeout(nodeCloseTimeout).then(function(){
var delta = Date.now() - start;
Log.trace("Stopped node "+nt+":"+nid+" ("+delta+"ms)" );
resolve(delta);
},function(err) {
var delta = Date.now() - start;
n.error(Log._("nodes.flows.stopping-error",{message:err}));
Log.debug(err.stack);
reject(err);
});
})
);
} catch(err) {
node.error(err);
}
}
}
when.settle(promises).then(function() {
when.settle(promises).then(function(results) {
resolve();
});
});
@ -472,7 +493,11 @@ function createSubflow(sf,sfn,subflows,globalSubflows,activeNodes) {
return nodes;
}
module.exports = {
init: function(settings) {
nodeCloseTimeout = settings.nodeCloseTimeout || 15000;
},
create: function(global,conf) {
return new Flow(global,conf);
}

View File

@ -66,6 +66,7 @@ function init(runtime) {
});
typeEventRegistered = true;
}
Flow.init(settings);
}
function loadFlows() {

View File

@ -41,6 +41,7 @@ describe('Flow', function() {
stoppedNodes = {};
rewiredNodes = {};
createCount = 0;
Flow.init({});
});
var TestNode = function(n) {
@ -65,7 +66,7 @@ describe('Flow', function() {
this.updateWires = function(newWires) {
rewiredNodes[node.id] = node;
node.newWires = newWires;
node.__updateWires[newWires];
node.__updateWires(newWires);
};
}
util.inherits(TestNode,Node);
@ -78,6 +79,7 @@ describe('Flow', function() {
this.handled = 0;
this.messages = [];
this.stopped = false;
this.closeDelay = n.closeDelay || 50;
currentNodes[node.id] = node;
this.on('input',function(msg) {
node.handled++;
@ -90,7 +92,7 @@ describe('Flow', function() {
stoppedNodes[node.id] = node;
delete currentNodes[node.id];
done();
},50);
},node.closeDelay);
});
}
util.inherits(TestAsyncNode,Node);
@ -549,6 +551,38 @@ describe('Flow', function() {
});
});
it("Times out a node that fails to close", function(done) {
Flow.init({nodeCloseTimeout:50});
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"testAsync",closeDelay: 80, foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}
]);
var flow = Flow.create(config,config.flows["t1"]);
flow.start();
currentNodes.should.have.a.property("1");
currentNodes.should.have.a.property("2");
currentNodes.should.have.a.property("3");
flow.stop().then(function() {
currentNodes.should.have.a.property("1");
currentNodes.should.not.have.a.property("2");
currentNodes.should.not.have.a.property("3");
stoppedNodes.should.not.have.a.property("1");
stoppedNodes.should.have.a.property("2");
stoppedNodes.should.have.a.property("3");
setTimeout(function() {
currentNodes.should.not.have.a.property("1");
stoppedNodes.should.have.a.property("1");
done();
},40)
});
});
});