/** * Copyright 2015 IBM Corp. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. **/ var when = require("when"); var clone = require("clone"); var typeRegistry = require("./registry"); var credentials = require("./credentials"); var redUtil = require("../util"); var events = require("../events"); var Log = require("../log"); /** Returns 1 + Math.random()*4294967295 formatted in base 16. The value is not rounded, so the returned string can contain a fractional part after a dot. */ function getID() { return (1+Math.random()*4294967295).toString(16); } /** Instantiates a node of the given type. Looks up the constructor with typeRegistry.get(type) and calls it with clone(config). Returns the new instance, or null when no constructor is found (reported through Log.error) or when the constructor throws (reported through Log.log at level Log.ERROR together with config.id and type). */ function createNode(type,config) { var nn = null; var nt = typeRegistry.get(type); if (nt) { try { nn = new nt(clone(config)); } catch (err) { Log.log({ level: Log.ERROR, id:config.id, type: type, msg: err }); } } else { Log.error("Unknown type: "+type); } return nn; } /* NOTE(review): only the opening of createSubflow is visible in this copy; its first loop header breaks off mid-expression below. */ function createSubflow(sf,sfn,subflows) { //console.log("CREATE SUBFLOW",sf.config.id,sfn.id); var nodes = []; var node_map = {}; var newNodes = []; var node; var wires; var i,j,k; // Clone all of the subflow node definitions and give them new IDs for (i=0;i /* NOTE(review): text appears to be missing at this point. The statements after it use this.configNodes, this.nodes and this.activeNodes, so they presumably belong to a Flow method whose header is not visible. After emitting "nodes-starting" they call createNode for every own entry of this.configNodes (passing the entry itself) and for every non-subflow own entry of this.nodes (passing its config property) that has no this.activeNodes entry yet. */ 0) { throw new Error("missing types"); } events.emit("nodes-starting"); var id; var node; for (id in this.configNodes) { if (this.configNodes.hasOwnProperty(id)) { node = this.configNodes[id]; if (!this.activeNodes[id]) { this.activeNodes[id] = createNode(node.type,node); } } } for (id in this.nodes) { if (this.nodes.hasOwnProperty(id)) { node = this.nodes[id]; if (!node.subflow) { if (!this.activeNodes[id]) { this.activeNodes[id] = createNode(node.type,node.config); //console.log(id,"created"); } else { //console.log(id,"already running"); } } else { if 
(!this.subflowInstanceNodes[id]) { var nodes = createSubflow(this.subflows[node.subflow],node.config,this.subflows); this.subflowInstanceNodes[id] = nodes.map(function(n) { return n.id}); for (var i=0;i /* NOTE(review): text appears to be missing at this point; the function the following statements belong to is not visible. They remove type from this.missingTypes when it is present, call this.start() once that list is empty and this.started is truthy, and return true; the trailing return false covers every other path. */ 0) { var i = this.missingTypes.indexOf(type); if (i != -1) { this.missingTypes.splice(i,1); if (this.missingTypes.length === 0 && this.started) { this.start(); } return true; } } return false; } /** Returns the value stored in this.activeNodes under id (undefined when there is no such key). */ Flow.prototype.getNode = function(id) { return this.activeNodes[id]; } /** Returns the object held in this.config. */ Flow.prototype.getFlow = function() { //console.log(this.config); return this.config; } /** Calls callback once for each own enumerable key of this.activeNodes, passing the value stored under that key. */ Flow.prototype.eachNode = function(callback) { for (var id in this.activeNodes) { if (this.activeNodes.hasOwnProperty(id)) { callback(this.activeNodes[id]); } } } /* NOTE(review): only the opening of diffConfig is visible. When type is truthy and not "full" it calls diffFlow(this,config), takes nodesToRewire from diff.wiringChanged, and builds nodesToStop from deleted plus changed for "nodes", or deleted plus changed plus linked for "flows". */ Flow.prototype.diffConfig = function(config,type) { var activeNodesToStop = []; var nodesToRewire = []; if (type && type!="full") { var diff = diffFlow(this,config); //var diff = { // deleted:[] // changed:[] // linked:[] // wiringChanged: [] //} var nodesToStop = []; nodesToRewire = diff.wiringChanged; if (type == "nodes") { nodesToStop = diff.deleted.concat(diff.changed); } else if (type == "flows") { nodesToStop = diff.deleted.concat(diff.changed).concat(diff.linked); } for (var i=0;i /* NOTE(review): text appears to be missing at this point; the remainder of diffConfig and the opening of the code that follows are not visible in this copy. */ 0) { var subflowId = changedSubflowStack.pop(); config.forEach(function(node) { if (node.type == "subflow:"+subflowId) { if (!changedNodes[node.id]) { changedNodes[node.id] = node; checkSubflowMembership(flowNodes,node.id); } } }); } config.forEach(function(node) { buildNodeLinks(newLinks,node,flowNodes); }); var markLinkedNodes = function(linkChanged,otherChangedNodes,linkMap,allNodes) { var stack = Object.keys(changedNodes).concat(Object.keys(otherChangedNodes)); var visited = {}; while(stack.length > 0) { var id = stack.pop(); var linkedNodes = linkMap[id]; if (linkedNodes) { for (var i=0;i