/** * Copyright 2013 IBM Corp. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. **/ var util = require("util"); var EventEmitter = require("events").EventEmitter; var fs = require("fs"); var path = require("path"); var clone = require("clone"); var events = require("./events"); var storage = null; var settings = null; var registry = (function() { var nodes = {}; var logHandlers = []; var obj = { add: function(n) { nodes[n.id] = n; n.on("log",function(msg) { for (var i in logHandlers) { logHandlers[i].emit("log",msg); } }); }, get: function(i) { return nodes[i]; }, clear: function() { events.emit("nodes-stopping"); for (var n in nodes) { nodes[n].close(); } events.emit("nodes-stopped"); nodes = {}; }, each: function(cb) { for (var n in nodes) { cb(nodes[n]); } }, addLogHandler: function(handler) { logHandlers.push(handler); } } return obj; })(); var ConsoleLogHandler = new EventEmitter(); ConsoleLogHandler.on("log",function(msg) { util.log("["+msg.level+"] ["+msg.type+":"+(msg.name||msg.id)+"] "+msg.msg); }); registry.addLogHandler(ConsoleLogHandler); var node_type_registry = (function() { var node_types = {}; var node_configs = []; var obj = { register: function(type,node) { util.inherits(node, Node); node_types[type] = node; events.emit("type-registered",type); }, registerConfig: function(config) { node_configs.push(config); }, get: function(type) { return node_types[type]; }, getNodeConfigs: function() { var result = ""; for (var i=0;i 0) { var i = missingTypes.indexOf(type); if (i != -1) { missingTypes.splice(i,1); util.log("[red] Missing type registered: "+type); } if (missingTypes.length == 0) { parseConfig(); } } }); function getNode(nid) { return registry.get(nid); } function stopFlows() { if (activeConfig&&activeConfig.length > 0) { util.log("[red] Stopping flows"); } registry.clear(); } function setConfig(conf) { stopFlows(); activeConfig = conf; if (!storage) { // Do this lazily to ensure the storage provider as been initialised storage = require("./storage"); } storage.getCredentials().then(function(creds) { credentials = creds; parseConfig(); }).otherwise(function(err) { util.log("[red] Error loading credentials : "+err); }); } function getConfig() { return activeConfig; } var parseConfig = function() { missingTypes = []; for (var i in activeConfig) { var type = activeConfig[i].type; // TODO: remove workspace in next release+1 if (type != "workspace" && type != "tab") { var nt = node_type_registry.get(type); if (!nt && missingTypes.indexOf(type) == -1) { missingTypes.push(type); } } }; if (missingTypes.length > 0) { util.log("[red] Waiting for missing types to be registered:"); for (var i in missingTypes) { util.log("[red] - "+missingTypes[i]); } return; } util.log("[red] Starting flows"); events.emit("nodes-starting"); for (var i in activeConfig) { var nn = null; // TODO: remove workspace in next release+1 if (activeConfig[i].type != "workspace" && activeConfig[i].type != "tab") { var nt = node_type_registry.get(activeConfig[i].type); if (nt) { try { nn = new nt(activeConfig[i]); } catch (err) { util.log("[red] "+activeConfig[i].type+" : "+err); } } // console.log(nn); if (nn == null) { util.log("[red] unknown type: "+activeConfig[i].type); } } } // Clean up any orphaned credentials var deletedCredentials = false; for (var c in credentials) { var n = registry.get(c); if (!n) { deletedCredentials = true; delete credentials[c]; } } if (deletedCredentials) { storage.saveCredentials(credentials); } events.emit("nodes-started"); } module.exports = { Node:Node, addCredentials: addCredentials, getCredentials: getCredentials, deleteCredentials: deleteCredentials, createNode: createNode, registerType: node_type_registry.register, getType: node_type_registry.get, getNodeConfigs: node_type_registry.getNodeConfigs, addLogHandler: registry.addLogHandler, load: load, getNode: getNode, stopFlows: stopFlows, setConfig: setConfig, getConfig: getConfig }