Add status node

This commit is contained in:
Nick O'Leary
2015-08-19 21:14:45 +01:00
parent 658746d2a3
commit a6644ad5ff
6 changed files with 395 additions and 36 deletions

View File

@@ -68,7 +68,7 @@ function createSubflow(sf,sfn,subflows) {
node.z = sfn.id;
newNodes.push(node);
}
// Look for any catch nodes and update their scope ids
// Look for any catch/status nodes and update their scope ids
// Update all subflow interior wiring to reflect new node IDs
for (i=0;i<newNodes.length;i++) {
node = newNodes[i];
@@ -79,7 +79,7 @@ function createSubflow(sf,sfn,subflows) {
outputs[j][k] = node_map[outputs[j][k]].id
}
}
if (node.type === 'catch' && node.scope) {
if ((node.type === 'catch' || node.type === 'status') && node.scope) {
node.scope = node.scope.map(function(id) {
return node_map[id]?node_map[id].id:""
})
@@ -216,25 +216,6 @@ function diffNodeConfigs(oldNode,newNode) {
return false;
}
function createCatchNodeMap(nodes) {
var catchNodes = {};
var subflowInstances = {};
var id;
/*
- a catchNode with same z as error node
- if error occurs on a subflow without catchNode, look at z of subflow instance
*/
for (id in nodes) {
if (nodes.hasOwnProperty(id)) {
if (nodes[id].type === "catch") {
catchNodes[nodes[id].z] = catchNodes[nodes[id].z] || [];
catchNodes[nodes[id].z].push(nodes[id]);
}
}
}
return catchNodes;
}
var subflowInstanceRE = /^subflow:(.+)$/;
function Flow(config) {
@@ -242,6 +223,7 @@ function Flow(config) {
this.activeNodes = {};
this.subflowInstanceNodes = {};
this.catchNodeMap = {};
this.statusNodeMap = {};
this.started = false;
this.parseConfig(config);
@@ -306,7 +288,7 @@ Flow.prototype.parseConfig = function(config) {
} else {
this.nodes[nodeConfig.id] = nodeInfo;
}
if (nodeConfig.type != "catch") {
if (nodeConfig.type != "catch" && nodeConfig.type != "status") {
for (var prop in nodeConfig) {
if (nodeConfig.hasOwnProperty(prop) &&
prop != "type" &&
@@ -394,7 +376,22 @@ Flow.prototype.start = function(configDiff) {
}
}
this.catchNodeMap = createCatchNodeMap(this.activeNodes);
this.catchNodeMap = {};
this.statusNodeMap = {};
for (id in this.activeNodes) {
if (this.activeNodes.hasOwnProperty(id)) {
node = this.activeNodes[id];
if (node.type === "catch") {
this.catchNodeMap[node.z] = this.catchNodeMap[node.z] || [];
this.catchNodeMap[node.z].push(node);
} else if (node.type === "status") {
this.statusNodeMap[node.z] = this.statusNodeMap[node.z] || [];
this.statusNodeMap[node.z].push(node);
}
}
}
credentials.clean(this.config);
events.emit("nodes-started");
@@ -727,6 +724,38 @@ function diffFlow(flow,config) {
return diff;
}
Flow.prototype.handleStatus = function(node,statusMessage) {
var targetStatusNodes = null;
var reportingNode = node;
var handled = false;
while(reportingNode && !handled) {
targetStatusNodes = this.statusNodeMap[reportingNode.z];
if (targetStatusNodes) {
targetStatusNodes.forEach(function(targetStatusNode) {
if (targetStatusNode.scope && targetStatusNode.scope.indexOf(node.id) === -1) {
return;
}
var message = {
status: {
source: {
id: node.id,
type: node.type,
name: node.name
}
}
};
if (statusMessage.text) {
message.status.text = statusMessage.text;
}
targetStatusNode.receive(message);
handled = true;
});
}
if (!handled) {
reportingNode = this.activeNodes[reportingNode.z];
}
}
}
Flow.prototype.handleError = function(node,logMessage,msg) {
var count = 1;

View File

@@ -189,7 +189,7 @@ Node.prototype.receive = function(msg) {
msg._msgid = redUtil.generateId();
}
this.metric("receive",msg);
try {
try {
this.emit("input", msg);
} catch(err) {
this.error(err,msg);
@@ -247,5 +247,6 @@ Node.prototype.metric = function(eventname, msg, metricValue) {
*/
Node.prototype.status = function(status) {
comms.publish("status/" + this.id, status, true);
flows.handleStatus(this,status);
};
module.exports = Node;

View File

@@ -46,7 +46,7 @@ var flowNodes = module.exports = {
settings = _settings;
storage = _storage;
},
/**
* Load the current activeConfig from storage and start it running
* @return a promise for the loading of the config
@@ -62,7 +62,7 @@ var flowNodes = module.exports = {
console.log(err.stack);
});
},
/**
* Get a node
* @param i the node id
@@ -71,18 +71,18 @@ var flowNodes = module.exports = {
get: function(i) {
return activeFlow.getNode(i);
},
eachNode: function(cb) {
activeFlow.eachNode(cb);
},
/**
* @return the active configuration
*/
getFlows: function() {
return activeFlow.getFlow();
},
/**
* Sets the current active config.
* @param config the configuration to enable
@@ -90,14 +90,14 @@ var flowNodes = module.exports = {
* @return a promise for the starting of the new flow
*/
setFlows: function (config,type) {
type = type||"full";
var credentialsChanged = false;
var credentialSavePromise = null;
// Clone config and extract credentials prior to saving
// Original config needs to retain credentials so that flow.applyConfig
// knows which nodes have had changes.
@@ -108,7 +108,7 @@ var flowNodes = module.exports = {
credentialsChanged = true;
}
});
if (credentialsChanged) {
credentialSavePromise = credentials.save();
} else {
@@ -122,7 +122,7 @@ var flowNodes = module.exports = {
} else {
return credentialSavePromise
.then(function() { return storage.saveFlows(cleanConfig);})
.then(function() {
.then(function() {
var configDiff = activeFlow.diffConfig(config,type);
return flowNodes.stopFlows(configDiff).then(function() {
activeFlow.parseConfig(config);
@@ -190,6 +190,9 @@ var flowNodes = module.exports = {
},
handleError: function(node,logMessage,msg) {
activeFlow.handleError(node,logMessage,msg);
},
handleStatus: function(node,statusMessage) {
activeFlow.handleStatus(node,statusMessage);
}
};