/** * Copyright JS Foundation and other contributors, http://js.foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. **/ var when = require("when"); var clone = require("clone"); var Subflow; var Log; var redUtil = require("@node-red/util").util; var flowUtil = require("./util"); var events = require("../../events"); var nodeCloseTimeout = 15000; class Flow { constructor(parent,globalFlow,flow) { this.TYPE = 'flow'; this.parent = parent; this.global = globalFlow; if (typeof flow === 'undefined') { this.flow = globalFlow; this.isGlobalFlow = true; } else { this.flow = flow; this.isGlobalFlow = false; } this.id = this.flow.id || "global"; this.activeNodes = {}; this.subflowInstanceNodes = {}; this.catchNodes = []; this.statusNodes = []; } debug(msg) { Log.log({ id: this.id||"global", level: Log.DEBUG, type:this.TYPE, msg:msg }) } log(msg) { Log.log({ id: this.id||"global", level: Log.INFO, type:this.TYPE, msg:msg }) } trace(msg) { Log.log({ id: this.id||"global", level: Log.TRACE, type:this.TYPE, msg:msg }) } start(diff) { this.trace("start "+this.TYPE); var node; var newNode; var id; this.catchNodes = []; this.statusNodes = []; var configNodes = Object.keys(this.flow.configs); var configNodeAttempts = {}; while (configNodes.length > 0) { id = configNodes.shift(); node = this.flow.configs[id]; if (!this.activeNodes[id]) { var readyToCreate = true; // This node doesn't exist. // Check it doesn't reference another non-existent config node for (var prop in node) { if (node.hasOwnProperty(prop) && prop !== 'id' && prop !== 'wires' && prop !== '_users' && this.flow.configs[node[prop]]) { if (!this.activeNodes[node[prop]]) { // References a non-existent config node // Add it to the back of the list to try again later configNodes.push(id); configNodeAttempts[id] = (configNodeAttempts[id]||0)+1; if (configNodeAttempts[id] === 100) { throw new Error("Circular config node dependency detected: "+id); } readyToCreate = false; break; } } } if (readyToCreate) { newNode = flowUtil.createNode(this,node); if (newNode) { this.activeNodes[id] = newNode; } } } } if (diff && diff.rewired) { for (var j=0;j 0) { this.trace("------------------|--------------|-----------------"); this.trace(" id | type | alias"); this.trace("------------------|--------------|-----------------"); } // Build the map of catch/status nodes. for (id in this.activeNodes) { if (this.activeNodes.hasOwnProperty(id)) { node = this.activeNodes[id]; this.trace(" "+id.padEnd(16)+" | "+node.type.padEnd(12)+" | "+(node._alias||"")); if (node.type === "catch") { this.catchNodes.push(node); } else if (node.type === "status") { this.statusNodes.push(node); } } } if (activeCount > 0) { this.trace("------------------|--------------|-----------------"); } // this.dump(); } stop(stopList, removedList) { return new Promise((resolve,reject) => { this.trace("stop "+this.TYPE); var i; if (stopList) { // for (i=0;i { subflow.stop() })); } catch(err) { node.error(err); } delete this.subflowInstanceNodes[stopList[i]]; } else { try { var removed = removedMap[stopList[i]]; promises.push(this.stopNode(node,removed)); } catch(err) { node.error(err); } } } } when.settle(promises).then(function(results) { resolve(); }); }); } stopNode(node,removed) { return when.promise(function(resolve, reject) { var start; when.promise(function(resolve) { Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":"")); start = Date.now(); resolve(node.close(removed)); }).timeout(nodeCloseTimeout).then(function(){ var delta = Date.now() - start; Log.trace("Stopped node "+node.type+":"+node.id+" ("+delta+"ms)" ); resolve(delta); },function(err) { var delta = Date.now() - start; node.error(Log._("nodes.flows.stopping-error",{message:err})); Log.debug(err.stack); reject(err); }); }) } update(_global,_flow) { this.global = _global; this.flow = _flow; } getNode(id) { // console.log('getNode',id,!!this.activeNodes[id]) if (!id) { return undefined; } // console.log((new Error().stack).toString().split("\n").slice(1,3).join("\n")) if ((this.flow.configs && this.flow.configs[id]) || (this.flow.nodes && this.flow.nodes[id])) { // This is a node owned by this flow, so return whatever we have got // During a stop/restart, activeNodes could be null for this id return this.activeNodes[id]; } else if (this.activeNodes[id]) { // TEMP: this is a subflow internal node within this flow return this.activeNodes[id]; } return this.parent.getNode(id); } getActiveNodes() { return this.activeNodes; } handleStatus(node,statusMessage) { events.emit("node-status",{ id: node.id, status:statusMessage }); var handled = false; this.statusNodes.forEach(function(targetStatusNode) { if (targetStatusNode.scope && targetStatusNode.scope.indexOf(node.id) === -1) { return; } var message = { status: { text: "", source: { id: node.id, type: node.type, name: node.name } } }; if (statusMessage.hasOwnProperty("text")) { message.status.text = statusMessage.text.toString(); } targetStatusNode.receive(message); handled = true; }); if (!handled) { // // Nothing in this flow handled the status - pass it to the parent this.parent.handleStatus(node,statusMessage); } } handleError(node,logMessage,msg) { // console.log("HE",logMessage); var count = 1; if (msg && msg.hasOwnProperty("error") && msg.error !== null) { if (msg.error.hasOwnProperty("source") && msg.error.source !== null) { if (msg.error.source.id === node.id) { count = msg.error.source.count+1; if (count === 10) { node.warn(Log._("nodes.flow.error-loop")); return false; } } } } var handled = false; this.catchNodes.forEach(function(targetCatchNode) { if (targetCatchNode.scope && targetCatchNode.scope.indexOf(node.id) === -1) { return; } var errorMessage; if (msg) { errorMessage = redUtil.cloneMessage(msg); } else { errorMessage = {}; } if (errorMessage.hasOwnProperty("error")) { errorMessage._error = errorMessage.error; } errorMessage.error = { message: logMessage.toString(), source: { id: node.id, type: node.type, name: node.name, count: count } }; if (logMessage.hasOwnProperty('stack')) { errorMessage.error.stack = logMessage.stack; } targetCatchNode.receive(errorMessage); handled = true; }); if (!handled) { // Nothing in this flow handled the error - pass it to the parent handled = this.parent.handleError(node,logMessage); } return handled; } dump() { console.log("==================") console.log(this.TYPE, this.id); for (var id in this.activeNodes) { if (this.activeNodes.hasOwnProperty(id)) { var node = this.activeNodes[id]; console.log(" ",id.padEnd(16),node.type) if (node.wires) { console.log(" -> ",node.wires) } } } console.log("==================") } } module.exports = { init: function(runtime) { nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000; Log = runtime.log; Subflow = require("./Subflow"); Subflow.init(runtime); }, create: function(parent,global,conf) { return new Flow(parent,global,conf); }, Flow: Flow }