Add some comments to Flow and Subflow classes

This commit is contained in:
Nick O'Leary 2019-01-16 23:33:04 +00:00
parent 6286b34d00
commit dd72046922
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9
2 changed files with 139 additions and 28 deletions

View File

@ -24,7 +24,18 @@ var events = require("../../events");
var nodeCloseTimeout = 15000;
/**
* This class represents a flow within the runtime. It is responsible for
* creating, starting and stopping all nodes within the flow.
*/
class Flow {
/**
* Create a Flow object.
* @param {[type]} parent The parent flow
* @param {[type]} globalFlow The global flow definition
* @param {[type]} flow This flow's definition
*/
constructor(parent,globalFlow,flow) {
this.TYPE = 'flow';
this.parent = parent;
@ -43,6 +54,11 @@ class Flow {
this.statusNodes = [];
}
/**
* Log a debug-level message from this flow
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
debug(msg) {
Log.log({
id: this.id||"global",
@ -52,6 +68,11 @@ class Flow {
})
}
/**
* Log a info-level message from this flow
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
log(msg) {
Log.log({
id: this.id||"global",
@ -61,6 +82,11 @@ class Flow {
})
}
/**
* Log a trace-level message from this flow
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
trace(msg) {
Log.log({
id: this.id||"global",
@ -71,6 +97,13 @@ class Flow {
}
/**
* Start this flow.
* The `diff` argument helps define what needs to be started in the case
* of a modified-nodes/flows type deploy.
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
start(diff) {
this.trace("start "+this.TYPE);
var node;
@ -185,6 +218,14 @@ class Flow {
// this.dump();
}
/**
* Stop this flow.
* The `stopList` argument helps define what needs to be stopped in the case
* of a modified-nodes/flows type deploy.
* @param {[type]} stopList [description]
* @param {[type]} removedList [description]
* @return {[type]} [description]
*/
stop(stopList, removedList) {
return new Promise((resolve,reject) => {
this.trace("stop "+this.TYPE);
@ -216,7 +257,7 @@ class Flow {
if (this.subflowInstanceNodes[stopList[i]]) {
try {
var subflow = this.subflowInstanceNodes[stopList[i]];
promises.push(this.stopNode(node,false).then(() => { subflow.stop() }));
promises.push(stopNode(node,false).then(() => { subflow.stop() }));
} catch(err) {
node.error(err);
}
@ -224,7 +265,7 @@ class Flow {
} else {
try {
var removed = removedMap[stopList[i]];
promises.push(this.stopNode(node,removed));
promises.push(stopNode(node,removed));
} catch(err) {
node.error(err);
}
@ -237,31 +278,25 @@ class Flow {
});
}
stopNode(node,removed) {
return when.promise(function(resolve, reject) {
var start;
when.promise(function(resolve) {
Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":""));
start = Date.now();
resolve(node.close(removed));
}).timeout(nodeCloseTimeout).then(function(){
var delta = Date.now() - start;
Log.trace("Stopped node "+node.type+":"+node.id+" ("+delta+"ms)" );
resolve(delta);
},function(err) {
var delta = Date.now() - start;
node.error(Log._("nodes.flows.stopping-error",{message:err}));
Log.debug(err.stack);
reject(err);
});
})
}
/**
* Update the flow definition. This doesn't change anything that is running.
* This should be called after `stop` and before `start`.
* @param {[type]} _global [description]
* @param {[type]} _flow [description]
* @return {[type]} [description]
*/
update(_global,_flow) {
this.global = _global;
this.flow = _flow;
}
/**
* Get a node instance from this flow. If the node is not known to this
* flow, pass the request up to the parent.
* @param {[type]} id [description]
* @return {[type]} [description]
*/
getNode(id) {
// console.log('getNode',id,!!this.activeNodes[id])
if (!id) {
@ -279,14 +314,32 @@ class Flow {
return this.parent.getNode(id);
}
/**
* Get all of the nodes instantiated within this flow
* @return {[type]} [description]
*/
getActiveNodes() {
return this.activeNodes;
}
/**
* Get a flow setting value. This currently automatically defers to the parent
* flow which, as defined in ./index.js returns `process.env[key]`.
* This lays the groundwork for Subflow to have instance-specific settings
* @param {[type]} key [description]
* @return {[type]} [description]
*/
getSetting(key) {
return this.parent.getSetting(key);
}
/**
* Handle a status event from a node within this flow. If there are no Status
* nodes within this flow, pass the request to the parent flow.
* @param {[type]} node [description]
* @param {[type]} statusMessage [description]
* @return {[type]} [description]
*/
handleStatus(node,statusMessage) {
events.emit("node-status",{
id: node.id,
@ -320,6 +373,14 @@ class Flow {
}
}
/**
* Handle an error event from a node within this flow. If there are no Catch
* nodes within this flow, pass the event to the parent flow.
* @param {[type]} node [description]
* @param {[type]} logMessage [description]
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
handleError(node,logMessage,msg) {
// console.log("HE",logMessage);
var count = 1;
@ -386,6 +447,34 @@ class Flow {
}
}
/**
* Stop an individual node within this flow.
*
* @param {[type]} node [description]
* @param {[type]} removed [description]
* @return {[type]} [description]
*/
function stopNode(node,removed) {
return when.promise(function(resolve, reject) {
var start;
when.promise(function(resolve) {
Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":""));
start = Date.now();
resolve(node.close(removed));
}).timeout(nodeCloseTimeout).then(function(){
var delta = Date.now() - start;
Log.trace("Stopped node "+node.type+":"+node.id+" ("+delta+"ms)" );
resolve(delta);
},function(err) {
var delta = Date.now() - start;
node.error(Log._("nodes.flows.stopping-error",{message:err}));
Log.debug(err.stack);
reject(err);
});
})
}
module.exports = {
init: function(runtime) {
nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000;

View File

@ -22,8 +22,20 @@ const flowUtil = require("./util");
var Log;
/**
* This class represents a subflow - which is handled as a special type of Flow
*/
class Subflow extends Flow {
/**
* Create a Subflow object.
* This takes a subflow definition and instance node, creates a clone of the
* definition with unique ids applied and passes to the super class.
* @param {[type]} parent [description]
* @param {[type]} globalFlow [description]
* @param {[type]} subflowDef [description]
* @param {[type]} subflowInstance [description]
*/
constructor(parent,globalFlow,subflowDef,subflowInstance) {
// console.log(subflowDef);
// console.log("CREATE SUBFLOW",subflowDef.id,subflowInstance.id);
@ -80,6 +92,14 @@ class Subflow extends Flow {
this.node_map = node_map;
}
/**
* Start the subflow.
* This creates a subflow instance node to handle the inbound messages. It also
* rewires an subflow internal node that is connected to an output so it is connected
* to the parent flow nodes the subflow instance is wired to.
* @param {[type]} diff [description]
* @return {[type]} [description]
*/
start(diff) {
var self = this;
// Create a subflow node to accept inbound messages and route appropriately
@ -169,17 +189,19 @@ class Subflow extends Flow {
}
}
}
super.start(diff);
}
stop(stopList,removedList) {
return super.stop(stopList,removedList);
}
}
/**
* Clone a node definition for use within a subflow instance.
* Give the node a new id and set its _alias property to record
* its association with the original node definition.
* @param {[type]} subflowInstanceId [description]
* @param {[type]} def [description]
* @return {[type]} [description]
*/
function createNodeInSubflow(subflowInstanceId, def) {
let node = clone(def);
let nid = redUtil.generateId();