2015-10-11 21:37:11 +02:00
|
|
|
/**
|
|
|
|
* Copyright 2014, 2015 IBM Corp.
|
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
**/
|
|
|
|
|
|
|
|
var clone = require("clone");
|
|
|
|
var when = require("when");
|
|
|
|
|
|
|
|
var Flow = require('./Flow');
|
|
|
|
|
|
|
|
var typeRegistry = require("../registry");
|
|
|
|
var credentials = require("../credentials");
|
|
|
|
|
|
|
|
var flowUtil = require("./util");
|
|
|
|
var log = require("../../log");
|
|
|
|
var events = require("../../events");
|
|
|
|
var redUtil = require("../../util");
|
|
|
|
var deprecated = require("../registry/deprecated");
|
|
|
|
|
|
|
|
var storage = null;
|
|
|
|
var settings = null;
|
|
|
|
|
|
|
|
var activeConfig = null;
|
|
|
|
var activeFlowConfig = null;
|
|
|
|
|
|
|
|
var activeFlows = {};
|
|
|
|
var started = false;
|
|
|
|
|
|
|
|
var activeNodesToFlow = {};
|
2015-11-02 21:41:59 +01:00
|
|
|
var subflowInstanceNodeMap = {};
|
2015-10-11 21:37:11 +02:00
|
|
|
|
2015-11-02 16:38:16 +01:00
|
|
|
var typeEventRegistered = false;
|
|
|
|
|
2015-10-11 21:37:11 +02:00
|
|
|
function init(_settings, _storage) {
|
2015-11-02 16:38:16 +01:00
|
|
|
if (started) {
|
|
|
|
throw new Error("Cannot init without a stop");
|
|
|
|
}
|
2015-10-11 21:37:11 +02:00
|
|
|
settings = _settings;
|
|
|
|
storage = _storage;
|
|
|
|
started = false;
|
2015-11-02 16:38:16 +01:00
|
|
|
if (!typeEventRegistered) {
|
|
|
|
events.on('type-registered',function(type) {
|
|
|
|
if (activeFlowConfig && activeFlowConfig.missingTypes.length > 0) {
|
|
|
|
var i = activeFlowConfig.missingTypes.indexOf(type);
|
|
|
|
if (i != -1) {
|
|
|
|
log.info(log._("nodes.flows.registered-missing", {type:type}));
|
|
|
|
activeFlowConfig.missingTypes.splice(i,1);
|
|
|
|
if (activeFlowConfig.missingTypes.length === 0 && started) {
|
|
|
|
start();
|
|
|
|
}
|
2015-10-11 21:37:11 +02:00
|
|
|
}
|
|
|
|
}
|
2015-11-02 16:38:16 +01:00
|
|
|
});
|
|
|
|
typeEventRegistered = true;
|
|
|
|
}
|
2015-10-11 21:37:11 +02:00
|
|
|
}
|
|
|
|
function load() {
|
|
|
|
return storage.getFlows().then(function(flows) {
|
|
|
|
return credentials.load().then(function() {
|
|
|
|
return setConfig(flows,"load");
|
|
|
|
});
|
|
|
|
}).otherwise(function(err) {
|
|
|
|
log.warn(log._("nodes.flows.error",{message:err.toString()}));
|
|
|
|
console.log(err.stack);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2015-11-02 16:38:16 +01:00
|
|
|
function setConfig(_config,type) {
|
|
|
|
var config = clone(_config);
|
2015-10-11 21:37:11 +02:00
|
|
|
type = type||"full";
|
|
|
|
|
|
|
|
var credentialsChanged = false;
|
|
|
|
var credentialSavePromise = null;
|
|
|
|
var configSavePromise = null;
|
|
|
|
|
2015-11-02 16:38:16 +01:00
|
|
|
var diff;
|
|
|
|
var newFlowConfig = flowUtil.parseConfig(clone(config));
|
|
|
|
if (type !== 'full' && type !== 'load') {
|
|
|
|
diff = flowUtil.diffConfigs(activeFlowConfig,newFlowConfig);
|
|
|
|
}
|
|
|
|
config.forEach(function(node) {
|
2015-10-11 21:37:11 +02:00
|
|
|
if (node.credentials) {
|
|
|
|
credentials.extract(node);
|
|
|
|
credentialsChanged = true;
|
|
|
|
}
|
|
|
|
});
|
|
|
|
if (credentialsChanged) {
|
|
|
|
credentialSavePromise = credentials.save();
|
|
|
|
} else {
|
|
|
|
credentialSavePromise = when.resolve();
|
|
|
|
}
|
|
|
|
if (type === 'load') {
|
|
|
|
configSavePromise = credentialSavePromise;
|
|
|
|
type = 'full';
|
|
|
|
} else {
|
|
|
|
configSavePromise = credentialSavePromise.then(function() {
|
2015-11-02 16:38:16 +01:00
|
|
|
return storage.saveFlows(config);
|
2015-10-11 21:37:11 +02:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
return configSavePromise
|
|
|
|
.then(function() {
|
2015-11-02 16:38:16 +01:00
|
|
|
activeConfig = config;
|
|
|
|
activeFlowConfig = newFlowConfig;
|
|
|
|
return credentials.clean(activeConfig).then(function() {
|
2015-10-11 21:37:11 +02:00
|
|
|
if (started) {
|
|
|
|
return stop(type,diff).then(function() {
|
|
|
|
start(type,diff);
|
|
|
|
}).otherwise(function(err) {
|
|
|
|
})
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
function getNode(id) {
|
|
|
|
var node;
|
|
|
|
if (activeNodesToFlow[id]) {
|
|
|
|
return activeFlows[activeNodesToFlow[id]].getNode(id);
|
|
|
|
}
|
|
|
|
for (var flowId in activeFlows) {
|
|
|
|
if (activeFlows.hasOwnProperty(flowId)) {
|
|
|
|
node = activeFlows[flowId].getNode(id);
|
|
|
|
if (node) {
|
|
|
|
return node;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
|
|
|
|
function eachNode(cb) {
|
2015-11-02 16:38:16 +01:00
|
|
|
for (var id in activeFlowConfig.allNodes) {
|
|
|
|
if (activeFlowConfig.allNodes.hasOwnProperty(id)) {
|
|
|
|
cb(activeFlowConfig.allNodes[id]);
|
2015-10-11 21:37:11 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
function getConfig() {
|
|
|
|
return activeConfig;
|
|
|
|
}
|
|
|
|
|
2015-11-02 21:41:59 +01:00
|
|
|
function delegateError(node,logMessage,msg) {
|
2015-10-11 21:37:11 +02:00
|
|
|
if (activeFlows[node.z]) {
|
|
|
|
activeFlows[node.z].handleError(node,logMessage,msg);
|
|
|
|
} else if (activeNodesToFlow[node.z]) {
|
|
|
|
activeFlows[activeNodesToFlow[node.z]].handleError(node,logMessage,msg);
|
2015-11-02 21:41:59 +01:00
|
|
|
} else if (activeFlowConfig.subflows[node.z]) {
|
|
|
|
subflowInstanceNodeMap[node.id].forEach(function(n) {
|
|
|
|
delegateError(getNode(n),logMessage,msg);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
function handleError(node,logMessage,msg) {
|
|
|
|
if (node.z) {
|
|
|
|
delegateError(node,logMessage,msg);
|
|
|
|
} else {
|
|
|
|
if (activeFlowConfig.configs[node.id]) {
|
|
|
|
activeFlowConfig.configs[node.id]._users.forEach(function(id) {
|
|
|
|
var userNode = activeFlowConfig.allNodes[id];
|
|
|
|
delegateError(userNode,logMessage,msg);
|
|
|
|
})
|
|
|
|
}
|
2015-10-11 21:37:11 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-11-02 21:41:59 +01:00
|
|
|
function delegateStatus(node,statusMessage) {
|
2015-10-11 21:37:11 +02:00
|
|
|
if (activeFlows[node.z]) {
|
|
|
|
activeFlows[node.z].handleStatus(node,statusMessage);
|
|
|
|
}
|
|
|
|
}
|
2015-11-02 21:41:59 +01:00
|
|
|
function handleStatus(node,statusMessage) {
|
2015-11-12 10:03:03 +01:00
|
|
|
events.emit("node-status",{
|
|
|
|
id: node.id,
|
|
|
|
status:statusMessage
|
|
|
|
});
|
2015-11-02 21:41:59 +01:00
|
|
|
if (node.z) {
|
|
|
|
delegateStatus(node,statusMessage);
|
|
|
|
} else {
|
|
|
|
if (activeFlowConfig.configs[node.id]) {
|
|
|
|
activeFlowConfig.configs[node.id]._users.forEach(function(id) {
|
|
|
|
var userNode = activeFlowConfig.allNodes[id];
|
|
|
|
delegateStatus(userNode,statusMessage);
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2015-10-11 21:37:11 +02:00
|
|
|
|
|
|
|
|
|
|
|
function start(type,diff) {
|
|
|
|
type = type||"full";
|
|
|
|
started = true;
|
|
|
|
var i;
|
|
|
|
if (activeFlowConfig.missingTypes.length > 0) {
|
|
|
|
log.info(log._("nodes.flows.missing-types"));
|
|
|
|
var knownUnknowns = 0;
|
|
|
|
for (i=0;i<activeFlowConfig.missingTypes.length;i++) {
|
|
|
|
var nodeType = activeFlowConfig.missingTypes[i];
|
|
|
|
var info = deprecated.get(nodeType);
|
|
|
|
if (info) {
|
|
|
|
log.info(log._("nodes.flows.missing-type-provided",{type:activeFlowConfig.missingTypes[i],module:info.module}));
|
|
|
|
knownUnknowns += 1;
|
|
|
|
} else {
|
|
|
|
log.info(" - "+activeFlowConfig.missingTypes[i]);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (knownUnknowns > 0) {
|
|
|
|
log.info(log._("nodes.flows.missing-type-install-1"));
|
|
|
|
log.info(" npm install <module name>");
|
|
|
|
log.info(log._("nodes.flows.missing-type-install-2"));
|
|
|
|
log.info(" "+settings.userDir);
|
|
|
|
}
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
if (diff) {
|
|
|
|
log.info(log._("nodes.flows.starting-modified-"+type));
|
|
|
|
} else {
|
|
|
|
log.info(log._("nodes.flows.starting-flows"));
|
|
|
|
}
|
|
|
|
var id;
|
|
|
|
if (!diff) {
|
|
|
|
activeFlows['_GLOBAL_'] = Flow.create(activeFlowConfig);
|
|
|
|
for (id in activeFlowConfig.flows) {
|
|
|
|
if (activeFlowConfig.flows.hasOwnProperty(id)) {
|
|
|
|
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
activeFlows['_GLOBAL_'].update(activeFlowConfig,activeFlowConfig);
|
|
|
|
for (id in activeFlowConfig.flows) {
|
|
|
|
if (activeFlowConfig.flows.hasOwnProperty(id)) {
|
|
|
|
if (activeFlows[id]) {
|
|
|
|
activeFlows[id].update(activeFlowConfig,activeFlowConfig.flows[id]);
|
|
|
|
} else {
|
|
|
|
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for (id in activeFlows) {
|
|
|
|
if (activeFlows.hasOwnProperty(id)) {
|
|
|
|
activeFlows[id].start(diff);
|
|
|
|
var activeNodes = activeFlows[id].getActiveNodes();
|
|
|
|
Object.keys(activeNodes).forEach(function(nid) {
|
|
|
|
activeNodesToFlow[nid] = id;
|
2015-11-02 21:41:59 +01:00
|
|
|
if (activeNodes[nid]._alias) {
|
|
|
|
subflowInstanceNodeMap[activeNodes[nid]._alias] = subflowInstanceNodeMap[activeNodes[nid]._alias] || [];
|
|
|
|
subflowInstanceNodeMap[activeNodes[nid]._alias].push(nid);
|
|
|
|
}
|
2015-10-11 21:37:11 +02:00
|
|
|
});
|
2015-11-02 21:41:59 +01:00
|
|
|
|
2015-10-11 21:37:11 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
events.emit("nodes-started");
|
|
|
|
if (diff) {
|
|
|
|
log.info(log._("nodes.flows.started-modified-"+type));
|
|
|
|
} else {
|
|
|
|
log.info(log._("nodes.flows.started-flows"));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
function stop(type,diff) {
|
|
|
|
type = type||"full";
|
|
|
|
if (diff) {
|
|
|
|
log.info(log._("nodes.flows.stopping-modified-"+type));
|
|
|
|
} else {
|
|
|
|
log.info(log._("nodes.flows.stopping-flows"));
|
|
|
|
}
|
|
|
|
started = false;
|
|
|
|
var promises = [];
|
|
|
|
var stopList;
|
|
|
|
if (type === 'nodes') {
|
|
|
|
stopList = diff.changed.concat(diff.removed);
|
|
|
|
} else if (type === 'flows') {
|
|
|
|
stopList = diff.changed.concat(diff.removed).concat(diff.linked);
|
|
|
|
}
|
|
|
|
for (var id in activeFlows) {
|
|
|
|
if (activeFlows.hasOwnProperty(id)) {
|
|
|
|
promises = promises.concat(activeFlows[id].stop(stopList));
|
2015-12-10 16:47:15 +01:00
|
|
|
if (!diff || diff.removed.indexOf(id)!==-1) {
|
2015-10-11 21:37:11 +02:00
|
|
|
delete activeFlows[id];
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return when.promise(function(resolve,reject) {
|
|
|
|
when.settle(promises).then(function() {
|
|
|
|
for (id in activeNodesToFlow) {
|
|
|
|
if (activeNodesToFlow.hasOwnProperty(id)) {
|
|
|
|
if (!activeFlows[activeNodesToFlow[id]]) {
|
|
|
|
delete activeNodesToFlow[id];
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2015-11-02 21:41:59 +01:00
|
|
|
if (stopList) {
|
|
|
|
stopList.forEach(function(id) {
|
|
|
|
delete activeNodesToFlow[id];
|
|
|
|
});
|
|
|
|
}
|
|
|
|
// Ideally we'd prune just what got stopped - but mapping stopList
|
|
|
|
// id to the list of subflow instance nodes is something only Flow
|
|
|
|
// can do... so cheat by wiping the map knowing it'll be rebuilt
|
|
|
|
// in start()
|
|
|
|
subflowInstanceNodeMap = {};
|
2015-10-11 21:37:11 +02:00
|
|
|
if (diff) {
|
|
|
|
log.info(log._("nodes.flows.stopped-modified-"+type));
|
|
|
|
} else {
|
|
|
|
log.info(log._("nodes.flows.stopped-flows"));
|
|
|
|
}
|
|
|
|
resolve();
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2015-11-09 12:29:48 +01:00
|
|
|
function checkTypeInUse(id) {
|
|
|
|
var nodeInfo = typeRegistry.getNodeInfo(id);
|
|
|
|
if (!nodeInfo) {
|
|
|
|
throw new Error(log._("nodes.index.unrecognised-id", {id:id}));
|
|
|
|
} else {
|
|
|
|
var inUse = {};
|
|
|
|
var config = getConfig();
|
|
|
|
config.forEach(function(n) {
|
|
|
|
inUse[n.type] = (inUse[n.type]||0)+1;
|
|
|
|
});
|
|
|
|
var nodesInUse = [];
|
|
|
|
nodeInfo.types.forEach(function(t) {
|
|
|
|
if (inUse[t]) {
|
|
|
|
nodesInUse.push(t);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
if (nodesInUse.length > 0) {
|
|
|
|
var msg = nodesInUse.join(", ");
|
|
|
|
var err = new Error(log._("nodes.index.type-in-use", {msg:msg}));
|
|
|
|
err.code = "type_in_use";
|
|
|
|
throw err;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2015-10-11 21:37:11 +02:00
|
|
|
|
|
|
|
module.exports = {
|
|
|
|
init: init,
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Load the current flow configuration from storage
|
|
|
|
* @return a promise for the loading of the config
|
|
|
|
*/
|
|
|
|
load: load,
|
|
|
|
|
|
|
|
get:getNode,
|
|
|
|
eachNode: eachNode,
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Gets the current flow configuration
|
|
|
|
*/
|
|
|
|
getFlows: getConfig,
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Sets the current active config.
|
|
|
|
* @param config the configuration to enable
|
|
|
|
* @param type the type of deployment to do: full (default), nodes, flows, load
|
|
|
|
* @return a promise for the saving/starting of the new flow
|
|
|
|
*/
|
|
|
|
setFlows: setConfig,
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Starts the current flow configuration
|
|
|
|
*/
|
|
|
|
startFlows: start,
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Stops the current flow configuration
|
|
|
|
* @return a promise for the stopping of the flow
|
|
|
|
*/
|
|
|
|
stopFlows: stop,
|
|
|
|
|
|
|
|
|
|
|
|
handleError: handleError,
|
2015-11-09 12:29:48 +01:00
|
|
|
handleStatus: handleStatus,
|
|
|
|
|
|
|
|
checkTypeInUse: checkTypeInUse
|
|
|
|
|
2015-10-11 21:37:11 +02:00
|
|
|
};
|