2014-05-03 22:26:35 +01:00
|
|
|
/**
|
2015-01-15 17:10:32 +00:00
|
|
|
* Copyright 2014, 2015 IBM Corp.
|
2014-05-03 22:26:35 +01:00
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
**/
|
|
|
|
|
2014-02-24 23:35:11 +00:00
|
|
|
var clone = require("clone");
|
2014-05-03 22:26:35 +01:00
|
|
|
var when = require("when");
|
2014-07-08 08:49:48 +01:00
|
|
|
|
2014-05-03 22:26:35 +01:00
|
|
|
var typeRegistry = require("./registry");
|
|
|
|
var credentials = require("./credentials");
|
2015-01-08 22:34:26 +00:00
|
|
|
var Flow = require("./Flow");
|
2014-05-03 22:26:35 +01:00
|
|
|
var log = require("../log");
|
|
|
|
var events = require("../events");
|
2014-11-23 22:25:09 +00:00
|
|
|
var redUtil = require("../util");
|
2014-05-03 22:26:35 +01:00
|
|
|
// ---- Module state -------------------------------------------------------

// Storage backend; stays null until init() assigns it. load() and
// setFlows() call getFlows()/saveFlows() on it.
var storage = null;

// The Flow instance currently in use; stays null until load() or a full
// setFlows() constructs one.
var activeFlow = null;

// NOTE(review): the four variables below are not referenced anywhere in the
// visible part of this module - presumably left over from an earlier
// implementation. Confirm nothing else relies on them before removing.
var nodes = {};
var subflows = {};
var activeConfig = [];
var activeConfigNodes = {};
|
|
|
|
|
2014-05-03 22:26:35 +01:00
|
|
|
// When a node type is registered, pass it on to the active flow (if one
// exists) and log the registration. Nothing happens while no flow is active.
events.on('type-registered', function onTypeRegistered(registeredType) {
    if (!activeFlow) {
        return;
    }
    activeFlow.typeRegistered(registeredType);
    log.info("Missing type registered: " + registeredType);
});
|
2014-11-23 22:25:09 +00:00
|
|
|
|
2014-05-03 22:26:35 +01:00
|
|
|
/**
 * Build the noun used in the start/stop log messages.
 * @param configDiff optional diff object; when present its `type` is used
 * @return "modified <type>" for a partial operation, otherwise "flows"
 */
function flowsLabel(configDiff) {
    return configDiff ? "modified "+configDiff.type : "flows";
}

var flowNodes = module.exports = {
    /**
     * Record the storage backend used to load and save flows.
     * load() and setFlows() dereference it, so call this first.
     * @param _storage the storage backend; this module calls its
     *                 getFlows() and saveFlows(config) methods
     */
    init: function(_storage) {
        storage = _storage;
    },

    /**
     * Load the current activeConfig from storage and start it running
     * @return a promise for the loading of the config. Failures are logged
     *         by the handler below rather than propagated to the caller.
     */
    load: function() {
        return storage.getFlows().then(function(flows) {
            return credentials.load().then(function() {
                activeFlow = new Flow(flows);
                flowNodes.startFlows();
            });
        }).otherwise(function(err) {
            log.warn("Error loading flows : "+err);
            // A rejection reason is not guaranteed to be an Error object;
            // reading .stack from null/undefined would throw in here.
            if (err && err.stack) {
                console.log(err.stack);
            }
        });
    },

    /**
     * Get a node
     * Delegates to the active flow, so a flow must already exist.
     * @param i the node id
     * @return the node
     */
    get: function(i) {
        return activeFlow.getNode(i);
    },

    /**
     * Hand a callback to the active flow's eachNode().
     * Delegates to the active flow, so a flow must already exist.
     * @param cb the callback, passed through unchanged
     */
    eachNode: function(cb) {
        activeFlow.eachNode(cb);
    },

    /**
     * Delegates to the active flow, so a flow must already exist.
     * @return the active configuration
     */
    getFlows: function() {
        return activeFlow.getFlow();
    },

    /**
     * Sets the current active config.
     * @param config the configuration to enable
     * @param type the type of deployment to do: full (default), nodes, flows
     * @return a promise for the starting of the new flow
     */
    setFlows: function (config,type) {
        var deployType = type||"full";
        var credentialsChanged = false;

        // Clone config and extract credentials prior to saving.
        // The original config needs to retain its credentials so that the
        // Flow can tell which nodes have had changes.
        var cleanConfig = clone(config);
        cleanConfig.forEach(function(node) {
            if (node.credentials) {
                credentials.extract(node);
                credentialsChanged = true;
            }
        });

        // Only write the credential store when something was extracted.
        var credentialSavePromise = credentialsChanged ?
            credentials.save() : when.resolve();

        // Both deployment modes persist the cleaned config before touching
        // the running flow.
        var saved = credentialSavePromise.then(function() {
            return storage.saveFlows(cleanConfig);
        });

        if (deployType === "full") {
            // Full deploy: stop everything and replace the Flow instance.
            return saved
                .then(function() { return flowNodes.stopFlows(); })
                .then(function() {
                    activeFlow = new Flow(config);
                    flowNodes.startFlows();
                });
        }

        // Partial deploy: stop and restart only what the diff covers,
        // reusing the existing Flow instance.
        return saved.then(function() {
            var configDiff = activeFlow.diffConfig(config,deployType);
            return flowNodes.stopFlows(configDiff).then(function() {
                activeFlow.parseConfig(config);
                flowNodes.startFlows(configDiff);
            });
        });
    },

    /**
     * Start the active flow, logging progress.
     * An exception from activeFlow.start() is not rethrown: if the flow
     * reports missing types they are listed, otherwise the error is logged.
     * @param configDiff optional diff passed through to activeFlow.start()
     */
    startFlows: function(configDiff) {
        var label = flowsLabel(configDiff);
        log.info("Starting "+label);
        try {
            activeFlow.start(configDiff);
            log.info("Started "+label);
        } catch(err) {
            var missingTypes = activeFlow.getMissingTypes();
            if (missingTypes.length > 0) {
                log.info("Waiting for missing types to be registered:");
                for (var i=0;i<missingTypes.length;i++) {
                    log.info(" - "+missingTypes[i]);
                }
            } else {
                // Not a missing-type failure: report it instead of
                // silently discarding the exception.
                log.warn("Error starting flows : "+err);
            }
        }
    },

    /**
     * Stop the active flow, logging progress.
     * @param configDiff optional diff passed through to activeFlow.stop()
     * @return a promise that resolves once stopping has finished; it is
     *         already resolved when there is no active flow
     */
    stopFlows: function(configDiff) {
        var label = flowsLabel(configDiff);
        log.info("Stopping "+label);
        if (!activeFlow) {
            log.info("Stopped");
            // Always hand back a promise so callers can chain .then()
            // whether or not a flow was running.
            return when.resolve();
        }
        return activeFlow.stop(configDiff).then(function() {
            log.info("Stopped "+label);
        });
    },

    /**
     * Pass an error to the active flow's handleError().
     * Delegates to the active flow, so a flow must already exist.
     * @param node passed through unchanged
     * @param logMessage passed through unchanged
     * @param msg passed through unchanged
     */
    handleError: function(node,logMessage,msg) {
        activeFlow.handleError(node,logMessage,msg);
    }
};
|
2015-01-08 22:34:26 +00:00
|
|
|
|
|
|
|
// activeFlow is declared once, with the rest of the module state, near the
// top of this file; it is intentionally not redeclared here.
|