Allow nodes types to initialise asynchronously at start

The code scans the flows that are to be started and checks all
of the necessary node types are registered; if not, it doesn't
start the flows.

An event is emitted when each node type is registered - allowing
the server to track when all of the 'missing' node types register
and then start the flows.

This is a step towards adding node-types without restarting.
This commit is contained in:
Nicholas O'Leary 2013-09-18 21:12:59 +01:00
parent 0878071a91
commit 3b22b585d0
2 changed files with 54 additions and 15 deletions

View File

@ -98,6 +98,7 @@ var node_type_registry = (function() {
if (! node_configs[configFilename]) { if (! node_configs[configFilename]) {
node_configs[configFilename] = fs.readFileSync(configFilename,'utf8'); node_configs[configFilename] = fs.readFileSync(configFilename,'utf8');
} }
events.emit("type-registered",type);
} else { } else {
util.log("["+type+"] missing template file: "+configFilename); util.log("["+type+"] missing template file: "+configFilename);
} }
@ -106,9 +107,6 @@ var node_type_registry = (function() {
get: function(type) { get: function(type) {
return node_types[type]; return node_types[type];
}, },
registerNodeConfig: function(type,config) {
node_configs[type] = config;
},
getNodeConfigs: function() { getNodeConfigs: function() {
var result = ""; var result = "";
for (var nt in node_configs) { for (var nt in node_configs) {
@ -256,33 +254,75 @@ module.exports.load = function() {
loadNodes("nodes"); loadNodes("nodes");
events.emit("nodes-loaded"); //events.emit("nodes-loaded");
} }
var activeConfig = null;
var missingTypes = [];
events.on('type-registered',function(type) {
if (missingTypes.length > 0) {
var i = missingTypes.indexOf(type);
if (i != -1) {
missingTypes.splice(i,1);
util.log("[red] Missing type registered: "+type);
}
if (missingTypes.length == 0) {
parseConfig();
}
}
});
module.exports.getNode = function(nid) { module.exports.getNode = function(nid) {
return registry.get(nid); return registry.get(nid);
} }
module.exports.parseConfig = function(conf) { module.exports.setConfig = function(conf) {
if (activeConfig&&activeConfig.length > 0) {
util.log("[red] Stopping flows");
}
registry.clear(); registry.clear();
activeConfig = conf;
parseConfig();
}
var parseConfig = function() {
missingTypes = [];
for (var i in activeConfig) {
var type = activeConfig[i].type;
var nt = node_type_registry.get(type);
if (!nt && missingTypes.indexOf(type) == -1) {
missingTypes.push(type);
}
};
if (missingTypes.length > 0) {
util.log("[red] Waiting for missing types to be registered:");
for (var i in missingTypes) {
util.log("[red] - "+missingTypes[i]);
}
return;
}
util.log("[red] Starting flows");
events.emit("nodes-starting"); events.emit("nodes-starting");
for (var i in conf) { for (var i in activeConfig) {
var nn = null; var nn = null;
var nt = node_type_registry.get(conf[i].type); var nt = node_type_registry.get(activeConfig[i].type);
if (nt) { if (nt) {
try { try {
nn = new nt(conf[i]); nn = new nt(activeConfig[i]);
} }
catch (err) { catch (err) {
util.log("[red] "+conf[i].type+" : "+err); util.log("[red] "+activeConfig[i].type+" : "+err);
} }
} }
// console.log(nn); // console.log(nn);
if (nn == null) { if (nn == null) {
util.log("[red] unknown type: "+conf[i].type); util.log("[red] unknown type: "+activeConfig[i].type);
} }
} }

View File

@ -60,7 +60,7 @@ function createServer(_server,settings) {
if(err) { if(err) {
util.log(err); util.log(err);
} else { } else {
redNodes.parseConfig(JSON.parse(fullBody)); redNodes.setConfig(JSON.parse(fullBody));
} }
}); });
}); });
@ -79,13 +79,12 @@ function createServer(_server,settings) {
util.log('or any other errors are resolved'); util.log('or any other errors are resolved');
util.log("------------------------------------------"); util.log("------------------------------------------");
util.log("[red] Loading workspace flow : "+rulesfile);
fs.exists(rulesfile, function (exists) { fs.exists(rulesfile, function (exists) {
if (exists) { if (exists) {
util.log("[red] Loading workspace flow : "+rulesfile);
fs.readFile(rulesfile,'utf8',function(err,data) { fs.readFile(rulesfile,'utf8',function(err,data) {
redNodes.parseConfig(JSON.parse(data)); redNodes.setConfig(JSON.parse(data));
}); });
} }
}); });