2015-01-08 23:34:26 +01:00
|
|
|
/**
|
2015-01-15 18:10:32 +01:00
|
|
|
* Copyright 2015 IBM Corp.
|
2015-01-08 23:34:26 +01:00
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
**/
|
|
|
|
|
|
|
|
var when = require("when");
|
|
|
|
var clone = require("clone");
|
|
|
|
|
|
|
|
var typeRegistry = require("./registry");
|
|
|
|
var credentials = require("./credentials");
|
|
|
|
var redUtil = require("../util");
|
|
|
|
var events = require("../events");
|
2015-03-20 23:09:58 +01:00
|
|
|
var Log = require("../log");
|
2015-01-08 23:34:26 +01:00
|
|
|
|
|
|
|
/**
 * Generate an identifier for a node created at runtime (createSubflow uses
 * it to give each cloned subflow node a fresh id).
 * @return {String} base-16 rendering of a random number in [1, 4294967296);
 *                  the value is not an integer, so the string normally
 *                  contains a fractional part after a "."
 */
function getID() {
    var randomValue = 1 + Math.random() * 4294967295;
    return randomValue.toString(16);
}
|
|
|
|
|
|
|
|
/**
 * Instantiate a node of the given type.
 *
 * The constructor registered for `type` is looked up in the type registry and
 * invoked with a clone of `config`, so the node never shares the caller's
 * configuration object.
 *
 * @param {String} type   the node type to look up in the registry
 * @param {Object} config the node's configuration
 * @return {Object|null} the new node, or null when the type is not registered
 *                       or the constructor threw (both cases are logged)
 */
function createNode(type,config) {
    var NodeConstructor = typeRegistry.get(type);
    if (!NodeConstructor) {
        Log.error("Unknown type: "+type);
        return null;
    }
    try {
        return new NodeConstructor(clone(config));
    } catch (err) {
        // A failing node must not take the whole flow down: report and carry on
        Log.log({
            level: Log.ERROR,
            id:config.id,
            type: type,
            msg: err
        });
        return null;
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Instantiate one instance of a subflow.
 *
 * Every node defined inside the subflow is cloned and given a new id, the
 * interior wiring is rewritten to use the new ids, and a Node is created for
 * the subflow instance itself which forwards every message it receives.
 * Nested subflow instances are expanded recursively.
 *
 * @param {Object} sf       the subflow definition: `config` is the subflow's
 *                          own configuration (`id`, optional `in`/`out`),
 *                          `nodes` lists `{config: ...}` entries for its members
 * @param {Object} sfn      configuration of the subflow instance node
 *                          (`id`, `type`, `z`, `name`, `wires`)
 * @param {Object} subflows all known subflow definitions, keyed by subflow id
 * @return {Array} the created nodes: the subflow instance node first, followed
 *                 by the nodes created for its members. Members that failed
 *                 to instantiate are left out.
 */
function createSubflow(sf,sfn,subflows) {
    var nodes = [];
    var node_map = {};
    var newNodes = [];
    var node;
    var wires;
    var i,j,k;

    // Clone all of the subflow node definitions and give them new IDs.
    // node_map is keyed by the ORIGINAL id and holds the clone.
    for (i=0;i<sf.nodes.length;i++) {
        node = clone(sf.nodes[i].config);
        var nid = getID();
        node_map[node.id] = node;
        node._alias = node.id;
        node.id = nid;
        node.z = sfn.id;
        newNodes.push(node);
    }

    // Update all subflow interior wiring to reflect new node IDs
    for (i=0;i<newNodes.length;i++) {
        node = newNodes[i];
        var outputs = node.wires;

        for (j=0;j<outputs.length;j++) {
            wires = outputs[j];
            for (k=0;k<wires.length;k++) {
                outputs[j][k] = node_map[outputs[j][k]].id;
            }
        }
    }

    // Create a subflow node to accept inbound messages and route appropriately.
    // NOTE(review): Node is required here rather than at the top of the file;
    // presumably to avoid a circular dependency - confirm before moving it.
    var Node = require("./Node");
    var subflowInstance = {
        id: sfn.id,
        type: sfn.type,
        z: sfn.z,
        name: sfn.name,
        wires: []
    };
    if (sf.config.in) {
        subflowInstance.wires = sf.config.in.map(function(n) { return n.wires.map(function(w) { return node_map[w.id].id;});});
        subflowInstance._originalWires = clone(subflowInstance.wires);
    }
    var subflowNode = new Node(subflowInstance);

    subflowNode.on("input", function(msg) { this.send(msg);});

    // Keep the stock implementation reachable; updateWires is replaced below
    subflowNode._updateWires = subflowNode.updateWires;

    // Rewiring the instance means re-attaching the subflow's outputs, which
    // live on the internal nodes, to the new external targets.
    subflowNode.updateWires = function(newWires) {
        // Wire the subflow outputs
        if (sf.config.out) {
            var node,wires,i,j;
            // Restore the original wiring to the internal nodes
            subflowInstance.wires = clone(subflowInstance._originalWires);

            for (i=0;i<sf.config.out.length;i++) {
                wires = sf.config.out[i].wires;
                for (j=0;j<wires.length;j++) {
                    if (wires[j].id != sf.config.id) {
                        node = node_map[wires[j].id];
                        if (node._originalWires) {
                            node.wires = clone(node._originalWires);
                        }
                    }
                }
            }

            var modifiedNodes = {};
            var subflowInstanceModified = false;

            for (i=0;i<sf.config.out.length;i++) {
                wires = sf.config.out[i].wires;
                for (j=0;j<wires.length;j++) {
                    if (wires[j].id === sf.config.id) {
                        subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(newWires[i]);
                        subflowInstanceModified = true;
                    } else {
                        node = node_map[wires[j].id];
                        // The port may have no internal wiring at all; treat it
                        // as empty, exactly as the initial wiring below does.
                        node.wires[wires[j].port] = (node.wires[wires[j].port]||[]).concat(newWires[i]);
                        modifiedNodes[node.id] = node;
                    }
                }
            }
            Object.keys(modifiedNodes).forEach(function(id) {
                var instance = subflowNode.instanceNodes[id];
                // A member that failed to instantiate has no runtime node
                if (instance) {
                    instance.updateWires(modifiedNodes[id].wires);
                }
            });
            if (subflowInstanceModified) {
                subflowNode._updateWires(subflowInstance.wires);
            }
        }
    };

    nodes.push(subflowNode);

    // Wire the subflow outputs
    if (sf.config.out) {
        for (i=0;i<sf.config.out.length;i++) {
            wires = sf.config.out[i].wires;
            for (j=0;j<wires.length;j++) {
                if (wires[j].id === sf.config.id) {
                    // A subflow input wired straight to a subflow output
                    subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(sfn.wires[i]);
                    subflowNode._updateWires(subflowInstance.wires);
                } else {
                    node = node_map[wires[j].id];
                    // Remember the purely-internal wiring once so that
                    // updateWires can restore it before re-attaching outputs
                    if (!node._originalWires) {
                        node._originalWires = clone(node.wires);
                    }
                    node.wires[wires[j].port] = (node.wires[wires[j].port]||[]).concat(sfn.wires[i]);
                }
            }
        }
    }

    // Instantiate the nodes
    for (i=0;i<newNodes.length;i++) {
        node = newNodes[i];
        var type = node.type;

        var m = /^subflow:(.+)$/.exec(type);
        if (!m) {
            var created = createNode(type,node);
            // createNode returns null (after logging) when it cannot build the
            // node; skip it rather than fail on the null further down
            if (created) {
                nodes.push(created);
            }
        } else {
            var subflowId = m[1];
            nodes = nodes.concat(createSubflow(subflows[subflowId],node,subflows));
        }
    }

    // Index every runtime node of this instance by id for updateWires
    subflowNode.instanceNodes = {};

    nodes.forEach(function(node) {
        subflowNode.instanceNodes[node.id] = node;
    });

    return nodes;
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Decide whether a node's configuration differs between two versions.
 *
 * The editor-position properties `x` and `y` and the `wires` property are
 * ignored. A node counts as changed when there is no previous version, when
 * a property has been added or removed, or when any remaining property
 * compares as different via redUtil.compareObjects.
 *
 * @param {Object} oldNode the previous configuration, or null/undefined if
 *                         the node did not exist before
 * @param {Object} newNode the new configuration
 * @return {Boolean} true if the node should be treated as changed
 */
function diffNodeConfigs(oldNode,newNode) {
    if (oldNode == null) {
        return true;
    }
    var hasOwn = Object.prototype.hasOwnProperty;
    var isCompared = function(p) {
        return p != "x" && p != "y" && p != "wires";
    };
    var oldKeys = Object.keys(oldNode).filter(isCompared);
    var newKeys = Object.keys(newNode || {}).filter(isCompared);

    // Looking only at the new node's properties would miss a property that
    // has been removed, so compare the property sets first.
    if (oldKeys.length !== newKeys.length) {
        return true;
    }
    for (var i=0;i<newKeys.length;i++) {
        var p = newKeys[i];
        if (!hasOwn.call(oldNode,p)) {
            return true;
        }
        if (!redUtil.compareObjects(oldNode[p],newNode[p])) {
            return true;
        }
    }
    return false;
}
|
|
|
|
|
2015-02-20 02:17:24 +01:00
|
|
|
/**
 * Build the lookup used to route errors to catch nodes.
 *
 * The rules implemented here:
 *  - a catch node handles errors for the `z` it belongs to
 *  - a subflow instance with no catch node of its own inherits the catch
 *    node found by following `z` outwards through enclosing subflow instances
 *
 * @param {Object} nodes runtime nodes keyed by id; entries that are
 *                       null/undefined are ignored
 * @return {Object} map from a `z` value or a subflow instance id to the catch
 *                  node responsible for it. When several catch nodes share a
 *                  `z`, the one enumerated last wins.
 */
function createCatchNodeMap(nodes) {
    var catchNodes = {};
    var subflowInstances = {};
    var subflowTypeRE = /^subflow:(.+)$/;
    var id;

    for (id in nodes) {
        // Skip holes: a node that failed to instantiate may be stored as null
        if (nodes.hasOwnProperty(id) && nodes[id]) {
            if (nodes[id].type === "catch") {
                catchNodes[nodes[id].z] = nodes[id];
            } else if (subflowTypeRE.test(nodes[id].type)) {
                subflowInstances[id] = nodes[id];
            }
        }
    }

    for (id in subflowInstances) {
        if (subflowInstances.hasOwnProperty(id)) {
            // Walk outwards until a level with a catch node is found, or we
            // leave the chain of subflow instances
            var z = id;
            while (subflowInstances[z] && !catchNodes[z]) {
                z = subflowInstances[z].z;
            }
            if (catchNodes[z]) {
                catchNodes[id] = catchNodes[z];
            }
        }
    }
    return catchNodes;
}
|
2015-01-08 23:34:26 +01:00
|
|
|
|
|
|
|
// Matches the type of a subflow instance node ("subflow:<id>"); the capture
// group is the id of the subflow being instantiated.
var subflowInstanceRE = /^subflow:(.+)$/;

/**
 * A set of nodes, built from a flow configuration, that can be started and
 * stopped together.
 *
 * Construction only parses the configuration (via parseConfig); no runtime
 * nodes exist until start() is called.
 *
 * @param {Array} config the flow configuration: an array of node definitions
 */
function Flow(config) {
    // Runtime nodes keyed by id
    this.activeNodes = {};
    // For each running subflow instance: the ids of the nodes created for it
    this.subflowInstanceNodes = {};
    // Lookup of catch nodes, rebuilt on every start()
    this.catchNodeMap = {};
    this.started = false;

    this.parseConfig(config);
}
|
|
|
|
|
|
|
|
/**
 * Parse a flow configuration into the lookup tables used by the runtime.
 *
 * After this call:
 *  - this.config       is the configuration as passed in
 *  - this.allNodes     maps every id in the configuration to its definition
 *  - this.subflows     maps subflow ids to {type, config, nodes}
 *  - this.nodes        maps ids of nodes that are not inside a subflow to
 *                      {type, config[, subflow]}
 *  - this.configNodes  maps ids of nodes referenced from another node's
 *                      property to their definition
 *  - this.missingTypes lists node types that are neither registered nor a
 *                      known subflow
 *
 * Note that any `credentials` property is deleted from the definitions in
 * `config` itself.
 *
 * @param {Array} config the flow configuration: an array of node definitions
 */
Flow.prototype.parseConfig = function(config) {
    // The tables below are plain objects keyed by ids taken from the
    // configuration; own-property checks keep names inherited from
    // Object.prototype ("constructor", "toString", ...) from matching.
    var hasOwn = Object.prototype.hasOwnProperty;
    var i;
    var nodeConfig;
    var nodeType;

    this.config = config;

    this.allNodes = {};

    this.nodes = {};
    this.subflows = {};

    this.configNodes = {};

    var unknownTypes = {};

    // First pass: index every node and collect the subflow definitions, so
    // the second pass can resolve references regardless of ordering
    for (i=0;i<this.config.length;i++) {
        nodeConfig = this.config[i];
        nodeType = nodeConfig.type;
        this.allNodes[nodeConfig.id] = nodeConfig;
        if (nodeType == "subflow") {
            this.subflows[nodeConfig.id] = {
                type: "subflow",
                config: nodeConfig,
                nodes: []
            };
        }
    }

    for (i=0;i<this.config.length;i++) {
        nodeConfig = this.config[i];

        nodeType = nodeConfig.type;

        if (nodeConfig.credentials) {
            delete nodeConfig.credentials;
        }

        if (nodeType != "tab" && nodeType != "subflow") {
            var m = subflowInstanceRE.exec(nodeType);
            if ((m && !hasOwn.call(this.subflows,m[1])) || (!m && !typeRegistry.get(nodeType))) {
                // This is an unknown subflow or an unknown type
                unknownTypes[nodeType] = true;
            } else {
                var nodeInfo = {
                    type: nodeType,
                    config:nodeConfig
                };
                if (m) {
                    nodeInfo.subflow = m[1];
                }
                if (hasOwn.call(this.subflows,nodeConfig.z)) {
                    // The node lives inside a subflow definition
                    this.subflows[nodeConfig.z].nodes.push(nodeInfo);
                } else {
                    this.nodes[nodeConfig.id] = nodeInfo;
                }
                // Any property whose value is the id of another node is taken
                // to be a reference to a configuration node
                for (var prop in nodeConfig) {
                    if (nodeConfig.hasOwnProperty(prop) &&
                        prop != "type" &&
                        prop != "id" &&
                        prop != "z" &&
                        prop != "wires" &&
                        hasOwn.call(this.allNodes,nodeConfig[prop])) {
                        this.configNodes[nodeConfig[prop]] = this.allNodes[nodeConfig[prop]];
                    }
                }
            }
        }
    }

    this.missingTypes = Object.keys(unknownTypes);
};
|
|
|
|
|
2015-03-13 14:15:20 +01:00
|
|
|
/**
 * Start the flow: create every node that is not already running.
 *
 * Emits "nodes-starting" before and "nodes-started" after the nodes have been
 * created. Nodes that fail to instantiate are left out (createNode has
 * already logged the failure).
 *
 * @param {Object} [configDiff] the result of diffConfig(); when given, the
 *                 nodes listed in `configDiff.rewire` that are still running
 *                 have their wiring updated from the current configuration
 * @throws {Error} "missing types" if the configuration uses node types that
 *                 are not available. `started` is already true at that point,
 *                 which lets typeRegistered() start the flow later.
 */
Flow.prototype.start = function(configDiff) {
    if (configDiff) {
        for (var j=0;j<configDiff.rewire.length;j++) {
            var rewireNode = this.activeNodes[configDiff.rewire[j]];
            if (rewireNode) {
                rewireNode.updateWires(this.allNodes[rewireNode.id].wires);
            }
        }
    }

    this.started = true;
    if (this.missingTypes.length > 0) {
        throw new Error("missing types");
    }
    events.emit("nodes-starting");

    var id;
    var node;
    var created;

    // Configuration nodes first, so they exist before the nodes using them
    for (id in this.configNodes) {
        if (this.configNodes.hasOwnProperty(id)) {
            node = this.configNodes[id];
            if (!this.activeNodes[id]) {
                created = createNode(node.type,node);
                // Never store null: everything reading activeNodes expects
                // real nodes
                if (created) {
                    this.activeNodes[id] = created;
                }
            }
        }
    }

    for (id in this.nodes) {
        if (this.nodes.hasOwnProperty(id)) {
            node = this.nodes[id];
            if (!node.subflow) {
                if (!this.activeNodes[id]) {
                    created = createNode(node.type,node.config);
                    if (created) {
                        this.activeNodes[id] = created;
                    }
                }
            } else {
                if (!this.subflowInstanceNodes[id]) {
                    // Drop members that could not be instantiated
                    var nodes = createSubflow(this.subflows[node.subflow],node.config,this.subflows).filter(function(n) { return !!n; });
                    this.subflowInstanceNodes[id] = nodes.map(function(n) { return n.id;});
                    for (var i=0;i<nodes.length;i++) {
                        this.activeNodes[nodes[i].id] = nodes[i];
                    }
                }
            }
        }
    }

    this.catchNodeMap = createCatchNodeMap(this.activeNodes);

    credentials.clean(this.config);
    events.emit("nodes-started");
};
|
|
|
|
|
2015-03-13 14:15:20 +01:00
|
|
|
/**
 * Stop nodes of this flow.
 *
 * Each selected node is closed and removed from the active set. An exception
 * thrown by a node's close() is reported through that node's error() and
 * does not prevent the remaining nodes from being stopped. Emits
 * "nodes-stopping" before and "nodes-stopped" after.
 *
 * @param {Object} [configDiff] the result of diffConfig(); when given only
 *                 the ids in `configDiff.stop` are stopped, otherwise every
 *                 active node is
 * @return {Promise} resolved once every promise returned by the nodes'
 *                   close() calls has settled
 */
Flow.prototype.stop = function(configDiff) {
    var flow = this;
    var nodeList = configDiff ? configDiff.stop : Object.keys(this.activeNodes);

    return when.promise(function(resolve) {
        events.emit("nodes-stopping");
        var promises = [];
        nodeList.forEach(function(nodeId) {
            var node = flow.activeNodes[nodeId];
            if (!node) {
                return;
            }
            try {
                // close() may hand back a promise for asynchronous cleanup
                var p = node.close();
                if (p) {
                    promises.push(p);
                }
            } catch(err) {
                node.error(err);
            }
            delete flow.subflowInstanceNodes[nodeId];
            delete flow.activeNodes[nodeId];
        });
        when.settle(promises).then(function() {
            events.emit("nodes-stopped");
            flow.started = false;
            resolve();
        });
    });
};
|
|
|
|
|
|
|
|
/**
 * @return {Array} the node types used by the configuration that were not
 *                 available when it was parsed and have not been registered
 *                 since. This is the live internal array, not a copy.
 */
Flow.prototype.getMissingTypes = function() {
    var missing = this.missingTypes;
    return missing;
};
|
|
|
|
|
|
|
|
/**
 * Tell the flow that a node type has become available.
 *
 * If the type was one of the flow's missing types it is removed from that
 * list; when that leaves no missing types and start() had already been
 * requested, the flow is started.
 *
 * @param {String} type the node type that has been registered
 * @return {Boolean} true if the flow had been waiting for this type
 */
Flow.prototype.typeRegistered = function(type) {
    if (this.missingTypes.length === 0) {
        return false;
    }
    var index = this.missingTypes.indexOf(type);
    if (index === -1) {
        return false;
    }
    this.missingTypes.splice(index,1);
    if (this.missingTypes.length === 0 && this.started) {
        this.start();
    }
    return true;
};
|
|
|
|
|
|
|
|
/**
 * Look up a running node.
 * @param {String} id the id of the node
 * @return {Object} the entry stored for that id in the active node set
 *                  (undefined if there is none)
 */
Flow.prototype.getNode = function(id) {
    var node = this.activeNodes[id];
    return node;
};
|
|
|
|
|
|
|
|
/**
 * @return {Array} the configuration this flow was last parsed from (the
 *                 stored object itself, not a copy)
 */
Flow.prototype.getFlow = function() {
    var currentConfig = this.config;
    return currentConfig;
};
|
|
|
|
|
2015-01-10 23:09:37 +01:00
|
|
|
/**
 * Call a function once for every entry in the active node set.
 * @param {Function} callback invoked with the node as its only argument
 */
Flow.prototype.eachNode = function(callback) {
    for (var nodeId in this.activeNodes) {
        if (!this.activeNodes.hasOwnProperty(nodeId)) {
            continue;
        }
        callback(this.activeNodes[nodeId]);
    }
};
|
|
|
|
|
2015-03-13 18:54:58 +01:00
|
|
|
/**
 * Work out what has to be stopped and rewired to move from the running
 * configuration to a new one.
 *
 * @param {Array}  config the new flow configuration
 * @param {String} [type] the kind of deployment:
 *        - omitted or "full": every active node is stopped
 *        - "nodes": deleted and changed nodes are stopped
 *        - "flows": deleted and changed nodes, plus the nodes linked to them,
 *          are stopped
 *        - any other value: nothing is stopped
 *        For every value other than omitted/"full", nodes whose wiring
 *        changed are reported in `rewire`.
 * @return {Object} {type, stop, rewire, config} where `stop` and `rewire` are
 *         arrays of node ids. For a subflow instance, `stop` contains the ids
 *         of all the nodes created for that instance.
 */
Flow.prototype.diffConfig = function(config,type) {
    var self = this;
    var stopList;
    var rewireList = [];

    if (!type || type=="full") {
        stopList = Object.keys(this.activeNodes);
    } else {
        // diffFlow reports {deleted, changed, linked, wiringChanged}
        var diff = diffFlow(this,config);
        rewireList = diff.wiringChanged;

        var candidates = [];
        if (type == "nodes") {
            candidates = diff.deleted.concat(diff.changed);
        } else if (type == "flows") {
            candidates = diff.deleted.concat(diff.changed).concat(diff.linked);
        }

        stopList = [];
        candidates.forEach(function(id) {
            if (self.subflowInstanceNodes[id]) {
                // Stopping a subflow instance means stopping all of its nodes
                stopList = stopList.concat(self.subflowInstanceNodes[id]);
            } else if (self.activeNodes[id]) {
                stopList.push(id);
            }
        });
    }

    return {
        type: type,
        stop: stopList,
        rewire: rewireList,
        config: config
    };
};
|
|
|
|
|
2015-03-13 18:54:58 +01:00
|
|
|
/**
 * Compare a flow's current configuration with a new one.
 *
 * Note that any `credentials` property found on a node in `config` is
 * deleted from it (and the node is treated as changed).
 *
 * @param {Flow}  flow   the flow holding the current configuration
 *                       (`flow.config` and `flow.allNodes` are read)
 * @param {Array} config the new configuration
 * @return {Object} arrays of node ids:
 *         - deleted:       nodes in the current configuration but not the new one
 *         - changed:       nodes that are new, whose properties changed, that
 *                          reference a changed node, or that are instances of
 *                          a subflow whose contents changed
 *         - linked:        nodes connected, directly or indirectly, to a
 *                          changed node or to a node whose links changed
 *         - wiringChanged: nodes outside subflows whose `wires` differ
 *         Subflow definitions are never listed, and nodes whose `z` is a
 *         subflow definition are left out of `deleted` and `changed`.
 */
function diffFlow(flow,config) {
    var flowNodes = {};
    var changedNodes = {};
    var deletedNodes = {};
    var deletedSubflows = {};
    var deletedTabs = {};
    var linkChangedNodes = {};

    var activeLinks = {};
    var newLinks = {};

    var changedSubflowStack = [];
    var changedSubflows = {};

    // Record, for node n, the ids it is wired to and - on each wired-to node
    // found in `nodes` - the id of n, so links can be followed both ways.
    var buildNodeLinks = function(nodeLinks,n,nodes) {
        nodeLinks[n.id] = nodeLinks[n.id] || [];
        if (n.wires) {
            for (var j=0;j<n.wires.length;j++) {
                var wires = n.wires[j];
                for (var k=0;k<wires.length;k++) {
                    nodeLinks[n.id].push(wires[k]);
                    var nn = nodes[wires[k]];
                    if (nn) {
                        nodeLinks[nn.id] = nodeLinks[nn.id] || [];
                        nodeLinks[nn.id].push(n.id);
                    }
                }
            }
        }
    };

    // True when the node's container (its `z`) is a subflow definition in the
    // new configuration. A missing container simply means "not in a subflow".
    var inSubflow = function(node) {
        var container = flowNodes[node.z];
        return !!container && container.type == "subflow";
    };

    config.forEach(function(node) {
        flowNodes[node.id] = node;
    });

    // Nodes whose own configuration changed
    config.forEach(function(node) {
        var changed = false;
        if (node.credentials) {
            changed = true;
            delete node.credentials;
        } else {
            changed = diffNodeConfigs(flow.allNodes[node.id],node);
            if (!changed) {
                if (inSubflow(node)) {
                    var originalNode = flow.allNodes[node.id];
                    if (originalNode && !redUtil.compareObjects(originalNode.wires,node.wires)) {
                        // This is a node in a subflow whose wiring has changed. Mark subflow as changed
                        changed = true;
                    }
                }
            }
        }
        if (changed) {
            changedNodes[node.id] = node;
        }
    });

    // Whatever disappeared, sorted by what it was
    flow.config.forEach(function(node) {
        if (!flowNodes[node.id]) {
            if (node.type === "tab") {
                deletedTabs[node.id] = node;
            } else if (node.type === "subflow") {
                deletedSubflows[node.id] = node;
            } else {
                deletedNodes[node.id] = node;
            }
        }
        buildNodeLinks(activeLinks,node,flow.allNodes);
    });

    flow.config.forEach(function(node) {
        for (var prop in node) {
            if (node.hasOwnProperty(prop) && prop != "z" && prop != "id" && prop != "wires") {
                // This node has a property that references a changed node
                // Assume it is a config node change and mark this node as
                // changed.
                // Own-property check: the value is arbitrary user data and
                // must not match names inherited from Object.prototype.
                if (changedNodes.hasOwnProperty(node[prop]) && changedNodes[node[prop]]) {
                    changedNodes[node.id] = node;
                }
            }
        }
    });

    // Queue the subflow that `id` is, or belongs to, so that its instances
    // get marked as changed below
    var checkSubflowMembership = function(nodes,id) {
        var node = nodes[id];
        if (node) {
            if (node.type == "subflow") {
                changedSubflows[id] = node;
                changedSubflowStack.push(id);
            } else if (nodes[node.z] && nodes[node.z].type == "subflow") {
                if (!changedSubflows[node.z]) {
                    changedSubflows[node.z] = nodes[node.z];
                    changedSubflowStack.push(node.z);
                }
            }
        }
    };

    Object.keys(changedNodes).forEach(function(n) { checkSubflowMembership(flowNodes,n);});
    Object.keys(deletedNodes).forEach(function(n) { checkSubflowMembership(flow.allNodes,n);});

    // Every instance of a changed subflow is itself changed; an instance that
    // sits inside another subflow makes that one changed as well
    while (changedSubflowStack.length > 0) {
        var subflowId = changedSubflowStack.pop();

        config.forEach(function(node) {
            if (node.type == "subflow:"+subflowId) {
                if (!changedNodes[node.id]) {
                    changedNodes[node.id] = node;
                    checkSubflowMembership(flowNodes,node.id);
                }
            }
        });
    }

    config.forEach(function(node) {
        buildNodeLinks(newLinks,node,flowNodes);
    });

    // Flood-fill along linkMap from the changed nodes (plus otherChangedNodes),
    // flagging every node reached that is not already accounted for
    var markLinkedNodes = function(linkChanged,otherChangedNodes,linkMap,allNodes) {
        var stack = Object.keys(changedNodes).concat(Object.keys(otherChangedNodes));
        var visited = {};

        while(stack.length > 0) {
            var id = stack.pop();
            var linkedNodes = linkMap[id];
            if (linkedNodes) {
                for (var i=0;i<linkedNodes.length;i++) {
                    var linkedNodeId = linkedNodes[i];
                    if (changedNodes[linkedNodeId] || deletedNodes[linkedNodeId] || otherChangedNodes[linkedNodeId] || linkChanged[linkedNodeId]) {
                        // Do nothing - this linked node is already marked as changed, so will get done
                    } else {
                        linkChanged[linkedNodeId] = true;
                        stack.push(linkedNodeId);
                    }
                }
            }
        }
    };
    markLinkedNodes(linkChangedNodes,{},newLinks,flowNodes);
    markLinkedNodes(linkChangedNodes,{},activeLinks,flow.allNodes);

    var modifiedLinkNodes = {};

    config.forEach(function(node) {
        if (!changedNodes[node.id]) {
            // only concerned about unchanged nodes whose wiring may have changed
            var newNodeLinks = newLinks[node.id];
            var oldNodeLinks = activeLinks[node.id];

            // Number of links to each peer, after and before
            var newLinkMap = {};
            newNodeLinks.forEach(function(l) { newLinkMap[l] = (newLinkMap[l]||0)+1;});

            var oldLinkMap = {};
            oldNodeLinks.forEach(function(l) { oldLinkMap[l] = (oldLinkMap[l]||0)+1;});

            // A peer whose link count differs means this node, and the peer
            // unless it is already changed or deleted, had links modified
            var noteLinkChange = function(link) {
                if (newLinkMap[link] != oldLinkMap[link]) {
                    modifiedLinkNodes[node.id] = node;
                    linkChangedNodes[node.id] = node;
                    if (!changedNodes[link] && !deletedNodes[link]) {
                        modifiedLinkNodes[link] = flowNodes[link];
                        linkChangedNodes[link] = flowNodes[link];
                    }
                }
            };
            newNodeLinks.forEach(noteLinkChange);
            oldNodeLinks.forEach(noteLinkChange);
        }
    });

    markLinkedNodes(linkChangedNodes,modifiedLinkNodes,newLinks,flowNodes);

    var diff = {
        deleted: Object.keys(deletedNodes).filter(function(id) {
            var node = deletedNodes[id];
            return node.type != "subflow" && (!node.z || deletedTabs[node.z] || !(deletedSubflows[node.z] || inSubflow(node)));
        }),
        changed: Object.keys(changedNodes).filter(function(id) {
            var node = changedNodes[id];
            return node.type != "subflow" && !inSubflow(node);
        }),
        linked: Object.keys(linkChangedNodes).filter(function(id) {
            // Entries are either `true`, a node, or undefined when a wire
            // points at an id that is not in the new configuration
            var node = linkChangedNodes[id];
            return !!node && node.type != "subflow" && !inSubflow(node);
        }),
        wiringChanged: []
    };

    config.forEach(function(n) {
        if (!inSubflow(n)) {
            var originalNode = flow.allNodes[n.id];
            if (originalNode && !redUtil.compareObjects(originalNode.wires,n.wires)) {
                diff.wiringChanged.push(n.id);
            }
        }
    });

    return diff;
}
|
|
|
|
|
2015-02-20 02:17:24 +01:00
|
|
|
|
|
|
|
/**
 * Deliver an error raised by a node to the catch node responsible for it.
 *
 * The catch node is the one registered for the node's `z`, or failing that
 * for the `z` of the active node whose id is the node's `z`. Nothing happens
 * if there is none.
 *
 * The catch node receives a clone of `msg` (or a new message) whose `error`
 * property is `{message, source: {id, type, count}}`; an `error` property
 * already on the message is kept as `_error`. `count` is how many times in a
 * row the message has been caught from this same node. On the tenth, a
 * warning is logged on the node and the message is dropped.
 *
 * @param {Object} node       the node reporting the error
 * @param {*}      logMessage the error; its toString() becomes error.message
 * @param {Object} [msg]      the message being handled when the error occurred
 */
Flow.prototype.handleError = function(node,logMessage,msg) {
    var targetCatchNode = null;
    if (this.catchNodeMap[node.z]) {
        targetCatchNode = this.catchNodeMap[node.z];
    } else if (this.activeNodes[node.z] && this.catchNodeMap[this.activeNodes[node.z].z]) {
        targetCatchNode = this.catchNodeMap[this.activeNodes[node.z].z];
    }
    if (!targetCatchNode) {
        return;
    }

    var count = 1;
    // msg.error is message content and may be anything - including null -
    // so check before looking inside it
    if (msg && msg.hasOwnProperty("error") && msg.error != null) {
        if (msg.error.hasOwnProperty("source") && msg.error.source != null) {
            if (msg.error.source.id === node.id) {
                count = msg.error.source.count+1;
                if (count === 10) {
                    // Break the loop of a catch node feeding the failing node
                    node.warn("Message exceeded maximum number of catches");
                    return;
                }
            }
        }
    }

    var errorMessage;
    if (msg) {
        errorMessage = redUtil.cloneMessage(msg);
    } else {
        errorMessage = {};
    }
    if (errorMessage.hasOwnProperty("error")) {
        errorMessage._error = errorMessage.error;
    }
    errorMessage.error = {
        message: logMessage.toString(),
        source: {
            id: node.id,
            type: node.type,
            count: count
        }
    };
    targetCatchNode.receive(errorMessage);
};
|
|
|
|
|
|
|
|
|
2015-01-08 23:34:26 +01:00
|
|
|
module.exports = Flow;
|