/**
 * Copyright JS Foundation and other contributors, http://js.foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

const clone = require("clone");
const redUtil = require("@node-red/util").util;
const events = require("@node-red/util").events;
const flowUtil = require("./util");
const context = require('../nodes/context');
const hooks = require("@node-red/util").hooks;
const credentials = require("../nodes/credentials");

// Assigned by `init()` at the bottom of this file; undefined until then.
let Subflow;
let Log;
let Group;

// Delay, passed to setTimeout in `stopNode`, after which a node close is
// treated as timed out. Overridden by `init()` from runtime.settings.nodeCloseTimeout.
let nodeCloseTimeout = 15000;
// When true, `handlePreDeliver` hands messages to the destination node via
// setImmediate; when false, delivery happens synchronously.
// `init()` sets this to the negation of runtime.settings.runtimeSyncDelivery.
let asyncMessageDelivery = true;

/**
 * This class represents a flow within the runtime. It is responsible for
 * creating, starting and stopping all nodes within the flow.
 */
class Flow {

    /**
     * Create a Flow object.
     * @param {Object} parent The parent flow. Its `id` is used to obtain this
     *                        flow's context, and `log`, `getNode` and
     *                        `getSetting` requests are delegated to it.
     * @param {Object} globalFlow The global flow definition
     * @param {Object} [flow] This flow's definition. When omitted (undefined),
     *                        `globalFlow` is used as the definition and this
     *                        instance is marked as the global flow.
     */
    constructor(parent,globalFlow,flow) {
        this.TYPE = 'flow';
        this.parent = parent;
        this.global = globalFlow;
        if (typeof flow === 'undefined') {
            this.flow = globalFlow;
            this.isGlobalFlow = true;
        } else {
            this.flow = flow;
            this.isGlobalFlow = false;
        }
        // A definition without an id is identified as "global"
        this.id = this.flow.id || "global";
        // groups: group id -> Group instance; groupOrder: ids in creation order.
        // Both are rebuilt by start().
        this.groups = {}
        this.groupOrder = []
        this.activeNodes = {};
        this.subflowInstanceNodes = {};
        this.catchNodes = [];
        this.statusNodes = [];
        this.path = this.id;
        // Ensure a context exists for this flow
        this.context = context.getFlowContext(this.id,this.parent.id);

        // env is an array of env definitions
        // _env is an object for direct lookup of env name -> value
        this.env = this.flow.env
        this._env = {}
    }

    /**
     * Log a debug-level message from this flow
     * @param {*} msg the message to log; passed through as the `msg`
     *                property of the log event
     */
    debug(msg) {
        Log.log({
            id: this.id||"global",
            level: Log.DEBUG,
            type:this.TYPE,
            msg:msg
        })
    }

    /**
     * Log an error-level message from this flow
     * @param {*} msg the message to log; passed through as the `msg`
     *                property of the log event
     */
    error(msg) {
        Log.log({
            id: this.id||"global",
            level: Log.ERROR,
            type:this.TYPE,
            msg:msg
        })
    }

    /**
     * Log an info-level message from this flow
     * @param {*} msg the message to log; passed through as the `msg`
     *                property of the log event
     */
    info(msg) {
        Log.log({
            id: this.id||"global",
            level: Log.INFO,
            type:this.TYPE,
            msg:msg
        })
    }

    /**
     * Log a trace-level message from this flow
     * @param {*} msg the message to log; passed through as the `msg`
     *                property of the log event
     */
    trace(msg) {
        Log.log({
            id: this.id||"global",
            level: Log.TRACE,
            type:this.TYPE,
            msg:msg
        })
    }

    /**
     * Forward a log event object to the parent flow's `log` method.
     * If the event has no `path` property (or a falsy one), it is set to this
     * flow's path first. Note that this mutates the object passed in.
     * @param {Object} msg the log event object
     */
    log(msg) {
        if (!msg.path) {
            msg.path = this.path;
        }
        this.parent.log(msg);
    }

    /**
     * Start this flow.
     * The `diff` argument helps define what needs to be started in the case
     * of a modified-nodes/flows type deploy.
     *
     * In order, the visible code:
     *  - resets the catch/status/complete node collections,
     *  - evaluates env properties (from `global-config` config nodes when this
     *    is the global flow, then from this flow's own `env`),
     *  - creates and starts the Group objects, parents before children,
     *  - creates the config nodes, deferring any that reference a config node
     *    that has not been created yet,
     *  - rebuilds the catch/status/complete collections from `activeNodes` and
     *    sorts the catch nodes.
     *
     * @param {Object} [diff] deploy diff; only `diff.rewired` is read in the
     *                        visible code
     * @return {Promise} resolves when start-up has finished. Rejects with an
     *                   Error if a config node has been deferred 100 times
     *                   (reported as a circular dependency).
     */
    async start(diff) {
        this.trace("start "+this.TYPE+" ["+this.path+"]");
        var node;
        var newNode;
        var id;
        this.catchNodes = [];
        this.statusNodes = [];
        this.completeNodeMap = {};

        if (this.isGlobalFlow) {
            // This is the global flow. It needs to go find the `global-config`
            // node and extract any env properties from it
            const configNodes = Object.keys(this.flow.configs);
            for (let i = 0; i < configNodes.length; i++) {
                const node = this.flow.configs[configNodes[i]]
                if (node.type === 'global-config' && node.env) {
                    const nodeEnv = await flowUtil.evaluateEnvProperties(this, node.env, credentials.get(node.id) || {})
                    // Later global-config nodes override earlier ones on key clash
                    this._env = { ...this._env, ...nodeEnv }
                }
            }
        }

        if (this.env) {
            // This flow's own env values take precedence over anything already in _env
            this._env = { ...this._env, ...await flowUtil.evaluateEnvProperties(this, this.env, credentials.get(this.id) || {}) }
        }

        // Initialise the group objects. These must be done in the right order
        // starting from outer-most to inner-most so that the parent hierarchy
        // is maintained.
        this.groups = {}
        this.groupOrder = []
        const groupIds = Object.keys(this.flow.groups || {})
        // NOTE(review): a group whose `g` refers to an id that never appears in
        // this.flow.groups is re-queued on every pass and this loop has no
        // attempt limit - confirm the definitions are validated upstream.
        while (groupIds.length > 0) {
            const id = groupIds.shift()
            const groupDef = this.flow.groups[id]
            if (!groupDef.g || this.groups[groupDef.g]) {
                // The parent of this group is available - either another group
                // or the top-level flow (this)
                const parent = this.groups[groupDef.g] || this
                this.groups[groupDef.id] = new Group(parent, groupDef)
                this.groupOrder.push(groupDef.id)
            } else {
                // Try again once we've processed the other groups
                groupIds.push(id)
            }
        }

        for (let i = 0; i < this.groupOrder.length; i++) {
            // Start the groups in the right order so they
            // can setup their env vars knowing their parent
            // will have been started
            await this.groups[this.groupOrder[i]].start()
        }

        var configNodes = Object.keys(this.flow.configs);
        // Number of times each config node id has been pushed back on the queue
        var configNodeAttempts = {};
        while (configNodes.length > 0) {
            id = configNodes.shift();
            node = this.flow.configs[id];
            if (!this.activeNodes[id]) {
                if (node.d !== true) {
                    var readyToCreate = true;
                    // This node doesn't exist.
                    // Check it doesn't reference another non-existent config node
                    for (var prop in node) {
                        // A property counts as a config-node reference when its
                        // value is the id of an enabled config node in this flow
                        if (node.hasOwnProperty(prop) &&
                            prop !== 'id' &&
                            prop !== 'wires' &&
                            prop !== '_users' &&
                            this.flow.configs[node[prop]] &&
                            this.flow.configs[node[prop]].d !== true
                        ) {
                            if (!this.activeNodes[node[prop]]) {
                                // References a non-existent config node
                                // Add it to the back of the list to try again later
                                configNodes.push(id);
                                configNodeAttempts[id] = (configNodeAttempts[id]||0)+1;
                                if (configNodeAttempts[id] === 100) {
                                    throw new Error("Circular config node dependency detected: "+id);
                                }
                                readyToCreate = false;
                                break;
                            }
                        }
                    }
                    if (readyToCreate) {
                        newNode = await flowUtil.createNode(this,node);
                        if (newNode) {
                            this.activeNodes[id] = newNode;
                        }
                    }
                } else {
                    this.debug("not starting disabled config node : "+id);
                }
            }
        }

        if (diff && diff.rewired) {
            // NOTE(review): the source text appears to be truncated on the next
            // line. The `for` header is incomplete, its body is missing, and
            // `activeCount` (read again near the end of this method) is never
            // declared in the text that remains. Restore the missing code from
            // the original file before using this method.
            for (var j=0;j 0) {
            this.trace("------------------|--------------|-----------------");
            this.trace(" id | type | alias");
            this.trace("------------------|--------------|-----------------");
        }
        // Build the map of catch/status/complete nodes.
        for (id in this.activeNodes) {
            if (this.activeNodes.hasOwnProperty(id)) {
                node = this.activeNodes[id];
                this.trace(" "+id.padEnd(16)+" | "+node.type.padEnd(12)+" | "+(node._alias||"")+(node._zAlias?" [zAlias:"+node._zAlias+"]":""));
                if (node.type === "catch") {
                    this.catchNodes.push(node);
                } else if (node.type === "status") {
                    this.statusNodes.push(node);
                } else if (node.type === "complete") {
                    if (node.scope) {
                        // Index each complete node under every node id listed in its scope
                        node.scope.forEach(id => {
                            this.completeNodeMap[id] = this.completeNodeMap[id] || [];
                            this.completeNodeMap[id].push(node);
                        })
                    }
                }
            }
        }

        // Order the catch nodes: those with a `scope` first; among the ones
        // without a scope, those flagged `uncaught` go last.
        this.catchNodes.sort(function(A,B) {
            if (A.scope && !B.scope) {
                return -1;
            } else if (!A.scope && B.scope) {
                return 1;
            } else if (A.scope && B.scope) {
                return 0;
            } else if (A.uncaught && !B.uncaught) {
                return 1;
            } else if (!A.uncaught && B.uncaught) {
                return -1;
            }
            return 0;
        });

        if (activeCount > 0) {
            this.trace("------------------|--------------|-----------------");
        }
        // this.dump();
    }

    /**
     * Stop this flow.
     * The `stopList` argument helps define what needs to be stopped in the case
     * of a modified-nodes/flows type deploy.
     * @param {String[]} [stopList] ids of the nodes to stop. Defaults to every
     *                              id currently in `activeNodes`.
     * @param {String[]} [removedList] ids of nodes that are being removed. A
     *                              "node-status" event carrying only the node id
     *                              is emitted for these.
     * @return {Promise} the result of `Promise.all` over the collected stop promises
     */
    stop(stopList, removedList) {
        this.trace("stop "+this.TYPE);
        var i;
        if (!stopList) {
            stopList = Object.keys(this.activeNodes);
        }
        // this.trace(" stopList: "+stopList.join(","))
        // Convert the list to a map to avoid multiple scans of the list
        var removedMap = {};
        removedList = removedList || [];
        removedList.forEach(function(id) {
            removedMap[id] = true;
        });

        // Reorder the list so that ids found in this.flow.configs come after
        // all other ids.
        let nodesToStop = [];
        let configsToStop = [];
        stopList.forEach(id => {
            if (this.flow.configs[id]) {
                configsToStop.push(id);
            } else {
                nodesToStop.push(id);
            }
        });
        stopList = nodesToStop.concat(configsToStop);

        var promises = [];
        // NOTE(review): the source text appears to be truncated on the next
        // line. The rest of the `for` header, the declaration of `node` and the
        // `try {` that the `catch` below belongs to are missing. Restore the
        // missing code from the original file before using this method.
        for (i=0;i{}));
                } catch(err) {
                    node.error(err);
                }
                if (removedMap[stopList[i]]) {
                    // Emit a status event with no `status` property for a removed node
                    events.emit("node-status",{
                        id: node.id
                    });
                }
            }
        }
        return Promise.all(promises);
    }

    /**
     * Update the flow definition. This doesn't change anything that is running.
     * This should be called after `stop` and before `start`.
     * @param {Object} _global the new global flow definition (stored as `this.global`)
     * @param {Object} _flow the new definition of this flow (stored as `this.flow`)
     */
    update(_global,_flow) {
        this.global = _global;
        this.flow = _flow;
    }

    /**
     * Get a node instance from this flow. If the node is not known to this
     * flow, pass the request up to the parent.
     * @param {String} id the id of the node to look up
     * @param {Boolean} cancelBubble if true, prevents the flow from passing the request to the parent
     *                               This stops infinite loops when the parent asked this Flow for the
     *                               node to begin with. In that case the subflow instances held by
     *                               this flow are searched instead.
     * @return {Object|undefined} whatever was found for the id, or undefined if
     *                            `id` is falsy or nothing was found
     */
    getNode(id, cancelBubble) {
        if (!id) {
            return undefined;
        }
        // console.log((new Error().stack).toString().split("\n").slice(1,3).join("\n"))
        if ((this.flow.configs && this.flow.configs[id]) || (this.flow.nodes && this.flow.nodes[id] && this.flow.nodes[id].type.substring(0,8) != "subflow:")) {
            // This is a node owned by this flow, so return whatever we have got
            // During a stop/restart, activeNodes could be null for this id
            return this.activeNodes[id];
        } else if (this.activeNodes[id]) {
            // TEMP: this is a subflow internal node within this flow or subflow instance node
            return this.activeNodes[id];
        } else if (this.subflowInstanceNodes[id]) {
            return this.subflowInstanceNodes[id];
        } else if (cancelBubble) {
            // The node could be inside one of this flow's subflows
            var node;
            for (var sfId in this.subflowInstanceNodes) {
                if (this.subflowInstanceNodes.hasOwnProperty(sfId)) {
                    node = this.subflowInstanceNodes[sfId].getNode(id,cancelBubble);
                    if (node) {
                        return node;
                    }
                }
            }
        } else {
            // Node not found inside this flow - ask the parent
            return this.parent.getNode(id);
        }
        return undefined;
    }

    /**
     * Get a group node instance
     * @param {String} id
     * @return {Node} group node
     */
    getGroupNode(id) {
        return this.groups[id];
    }

    /**
     * Get all of the nodes instantiated within this flow
     * @return {Object} the `activeNodes` map itself (node id -> instance), not a copy
     */
    getActiveNodes() {
        return this.activeNodes;
    }

    /**
     * Get a flow setting value.
     *
     * `NR_FLOW_NAME` and `NR_FLOW_ID` return this flow's label and id.
     * Any other key is looked up in this flow's evaluated env values unless it
     * starts with "$parent.", in which case that prefix is removed. Keys that
     * are not resolved here are passed to the parent's `getSetting`.
     *
     * @param {String} key the setting name
     * @return {*} the value found, or whatever the parent returns
     */
    getSetting(key) {
        const flow = this.flow;
        if (key === "NR_FLOW_NAME") {
            return flow.label;
        }
        if (key === "NR_FLOW_ID") {
            return flow.id;
        }
        if (!key.startsWith("$parent.")) {
            if (this._env.hasOwnProperty(key)) {
                return this._env[key]
            }
        } else {
            // Drop the leading "$parent." (8 characters) and skip the local lookup
            key = key.substring(8);
        }
        // Delegate to the parent flow.
        return this.parent.getSetting(key);
    }

    /**
     * Handle a status event from a node within this flow.
     * @param {Node} node The original node that triggered the event
     * @param {Object} statusMessage The status object
     * @param {Node} reportingNode The node emitting the status event.
     *                             This could be a subflow instance node when the status
     *                             is being delegated up. Defaults to `node`.
     * @param {boolean} muteStatusEvent If true, the "node-status" event is not emitted
     * @return {boolean} true if at least one status node received the status
     */
    handleStatus(node,statusMessage,reportingNode,muteStatusEvent) {
        if (!reportingNode) {
            reportingNode = node;
        }
        if (!muteStatusEvent) {
            if (statusMessage.hasOwnProperty("text") && typeof statusMessage.text !== "string") {
                // Best-effort conversion of the text to a string; a failure
                // leaves the original value in place
                try {
                    statusMessage.text = statusMessage.text.toString();
                }
                catch(e) {}
            }
            events.emit("node-status",{
                id: node.id,
                status:statusMessage
            });
        }

        let handled = false;

        if (this.id === 'global' && node.users) {
            // This is a global config node
            // Delegate status to any nodes using this config node
            for (let userNode in node.users) {
                if (node.users.hasOwnProperty(userNode)) {
                    // Muted (last argument true) so the event is not emitted again
                    handled = node.users[userNode]._flow.handleStatus(node,statusMessage,node.users[userNode],true) || handled;
                }
            }
        } else {
            const candidateNodes = [];
            this.statusNodes.forEach(targetStatusNode => {
                if (targetStatusNode.g && targetStatusNode.scope === 'group' && !reportingNode.g) {
                    // Status node inside a group, reporting node not in a group - skip it
                    return
                }
                if (Array.isArray(targetStatusNode.scope) && targetStatusNode.scope.indexOf(reportingNode.id) === -1) {
                    // Status node has a scope set and it doesn't include the reporting node
                    return;
                }
                let distance = 0
                if (reportingNode.g) {
                    // Reporting node inside a group. Calculate the distance between it and the status node
                    let containingGroup = this.groups[reportingNode.g]
                    while (containingGroup && containingGroup.id !== targetStatusNode.g) {
                        distance++
                        containingGroup = this.groups[containingGroup.g]
                    }
                    if (!containingGroup && targetStatusNode.g && targetStatusNode.scope === 'group') {
                        // This status node is in a group, but not in the same hierarchy
                        // the reporting node is in
                        return
                    }
                }
                candidateNodes.push({ d: distance, n: targetStatusNode })
            })
            // Deliver to the status nodes with the smallest distance first
            candidateNodes.sort((A,B) => {
                return A.d - B.d
            })
            candidateNodes.forEach(candidate => {
                const targetStatusNode = candidate.n
                var message = {
                    status: clone(statusMessage)
                }
                if (statusMessage.hasOwnProperty("text")) {
                    // NOTE(review): unlike the conversion near the top of this
                    // method, this toString() call is not wrapped in try/catch,
                    // so a null/undefined `text` would throw here.
                    message.status.text = statusMessage.text.toString();
                }
                message.status.source = {
                    id: node.id,
                    type: node.type,
                    name: node.name
                }
                targetStatusNode.receive(message);
                handled = true;
            });
        }
        return handled;
    }

    /**
     * Handle an error event from a node within this flow. If there are no Catch
     * nodes within this flow, pass the event to the parent flow.
     *
     * NOTE(review): the visible code only returns a boolean; passing the event
     * on to the parent is presumably done by the caller when this returns
     * false - confirm.
     *
     * @param {Node} node The node that reported the error
     * @param {*} logMessage The error that was logged. Its `toString()` value
     *                       becomes `error.message` and, if it has its own
     *                       `stack` property, that is copied too.
     * @param {Object} [msg] The message being handled when the error occurred.
     *                       It is cloned for each catch node; an empty object is
     *                       used when it is not provided.
     * @param {Node} [reportingNode] The node emitting the error event. Defaults to `node`.
     * @return {boolean} true if at least one catch node received the error.
     *                   false also when the error-loop limit below is hit.
     */
    handleError(node,logMessage,msg,reportingNode) {
        if (!reportingNode) {
            reportingNode = node;
        }
        // console.log("HE",logMessage);
        // Loop protection: when the incoming message already carries an error
        // whose source is this same node, continue its counter. On reaching 10
        // a warning is logged and the error is not delivered.
        var count = 1;
        if (msg && msg.hasOwnProperty("error") && msg.error) {
            if (msg.error.hasOwnProperty("source") && msg.error.source) {
                if (msg.error.source.id === node.id) {
                    count = msg.error.source.count+1;
                    if (count === 10) {
                        node.warn(Log._("nodes.flow.error-loop"));
                        return false;
                    }
                }
            }
        }
        let handled = false;

        if (this.id === 'global' && node.users) {
            // This is a global config node
            // Delegate the error to any nodes using this config node
            for (let userNode in node.users) {
                if (node.users.hasOwnProperty(userNode)) {
                    handled = node.users[userNode]._flow.handleError(node,logMessage,msg,node.users[userNode]) || handled;
                }
            }
        } else {
            const candidateNodes = [];
            this.catchNodes.forEach(targetCatchNode => {
                if (targetCatchNode.g && targetCatchNode.scope === 'group' && !reportingNode.g) {
                    // Catch node inside a group, reporting node not in a group - skip it
                    return
                }
                if (Array.isArray(targetCatchNode.scope) && targetCatchNode.scope.indexOf(reportingNode.id) === -1) {
                    // Catch node has a scope set and it doesn't include the reporting node
                    return;
                }
                let distance = 0
                if (reportingNode.g) {
                    // Reporting node inside a group. Calculate the distance between it and the catch node
                    let containingGroup = this.groups[reportingNode.g]
                    while (containingGroup && containingGroup.id !== targetCatchNode.g) {
                        distance++
                        containingGroup = this.groups[containingGroup.g]
                    }
                    if (!containingGroup && targetCatchNode.g && targetCatchNode.scope === 'group') {
                        // This catch node is in a group, but not in the same hierarchy
                        // the reporting node is in
                        return
                    }
                }
                candidateNodes.push({ d: distance, n: targetCatchNode })
            })
            // Deliver to the catch nodes with the smallest distance first
            candidateNodes.sort((A,B) => {
                return A.d - B.d
            })
            let handledByUncaught = false
            candidateNodes.forEach(candidate => {
                const targetCatchNode = candidate.n
                if (targetCatchNode.uncaught && !handledByUncaught) {
                    // This node only wants errors that haven't already been handled
                    if (handled) {
                        return
                    }
                    // From here on, later `uncaught` nodes skip the check above
                    // and receive the error as well
                    handledByUncaught = true
                }
                let errorMessage;
                if (msg) {
                    errorMessage = redUtil.cloneMessage(msg);
                } else {
                    errorMessage = {};
                }
                if (errorMessage.hasOwnProperty("error")) {
                    // Keep any error already on the message under `_error`
                    errorMessage._error = errorMessage.error;
                }
                errorMessage.error = {
                    message: logMessage.toString(),
                    source: {
                        id: node.id,
                        type: node.type,
                        name: node.name,
                        count: count
                    }
                };
                if (logMessage.hasOwnProperty('stack')) {
                    errorMessage.error.stack = logMessage.stack;
                }
                targetCatchNode.receive(errorMessage);
                handled = true;
            });
        }
        return handled;
    }

    /**
     * Handle a node completing its handling of a message.
     * Each complete node registered in `completeNodeMap` for `node.id`
     * receives its own clone of the message.
     * @param {Node} node the node that completed
     * @param {Object} msg the message it completed with
     */
    handleComplete(node,msg) {
        if (this.completeNodeMap[node.id]) {
            // NOTE(review): the initial value of `toSend` and the `index`
            // callback parameter are never read.
            let toSend = msg;
            this.completeNodeMap[node.id].forEach((completeNode,index) => {
                toSend = redUtil.cloneMessage(msg);
                completeNode.receive(toSend);
            })
        }
    }

    /**
     * Route the messages a node has sent.
     * Hands the events to `handleOnSend`; any error reported during routing is
     * logged through the `error` method of the source node of the event (the
     * first event, when an array of events is reported).
     * @param {Object[]} sendEvents array of SendEvent objects
     */
    send(sendEvents) {
        // onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object.
        // preRoute - called once for each SendEvent object in turn
        // preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed.
        // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery)
        // onReceive - a node is about to receive a message
        // postReceive - the message has been passed to the node's input handler
        // onDone, onError - the node has completed with a message or logged an error
        handleOnSend(this,sendEvents, (err, eventData) => {
            if (err) {
                let srcNode;
                if (Array.isArray(eventData)) {
                    srcNode = eventData[0].source.node;
                } else {
                    srcNode = eventData.source.node;
                }
                srcNode.error(err);
            }
        });
    }

    /**
     * Print this flow's type and id, followed by the id, type and (when
     * present) wires of every active node, to the console.
     * Only referenced from a commented-out call in `start` within this file.
     */
    dump() {
        console.log("==================")
        console.log(this.TYPE, this.id);
        for (var id in this.activeNodes) {
            if (this.activeNodes.hasOwnProperty(id)) {
                var node = this.activeNodes[id];
                console.log(" ",id.padEnd(16),node.type)
                if (node.wires) {
                    console.log(" -> ",node.wires)
                }
            }
        }
        console.log("==================")
    }
}

/**
 * Stop an individual node within this flow.
 *
 * Calls `node.close(removed)` and races it against a timer of
 * `nodeCloseTimeout`. If the timer wins, or closing fails, the failure is
 * logged through `node.error`.
 *
 * @param {Node} node the node to stop
 * @param {boolean} removed passed to `node.close`; also noted in the trace log
 * @return {Promise} always resolves, because failures are caught and logged here
 */
function stopNode(node,removed) {
    Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":""));
    const start = Date.now();
    const closePromise = node.close(removed);
    let closeTimer = null;
    const closeTimeout = new Promise((resolve,reject) => {
        closeTimer = setTimeout(() => {
            reject("Close timed out");
        }, nodeCloseTimeout);
    });
    return Promise.race([closePromise,closeTimeout]).then(() => {
        clearTimeout(closeTimer);
        var delta = Date.now() - start;
        Log.trace("Stopped node "+node.type+":"+node.id+" ("+delta+"ms)" );
    }).catch(err => {
        clearTimeout(closeTimer);
        node.error(Log._("nodes.flows.stopping-error",{message:err}));
        // NOTE(review): on timeout `err` is the string "Close timed out", so
        // `err.stack` is undefined in that case.
        Log.debug(err.stack);
    })
}

/**
 * First stage of message routing: trigger the "onSend" hook for a batch of
 * send events.
 *
 * @param {Flow} flow the flow the events were sent within
 * @param {Object[]} sendEvents array of SendEvent objects
 * @param {Function} reportError called as `reportError(err, eventData)` when a
 *                               hook reports an error
 */
function handleOnSend(flow,sendEvents, reportError) {
    // onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object.
    hooks.trigger("onSend",sendEvents,(err) => {
        if (err) {
            reportError(err,sendEvents);
            return
        } else if (err !== false) {
            // When `err` is strictly false neither branch runs, so nothing
            // further happens for these events.
            //
            // NOTE(review): the source text appears to be truncated on the next
            // line. The rest of the `for` loop is missing, as is the header of
            // the function whose body follows (it reads `sendEvent`, which is
            // not declared in the text that remains). Restore the missing code
            // from the original file before using this.
            for (var i=0;i {
        if (err) {
            reportError(err,sendEvent);
            return;
        } else if (err !== false) {
            // Resolve the destination id to a node instance
            sendEvent.destination.node = flow.getNode(sendEvent.destination.id);
            if (sendEvent.destination.node && typeof sendEvent.destination.node === 'object') {
                if (sendEvent.cloneMessage) {
                    sendEvent.msg = redUtil.cloneMessage(sendEvent.msg);
                }
                handlePreDeliver(flow,sendEvent,reportError);
            }
        }
    })
}

/**
 * Pass the message of a send event to its destination node's `receive`.
 * Does nothing when the event has no destination node. An exception thrown by
 * `receive` is logged at error level rather than propagated.
 *
 * @param {Object} sendEvent the SendEvent to deliver
 */
function deliverMessageToDestination(sendEvent) {
    if (sendEvent?.destination?.node) {
        try {
            sendEvent.destination.node.receive(sendEvent.msg);
        } catch(err) {
            Log.error(`Error delivering message to node:${sendEvent.destination.node._path} [${sendEvent.destination.node.type}]`)
            Log.error(err.stack)
        }
    }
}

/**
 * Trigger the "preDeliver" hook for a send event, deliver the message, then
 * trigger the "postDeliver" hook.
 *
 * @param {Flow} flow the flow the event was sent within (not read in this function)
 * @param {Object} sendEvent the SendEvent, with its destination node resolved
 * @param {Function} reportError called as `reportError(err, sendEvent)` when a
 *                               hook reports an error
 */
function handlePreDeliver(flow,sendEvent, reportError) {
    // preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed.
    hooks.trigger("preDeliver",sendEvent,(err) => {
        if (err) {
            reportError(err,sendEvent);
            return;
        } else if (err !== false) {
            // When `err` is strictly false neither branch runs, so the message
            // is not delivered.
            if (asyncMessageDelivery) {
                setImmediate(function() {
                    deliverMessageToDestination(sendEvent)
                })
            } else {
                deliverMessageToDestination(sendEvent)
            }
            // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery)
            hooks.trigger("postDeliver", sendEvent, function(err) {
                if (err) {
                    reportError(err,sendEvent);
                }
            })
        }
    })
}

module.exports = {
    /**
     * Initialise the module-level state from the runtime.
     * Reads `runtime.settings.nodeCloseTimeout` (falling back to 15000 when
     * falsy), `runtime.settings.runtimeSyncDelivery` and `runtime.log`.
     * `./Subflow` and `./Group` are required here rather than at the top of
     * the file - presumably to avoid a circular require; confirm.
     * @param {Object} runtime the runtime object
     */
    init: function(runtime) {
        nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000;
        asyncMessageDelivery = !runtime.settings.runtimeSyncDelivery
        Log = runtime.log;
        Subflow = require("./Subflow");
        Group = require("./Group").Group
    },
    /**
     * Create a new Flow instance. The arguments are passed straight to the
     * Flow constructor.
     * @param {Object} parent the parent flow
     * @param {Object} global the global flow definition
     * @param {Object} [conf] this flow's definition
     * @return {Flow} the new instance
     */
    create: function(parent,global,conf) {
        return new Flow(parent,global,conf)
    },
    Flow: Flow
}