Catch node can target specific nodes

This commit is contained in:
Nick O'Leary
2015-08-13 13:58:19 +01:00
parent c64b5c2850
commit 3a6192bf73
5 changed files with 378 additions and 125 deletions

View File

@@ -57,7 +57,7 @@ function createSubflow(sf,sfn,subflows) {
var node;
var wires;
var i,j,k;
// Clone all of the subflow node definitions and give them new IDs
for (i=0;i<sf.nodes.length;i++) {
node = clone(sf.nodes[i].config);
@@ -72,7 +72,7 @@ function createSubflow(sf,sfn,subflows) {
for (i=0;i<newNodes.length;i++) {
node = newNodes[i];
var outputs = node.wires;
for (j=0;j<outputs.length;j++) {
wires = outputs[j];
for (k=0;k<wires.length;k++) {
@@ -80,7 +80,7 @@ function createSubflow(sf,sfn,subflows) {
}
}
}
// Create a subflow node to accept inbound messages and route appropriately
var Node = require("./Node");
var subflowInstance = {
@@ -95,20 +95,20 @@ function createSubflow(sf,sfn,subflows) {
subflowInstance._originalWires = clone(subflowInstance.wires);
}
var subflowNode = new Node(subflowInstance);
subflowNode.on("input", function(msg) { this.send(msg);});
subflowNode._updateWires = subflowNode.updateWires;
subflowNode.updateWires = function(newWires) {
// Wire the subflow outputs
if (sf.config.out) {
var node,wires,i,j;
// Restore the original wiring to the internal nodes
subflowInstance.wires = clone(subflowInstance._originalWires);
for (i=0;i<sf.config.out.length;i++) {
wires = sf.config.out[i].wires;
for (j=0;j<wires.length;j++) {
@@ -120,10 +120,10 @@ function createSubflow(sf,sfn,subflows) {
}
}
}
var modifiedNodes = {};
var subflowInstanceModified = false;
for (i=0;i<sf.config.out.length;i++) {
wires = sf.config.out[i].wires;
for (j=0;j<wires.length;j++) {
@@ -146,7 +146,7 @@ function createSubflow(sf,sfn,subflows) {
}
}
}
nodes.push(subflowNode);
// Wire the subflow outputs
@@ -170,12 +170,12 @@ function createSubflow(sf,sfn,subflows) {
}
}
}
// Instantiate the nodes
for (i=0;i<newNodes.length;i++) {
node = newNodes[i];
var type = node.type;
var m = /^subflow:(.+)$/.exec(type);
if (!m) {
nodes.push(createNode(type,node));
@@ -184,13 +184,13 @@ function createSubflow(sf,sfn,subflows) {
nodes = nodes.concat(createSubflow(subflows[subflowId],node,subflows));
}
}
subflowNode.instanceNodes = {};
nodes.forEach(function(node) {
subflowNode.instanceNodes[node.id] = node;
subflowNode.instanceNodes[node.id] = node;
});
return nodes;
}
@@ -221,7 +221,8 @@ function createCatchNodeMap(nodes) {
for (id in nodes) {
if (nodes.hasOwnProperty(id)) {
if (nodes[id].type === "catch") {
catchNodes[nodes[id].z] = nodes[id];
catchNodes[nodes[id].z] = catchNodes[nodes[id].z] || [];
catchNodes[nodes[id].z].push(nodes[id]);
}
}
}
@@ -245,7 +246,8 @@ function createCatchNodeMap(nodes) {
}
}
if (catchNodes[z]) {
catchNodes[id] = catchNodes[z];
catchNodes[id] = catchNodes[id]||[];
catchNodes[id].push(catchNodes[z]);
}
}
}
@@ -255,32 +257,32 @@ function createCatchNodeMap(nodes) {
var subflowInstanceRE = /^subflow:(.+)$/;
function Flow(config) {
this.activeNodes = {};
this.subflowInstanceNodes = {};
this.catchNodeMap = {};
this.started = false;
this.parseConfig(config);
}
Flow.prototype.parseConfig = function(config) {
var i;
var nodeConfig;
var nodeType;
this.config = config;
this.allNodes = {};
this.nodes = {};
this.subflows = {};
this.configNodes = {};
var unknownTypes = {};
for (i=0;i<this.config.length;i++) {
nodeConfig = this.config[i];
nodeType = nodeConfig.type;
@@ -292,19 +294,19 @@ Flow.prototype.parseConfig = function(config) {
nodes: []
}
}
}
//console.log("Known subflows:",Object.keys(this.subflows));
for (i=0;i<this.config.length;i++) {
nodeConfig = this.config[i];
nodeType = nodeConfig.type;
if (nodeConfig.credentials) {
delete nodeConfig.credentials;
}
if (nodeType != "tab" && nodeType != "subflow") {
var m = subflowInstanceRE.exec(nodeType);
if ((m && !this.subflows[m[1]]) || (!m && !typeRegistry.get(nodeType))) {
@@ -326,8 +328,8 @@ Flow.prototype.parseConfig = function(config) {
for (var prop in nodeConfig) {
if (nodeConfig.hasOwnProperty(prop) &&
prop != "type" &&
prop != "id" &&
prop != "z" &&
prop != "id" &&
prop != "z" &&
prop != "wires" &&
this.allNodes[nodeConfig[prop]]) {
this.configNodes[nodeConfig[prop]] = this.allNodes[nodeConfig[prop]];
@@ -336,7 +338,7 @@ Flow.prototype.parseConfig = function(config) {
}
}
}
//console.log("NODES");
//for (i in this.nodes) {
// if (this.nodes.hasOwnProperty(i)) {
@@ -352,8 +354,8 @@ Flow.prototype.parseConfig = function(config) {
// }
// }
//}
this.missingTypes = Object.keys(unknownTypes);
this.missingTypes = Object.keys(unknownTypes);
}
Flow.prototype.start = function(configDiff) {
@@ -365,16 +367,16 @@ Flow.prototype.start = function(configDiff) {
}
}
}
this.started = true;
if (this.missingTypes.length > 0) {
throw new Error(Log._("nodes.flow.missing-types"));
}
events.emit("nodes-starting");
var id;
var node;
for (id in this.configNodes) {
if (this.configNodes.hasOwnProperty(id)) {
node = this.configNodes[id];
@@ -383,7 +385,7 @@ Flow.prototype.start = function(configDiff) {
}
}
}
for (id in this.nodes) {
if (this.nodes.hasOwnProperty(id)) {
node = this.nodes[id];
@@ -408,16 +410,16 @@ Flow.prototype.start = function(configDiff) {
}
}
}
this.catchNodeMap = createCatchNodeMap(this.activeNodes);
credentials.clean(this.config);
events.emit("nodes-started");
}
Flow.prototype.stop = function(configDiff) {
var nodeList;
if (configDiff) {
nodeList = configDiff.stop;
} else {
@@ -450,7 +452,7 @@ Flow.prototype.stop = function(configDiff) {
});
}
Flow.prototype.getMissingTypes = function() {
Flow.prototype.getMissingTypes = function() {
return this.missingTypes;
}
@@ -466,7 +468,7 @@ Flow.prototype.typeRegistered = function(type) {
}
}
return false;
}
Flow.prototype.getNode = function(id) {
@@ -487,10 +489,10 @@ Flow.prototype.eachNode = function(callback) {
}
Flow.prototype.diffConfig = function(config,type) {
var activeNodesToStop = [];
var nodesToRewire = [];
if (type && type!="full") {
var diff = diffFlow(this,config);
//var diff = {
@@ -499,16 +501,16 @@ Flow.prototype.diffConfig = function(config,type) {
// linked:[]
// wiringChanged: []
//}
var nodesToStop = [];
nodesToRewire = diff.wiringChanged;
if (type == "nodes") {
nodesToStop = diff.deleted.concat(diff.changed);
} else if (type == "flows") {
nodesToStop = diff.deleted.concat(diff.changed).concat(diff.linked);
}
for (var i=0;i<nodesToStop.length;i++) {
var id = nodesToStop[i];
if (this.subflowInstanceNodes[id]) {
@@ -520,7 +522,7 @@ Flow.prototype.diffConfig = function(config,type) {
} else {
activeNodesToStop = Object.keys(this.activeNodes);
}
return {
type: type,
stop: activeNodesToStop,
@@ -530,7 +532,7 @@ Flow.prototype.diffConfig = function(config,type) {
}
function diffFlow(flow,config) {
//if (!flow.started) {
// throw new Error("Cannot diff an unstarted flow");
//}
@@ -540,13 +542,13 @@ function diffFlow(flow,config) {
var deletedSubflows = {};
var deletedTabs = {};
var linkChangedNodes = {};
var activeLinks = {};
var newLinks = {};
var changedSubflowStack = [];
var changedSubflows = {};
var buildNodeLinks = function(nodeLinks,n,nodes) {
nodeLinks[n.id] = nodeLinks[n.id] || [];
if (n.wires) {
@@ -563,11 +565,11 @@ function diffFlow(flow,config) {
}
}
}
config.forEach(function(node) {
flowNodes[node.id] = node;
});
config.forEach(function(node) {
var changed = false;
if (node.credentials) {
@@ -589,7 +591,7 @@ function diffFlow(flow,config) {
changedNodes[node.id] = node;
}
});
flow.config.forEach(function(node) {
if (!flowNodes[node.id]) {
if (node.type === "tab") {
@@ -602,7 +604,7 @@ function diffFlow(flow,config) {
}
buildNodeLinks(activeLinks,node,flow.allNodes);
});
flow.config.forEach(function(node) {
for (var prop in node) {
if (node.hasOwnProperty(prop) && prop != "z" && prop != "id" && prop != "wires") {
@@ -614,9 +616,9 @@ function diffFlow(flow,config) {
}
}
}
});
var checkSubflowMembership = function(nodes,id) {
var node = nodes[id];
if (node) {
@@ -631,13 +633,13 @@ function diffFlow(flow,config) {
}
}
};
Object.keys(changedNodes).forEach(function(n) { checkSubflowMembership(flowNodes,n)});
Object.keys(deletedNodes).forEach(function(n) { checkSubflowMembership(flow.allNodes,n)});
while (changedSubflowStack.length > 0) {
var subflowId = changedSubflowStack.pop();
config.forEach(function(node) {
if (node.type == "subflow:"+subflowId) {
if (!changedNodes[node.id]) {
@@ -646,17 +648,17 @@ function diffFlow(flow,config) {
}
}
});
}
config.forEach(function(node) {
buildNodeLinks(newLinks,node,flowNodes);
});
var markLinkedNodes = function(linkChanged,otherChangedNodes,linkMap,allNodes) {
var stack = Object.keys(changedNodes).concat(Object.keys(otherChangedNodes));
var visited = {};
while(stack.length > 0) {
var id = stack.pop();
var linkedNodes = linkMap[id];
@@ -675,7 +677,7 @@ function diffFlow(flow,config) {
}
markLinkedNodes(linkChangedNodes,{},newLinks,flowNodes);
markLinkedNodes(linkChangedNodes,{},activeLinks,flow.allNodes);
var modifiedLinkNodes = {};
config.forEach(function(node) {
@@ -683,13 +685,13 @@ function diffFlow(flow,config) {
// only concerned about unchanged nodes whose wiring may have changed
var newNodeLinks = newLinks[node.id];
var oldNodeLinks = activeLinks[node.id];
var newLinkMap = {};
newNodeLinks.forEach(function(l) { newLinkMap[l] = (newLinkMap[l]||0)+1;});
var oldLinkMap = {};
oldNodeLinks.forEach(function(l) { oldLinkMap[l] = (oldLinkMap[l]||0)+1;});
newNodeLinks.forEach(function(link) {
if (newLinkMap[link] != oldLinkMap[link]) {
modifiedLinkNodes[node.id] = node;
@@ -714,11 +716,11 @@ function diffFlow(flow,config) {
});
markLinkedNodes(linkChangedNodes,modifiedLinkNodes,newLinks,flowNodes);
// config.forEach(function(n) {
// console.log((changedNodes[n.id]!=null)?"[C]":"[ ]",(linkChangedNodes[n.id]!=null)?"[L]":"[ ]","[ ]",n.id,n.type,n.name);
// });
//
//
// Object.keys(deletedNodes).forEach(function(id) {
// var n = flow.allNodes[id];
// console.log("[ ] [ ] [D]",n.id,n.type);
@@ -729,7 +731,7 @@ function diffFlow(flow,config) {
linked: Object.keys(linkChangedNodes).filter(function(id) { return linkChangedNodes[id].type != "subflow" && (!linkChangedNodes[id].z || flowNodes[linkChangedNodes[id].z].type != "subflow")}),
wiringChanged: []
}
config.forEach(function(n) {
if (!flowNodes[n.z] || flowNodes[n.z].type != "subflow") {
var originalNode = flow.allNodes[n.id];
@@ -738,51 +740,57 @@ function diffFlow(flow,config) {
}
}
});
return diff;
}
Flow.prototype.handleError = function(node,logMessage,msg) {
var targetCatchNode = null;
if (this.catchNodeMap[node.z]) {
targetCatchNode = this.catchNodeMap[node.z];
} else if (this.activeNodes[node.z] && this.catchNodeMap[this.activeNodes[node.z].z]) {
targetCatchNode = this.catchNodeMap[this.activeNodes[node.z].z];
}
if (targetCatchNode) {
var count = 1;
if (msg && msg.hasOwnProperty("error")) {
if (msg.error.hasOwnProperty("source")) {
if (msg.error.source.id === node.id) {
count = msg.error.source.count+1;
if (count === 10) {
node.warn(Log._("nodes.flow.error-loop"));
return;
}
var count = 1;
if (msg && msg.hasOwnProperty("error")) {
if (msg.error.hasOwnProperty("source")) {
if (msg.error.source.id === node.id) {
count = msg.error.source.count+1;
if (count === 10) {
node.warn(Log._("nodes.flow.error-loop"));
return;
}
}
}
var errorMessage;
if (msg) {
errorMessage = redUtil.cloneMessage(msg);
} else {
errorMessage = {};
}
if (errorMessage.hasOwnProperty("error")) {
errorMessage._error = errorMessage.error;
}
errorMessage.error = {
message: logMessage.toString(),
source: {
id: node.id,
type: node.type,
count: count
}
var targetCatchNodes = null;
if (this.catchNodeMap[node.z]) {
targetCatchNodes = this.catchNodeMap[node.z];
} else if (this.activeNodes[node.z] && this.catchNodeMap[this.activeNodes[node.z].z]) {
targetCatchNodes = this.catchNodeMap[this.activeNodes[node.z].z];
}
if (targetCatchNodes) {
targetCatchNodes.forEach(function(targetCatchNode) {
console.log(targetCatchNode.scope);
if (targetCatchNode.scope && targetCatchNode.scope.indexOf(node.id) === -1) {
return;
}
};
targetCatchNode.receive(errorMessage);
var errorMessage;
if (msg) {
errorMessage = redUtil.cloneMessage(msg);
} else {
errorMessage = {};
}
if (errorMessage.hasOwnProperty("error")) {
errorMessage._error = errorMessage.error;
}
errorMessage.error = {
message: logMessage.toString(),
source: {
id: node.id,
type: node.type,
count: count
}
};
targetCatchNode.receive(errorMessage);
})
}
}