mirror of
https://github.com/node-red/node-red.git
synced 2025-03-01 10:36:34 +00:00
Testing with Mocha requires errors raised within flows to propagate to the test runner, not getting swallowed by Node. This commit tests whether Mocha's global functions exist and, if they do, propagates exceptions that are raised there.
851 lines
32 KiB
JavaScript
851 lines
32 KiB
JavaScript
/**
|
|
* Copyright JS Foundation and other contributors, http://js.foundation
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
**/
|
|
|
|
const clone = require("clone");
|
|
const redUtil = require("@node-red/util").util;
|
|
const events = require("@node-red/util").events;
|
|
const flowUtil = require("./util");
|
|
const context = require('../nodes/context');
|
|
const hooks = require("@node-red/util").hooks;
|
|
const credentials = require("../nodes/credentials");
|
|
const detectMocha = require("detect-mocha");
|
|
|
|
let Subflow;
|
|
let Log;
|
|
let Group;
|
|
|
|
let nodeCloseTimeout = 15000;
|
|
let asyncMessageDelivery = true;
|
|
|
|
/**
|
|
* This class represents a flow within the runtime. It is responsible for
|
|
* creating, starting and stopping all nodes within the flow.
|
|
*/
|
|
class Flow {
|
|
|
|
/**
|
|
* Create a Flow object.
|
|
* @param {[type]} parent The parent flow
|
|
* @param {[type]} globalFlow The global flow definition
|
|
* @param {[type]} flow This flow's definition
|
|
*/
|
|
constructor(parent,globalFlow,flow) {
|
|
this.TYPE = 'flow';
|
|
this.parent = parent;
|
|
this.global = globalFlow;
|
|
if (typeof flow === 'undefined') {
|
|
this.flow = globalFlow;
|
|
this.isGlobalFlow = true;
|
|
} else {
|
|
this.flow = flow;
|
|
this.isGlobalFlow = false;
|
|
}
|
|
this.id = this.flow.id || "global";
|
|
this.groups = {}
|
|
this.groupOrder = []
|
|
this.activeNodes = {};
|
|
this.subflowInstanceNodes = {};
|
|
this.catchNodes = [];
|
|
this.statusNodes = [];
|
|
this.path = this.id;
|
|
// Ensure a context exists for this flow
|
|
this.context = context.getFlowContext(this.id,this.parent.id);
|
|
|
|
// env is an array of env definitions
|
|
// _env is an object for direct lookup of env name -> value
|
|
this.env = this.flow.env
|
|
this._env = {}
|
|
}
|
|
|
|
/**
|
|
* Log a debug-level message from this flow
|
|
* @param {[type]} msg [description]
|
|
* @return {[type]} [description]
|
|
*/
|
|
debug(msg) {
|
|
Log.log({
|
|
id: this.id||"global",
|
|
level: Log.DEBUG,
|
|
type:this.TYPE,
|
|
msg:msg
|
|
})
|
|
}
|
|
|
|
/**
|
|
* Log an error-level message from this flow
|
|
* @param {[type]} msg [description]
|
|
* @return {[type]} [description]
|
|
*/
|
|
error(msg) {
|
|
Log.log({
|
|
id: this.id||"global",
|
|
level: Log.ERROR,
|
|
type:this.TYPE,
|
|
msg:msg
|
|
})
|
|
}
|
|
|
|
/**
|
|
* Log a info-level message from this flow
|
|
* @param {[type]} msg [description]
|
|
* @return {[type]} [description]
|
|
*/
|
|
info(msg) {
|
|
Log.log({
|
|
id: this.id||"global",
|
|
level: Log.INFO,
|
|
type:this.TYPE,
|
|
msg:msg
|
|
})
|
|
}
|
|
|
|
/**
|
|
* Log a trace-level message from this flow
|
|
* @param {[type]} msg [description]
|
|
* @return {[type]} [description]
|
|
*/
|
|
trace(msg) {
|
|
Log.log({
|
|
id: this.id||"global",
|
|
level: Log.TRACE,
|
|
type:this.TYPE,
|
|
msg:msg
|
|
})
|
|
}
|
|
|
|
/**
|
|
* [log description]
|
|
* @param {[type]} msg [description]
|
|
* @return {[type]} [description]
|
|
*/
|
|
log(msg) {
|
|
if (!msg.path) {
|
|
msg.path = this.path;
|
|
}
|
|
this.parent.log(msg);
|
|
}
|
|
|
|
/**
|
|
* Start this flow.
|
|
* The `diff` argument helps define what needs to be started in the case
|
|
* of a modified-nodes/flows type deploy.
|
|
* @param {[type]} msg [description]
|
|
* @return {[type]} [description]
|
|
*/
|
|
async start(diff) {
|
|
this.trace("start "+this.TYPE+" ["+this.path+"]");
|
|
var node;
|
|
var newNode;
|
|
var id;
|
|
this.catchNodes = [];
|
|
this.statusNodes = [];
|
|
this.completeNodeMap = {};
|
|
|
|
|
|
if (this.isGlobalFlow) {
|
|
// This is the global flow. It needs to go find the `global-config`
|
|
// node and extract any env properties from it
|
|
const configNodes = Object.keys(this.flow.configs);
|
|
for (let i = 0; i < configNodes.length; i++) {
|
|
const node = this.flow.configs[configNodes[i]]
|
|
if (node.type === 'global-config' && node.env) {
|
|
const globalCreds = credentials.get(node.id)?.map || {}
|
|
const nodeEnv = await flowUtil.evaluateEnvProperties(this, node.env, globalCreds)
|
|
this._env = { ...this._env, ...nodeEnv }
|
|
}
|
|
}
|
|
}
|
|
|
|
if (this.env) {
|
|
this._env = { ...this._env, ...await flowUtil.evaluateEnvProperties(this, this.env, credentials.get(this.id)) }
|
|
}
|
|
|
|
// Initialise the group objects. These must be done in the right order
|
|
// starting from outer-most to inner-most so that the parent hierarchy
|
|
// is maintained.
|
|
this.groups = {}
|
|
this.groupOrder = []
|
|
const groupIds = Object.keys(this.flow.groups || {})
|
|
while (groupIds.length > 0) {
|
|
const id = groupIds.shift()
|
|
const groupDef = this.flow.groups[id]
|
|
if (!groupDef.g || this.groups[groupDef.g]) {
|
|
// The parent of this group is available - either another group
|
|
// or the top-level flow (this)
|
|
const parent = this.groups[groupDef.g] || this
|
|
this.groups[groupDef.id] = new Group(parent, groupDef)
|
|
this.groupOrder.push(groupDef.id)
|
|
} else {
|
|
// Try again once we've processed the other groups
|
|
groupIds.push(id)
|
|
}
|
|
}
|
|
|
|
for (let i = 0; i < this.groupOrder.length; i++) {
|
|
// Start the groups in the right order so they
|
|
// can setup their env vars knowning their parent
|
|
// will have been started
|
|
await this.groups[this.groupOrder[i]].start()
|
|
}
|
|
|
|
var configNodes = Object.keys(this.flow.configs);
|
|
var configNodeAttempts = {};
|
|
while (configNodes.length > 0) {
|
|
id = configNodes.shift();
|
|
node = this.flow.configs[id];
|
|
if (!this.activeNodes[id]) {
|
|
if (node.d !== true) {
|
|
var readyToCreate = true;
|
|
// This node doesn't exist.
|
|
// Check it doesn't reference another non-existent config node
|
|
for (var prop in node) {
|
|
if (node.hasOwnProperty(prop) &&
|
|
prop !== 'id' &&
|
|
prop !== 'wires' &&
|
|
prop !== '_users' &&
|
|
this.flow.configs[node[prop]] &&
|
|
this.flow.configs[node[prop]].d !== true
|
|
) {
|
|
if (!this.activeNodes[node[prop]]) {
|
|
// References a non-existent config node
|
|
// Add it to the back of the list to try again later
|
|
configNodes.push(id);
|
|
configNodeAttempts[id] = (configNodeAttempts[id]||0)+1;
|
|
if (configNodeAttempts[id] === 100) {
|
|
throw new Error("Circular config node dependency detected: "+id);
|
|
}
|
|
readyToCreate = false;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if (readyToCreate) {
|
|
newNode = await flowUtil.createNode(this,node);
|
|
if (newNode) {
|
|
this.activeNodes[id] = newNode;
|
|
}
|
|
}
|
|
} else {
|
|
this.debug("not starting disabled config node : "+id);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (diff && diff.rewired) {
|
|
for (var j=0;j<diff.rewired.length;j++) {
|
|
var rewireNode = this.activeNodes[diff.rewired[j]];
|
|
if (rewireNode) {
|
|
rewireNode.updateWires(this.flow.nodes[rewireNode.id].wires);
|
|
}
|
|
}
|
|
}
|
|
|
|
for (id in this.flow.nodes) {
|
|
if (this.flow.nodes.hasOwnProperty(id)) {
|
|
node = this.flow.nodes[id];
|
|
if (node.d !== true) {
|
|
if (!node.subflow) {
|
|
if (!this.activeNodes[id]) {
|
|
newNode = await flowUtil.createNode(this,node);
|
|
if (newNode) {
|
|
this.activeNodes[id] = newNode;
|
|
}
|
|
}
|
|
} else {
|
|
if (!this.subflowInstanceNodes[id]) {
|
|
try {
|
|
var subflowDefinition = this.flow.subflows[node.subflow]||this.global.subflows[node.subflow]
|
|
// console.log("NEED TO CREATE A SUBFLOW",id,node.subflow);
|
|
this.subflowInstanceNodes[id] = true;
|
|
var subflow = Subflow.create(
|
|
this,
|
|
this.global,
|
|
subflowDefinition,
|
|
node
|
|
);
|
|
this.subflowInstanceNodes[id] = subflow;
|
|
await subflow.start();
|
|
this.activeNodes[id] = subflow.node;
|
|
|
|
// this.subflowInstanceNodes[id] = nodes.map(function(n) { return n.id});
|
|
// for (var i=0;i<nodes.length;i++) {
|
|
// if (nodes[i]) {
|
|
// this.activeNodes[nodes[i].id] = nodes[i];
|
|
// }
|
|
// }
|
|
} catch(err) {
|
|
console.log(err.stack)
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
this.debug("not starting disabled node : "+id);
|
|
}
|
|
}
|
|
}
|
|
|
|
var activeCount = Object.keys(this.activeNodes).length;
|
|
if (activeCount > 0) {
|
|
this.trace("------------------|--------------|-----------------");
|
|
this.trace(" id | type | alias");
|
|
this.trace("------------------|--------------|-----------------");
|
|
}
|
|
// Build the map of catch/status/complete nodes.
|
|
for (id in this.activeNodes) {
|
|
if (this.activeNodes.hasOwnProperty(id)) {
|
|
node = this.activeNodes[id];
|
|
this.trace(" "+id.padEnd(16)+" | "+node.type.padEnd(12)+" | "+(node._alias||"")+(node._zAlias?" [zAlias:"+node._zAlias+"]":""));
|
|
if (node.type === "catch") {
|
|
this.catchNodes.push(node);
|
|
} else if (node.type === "status") {
|
|
this.statusNodes.push(node);
|
|
} else if (node.type === "complete") {
|
|
if (node.scope) {
|
|
node.scope.forEach(id => {
|
|
this.completeNodeMap[id] = this.completeNodeMap[id] || [];
|
|
this.completeNodeMap[id].push(node);
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
this.catchNodes.sort(function(A,B) {
|
|
if (A.scope && !B.scope) {
|
|
return -1;
|
|
} else if (!A.scope && B.scope) {
|
|
return 1;
|
|
} else if (A.scope && B.scope) {
|
|
return 0;
|
|
} else if (A.uncaught && !B.uncaught) {
|
|
return 1;
|
|
} else if (!A.uncaught && B.uncaught) {
|
|
return -1;
|
|
}
|
|
return 0;
|
|
});
|
|
|
|
if (activeCount > 0) {
|
|
this.trace("------------------|--------------|-----------------");
|
|
}
|
|
// this.dump();
|
|
}
|
|
|
|
/**
|
|
* Stop this flow.
|
|
* The `stopList` argument helps define what needs to be stopped in the case
|
|
* of a modified-nodes/flows type deploy.
|
|
* @param {[type]} stopList [description]
|
|
* @param {[type]} removedList [description]
|
|
* @return {[type]} [description]
|
|
*/
|
|
stop(stopList, removedList) {
|
|
this.trace("stop "+this.TYPE);
|
|
var i;
|
|
if (!stopList) {
|
|
stopList = Object.keys(this.activeNodes);
|
|
}
|
|
// this.trace(" stopList: "+stopList.join(","))
|
|
// Convert the list to a map to avoid multiple scans of the list
|
|
var removedMap = {};
|
|
removedList = removedList || [];
|
|
removedList.forEach(function(id) {
|
|
removedMap[id] = true;
|
|
});
|
|
|
|
let nodesToStop = [];
|
|
let configsToStop = [];
|
|
stopList.forEach(id => {
|
|
if (this.flow.configs[id]) {
|
|
configsToStop.push(id);
|
|
} else {
|
|
nodesToStop.push(id);
|
|
}
|
|
});
|
|
stopList = nodesToStop.concat(configsToStop);
|
|
|
|
var promises = [];
|
|
for (i=0;i<stopList.length;i++) {
|
|
var node = this.activeNodes[stopList[i]];
|
|
if (node) {
|
|
delete this.activeNodes[stopList[i]];
|
|
if (this.subflowInstanceNodes[stopList[i]]) {
|
|
delete this.subflowInstanceNodes[stopList[i]];
|
|
}
|
|
try {
|
|
var removed = removedMap[stopList[i]];
|
|
promises.push(stopNode(node,removed).catch(()=>{}));
|
|
} catch(err) {
|
|
node.error(err);
|
|
}
|
|
if (removedMap[stopList[i]]) {
|
|
events.emit("node-status",{
|
|
id: node.id
|
|
});
|
|
}
|
|
}
|
|
}
|
|
return Promise.all(promises);
|
|
}
|
|
|
|
/**
|
|
* Update the flow definition. This doesn't change anything that is running.
|
|
* This should be called after `stop` and before `start`.
|
|
* @param {[type]} _global [description]
|
|
* @param {[type]} _flow [description]
|
|
* @return {[type]} [description]
|
|
*/
|
|
update(_global,_flow) {
|
|
this.global = _global;
|
|
this.flow = _flow;
|
|
}
|
|
|
|
/**
|
|
* Get a node instance from this flow. If the node is not known to this
|
|
* flow, pass the request up to the parent.
|
|
* @param {String} id [description]
|
|
* @param {Boolean} cancelBubble if true, prevents the flow from passing the request to the parent
|
|
* This stops infinite loops when the parent asked this Flow for the
|
|
* node to begin with.
|
|
* @return {[type]} [description]
|
|
*/
|
|
getNode(id, cancelBubble) {
|
|
if (!id) {
|
|
return undefined;
|
|
}
|
|
// console.log((new Error().stack).toString().split("\n").slice(1,3).join("\n"))
|
|
if ((this.flow.configs && this.flow.configs[id]) || (this.flow.nodes && this.flow.nodes[id] && this.flow.nodes[id].type.substring(0,8) != "subflow:")) {
|
|
// This is a node owned by this flow, so return whatever we have got
|
|
// During a stop/restart, activeNodes could be null for this id
|
|
return this.activeNodes[id];
|
|
} else if (this.activeNodes[id]) {
|
|
// TEMP: this is a subflow internal node within this flow or subflow instance node
|
|
return this.activeNodes[id];
|
|
} else if (this.subflowInstanceNodes[id]) {
|
|
return this.subflowInstanceNodes[id];
|
|
} else if (cancelBubble) {
|
|
// The node could be inside one of this flow's subflows
|
|
var node;
|
|
for (var sfId in this.subflowInstanceNodes) {
|
|
if (this.subflowInstanceNodes.hasOwnProperty(sfId)) {
|
|
node = this.subflowInstanceNodes[sfId].getNode(id,cancelBubble);
|
|
if (node) {
|
|
return node;
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// Node not found inside this flow - ask the parent
|
|
return this.parent.getNode(id);
|
|
}
|
|
return undefined;
|
|
}
|
|
|
|
|
|
/**
|
|
* Get a group node instance
|
|
* @param {String} id
|
|
* @return {Node} group node
|
|
*/
|
|
getGroupNode(id) {
|
|
return this.groups[id];
|
|
}
|
|
|
|
/**
|
|
* Get all of the nodes instantiated within this flow
|
|
* @return {[type]} [description]
|
|
*/
|
|
getActiveNodes() {
|
|
return this.activeNodes;
|
|
}
|
|
|
|
/**
|
|
* Get a flow setting value.
|
|
* @param {[type]} key [description]
|
|
* @return {[type]} [description]
|
|
*/
|
|
getSetting(key) {
|
|
const flow = this.flow;
|
|
if (key === "NR_FLOW_NAME") {
|
|
return flow.label;
|
|
}
|
|
if (key === "NR_FLOW_ID") {
|
|
return flow.id;
|
|
}
|
|
if (!key.startsWith("$parent.")) {
|
|
if (this._env.hasOwnProperty(key)) {
|
|
return (this._env[key] && Object.hasOwn(this._env[key], 'value') && this._env[key].__clone__) ? clone(this._env[key].value) : this._env[key]
|
|
}
|
|
} else {
|
|
key = key.substring(8);
|
|
}
|
|
// Delegate to the parent flow.
|
|
return this.parent.getSetting(key);
|
|
}
|
|
|
|
/**
|
|
* Handle a status event from a node within this flow.
|
|
* @param {Node} node The original node that triggered the event
|
|
* @param {Object} statusMessage The status object
|
|
* @param {Node} reportingNode The node emitting the status event.
|
|
* This could be a subflow instance node when the status
|
|
* is being delegated up.
|
|
* @param {boolean} muteStatusEvent Whether to emit the status event
|
|
* @return {[type]} [description]
|
|
*/
|
|
handleStatus(node,statusMessage,reportingNode,muteStatusEvent) {
|
|
if (!reportingNode) {
|
|
reportingNode = node;
|
|
}
|
|
if (!muteStatusEvent) {
|
|
if (statusMessage.hasOwnProperty("text") && typeof statusMessage.text !== "string") {
|
|
try {
|
|
statusMessage.text = statusMessage.text.toString();
|
|
}
|
|
catch(e) {}
|
|
}
|
|
events.emit("node-status",{
|
|
id: node.id,
|
|
status:statusMessage
|
|
});
|
|
}
|
|
|
|
let handled = false;
|
|
|
|
if (this.id === 'global' && node.users) {
|
|
// This is a global config node
|
|
// Delegate status to any nodes using this config node
|
|
for (let userNode in node.users) {
|
|
if (node.users.hasOwnProperty(userNode)) {
|
|
handled = node.users[userNode]._flow.handleStatus(node,statusMessage,node.users[userNode],true) || handled;
|
|
}
|
|
}
|
|
} else {
|
|
const candidateNodes = [];
|
|
this.statusNodes.forEach(targetStatusNode => {
|
|
if (targetStatusNode.g && targetStatusNode.scope === 'group' && !reportingNode.g) {
|
|
// Status node inside a group, reporting node not in a group - skip it
|
|
return
|
|
}
|
|
if (Array.isArray(targetStatusNode.scope) && targetStatusNode.scope.indexOf(reportingNode.id) === -1) {
|
|
return;
|
|
}
|
|
let distance = 0
|
|
if (reportingNode.g) {
|
|
// Reporting node inside a group. Calculate the distance between it and the status node
|
|
let containingGroup = this.groups[reportingNode.g]
|
|
while (containingGroup && containingGroup.id !== targetStatusNode.g) {
|
|
distance++
|
|
containingGroup = this.groups[containingGroup.g]
|
|
}
|
|
if (!containingGroup && targetStatusNode.g && targetStatusNode.scope === 'group') {
|
|
// This status node is in a group, but not in the same hierachy
|
|
// the reporting node is in
|
|
return
|
|
}
|
|
}
|
|
candidateNodes.push({ d: distance, n: targetStatusNode })
|
|
})
|
|
candidateNodes.sort((A,B) => {
|
|
return A.d - B.d
|
|
})
|
|
candidateNodes.forEach(candidate => {
|
|
const targetStatusNode = candidate.n
|
|
var message = {
|
|
status: clone(statusMessage)
|
|
}
|
|
if (statusMessage.hasOwnProperty("text")) {
|
|
message.status.text = statusMessage.text.toString();
|
|
}
|
|
message.status.source = {
|
|
id: node.id,
|
|
type: node.type,
|
|
name: node.name
|
|
}
|
|
|
|
targetStatusNode.receive(message);
|
|
handled = true;
|
|
});
|
|
}
|
|
return handled;
|
|
}
|
|
|
|
/**
|
|
* Handle an error event from a node within this flow. If there are no Catch
|
|
* nodes within this flow, pass the event to the parent flow.
|
|
* @param {[type]} node [description]
|
|
* @param {[type]} logMessage [description]
|
|
* @param {[type]} msg [description]
|
|
* @param {[type]} reportingNode [description]
|
|
* @return {[type]} [description]
|
|
*/
|
|
handleError(node,logMessage,msg,reportingNode) {
|
|
if (!reportingNode) {
|
|
reportingNode = node;
|
|
}
|
|
// console.log("HE",logMessage);
|
|
var count = 1;
|
|
if (msg && msg.hasOwnProperty("error") && msg.error) {
|
|
if (msg.error.hasOwnProperty("source") && msg.error.source) {
|
|
if (msg.error.source.id === node.id) {
|
|
count = msg.error.source.count+1;
|
|
if (count === 10) {
|
|
node.warn(Log._("nodes.flow.error-loop"));
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
let handled = false;
|
|
|
|
if (this.id === 'global' && node.users) {
|
|
// This is a global config node
|
|
// Delegate status to any nodes using this config node
|
|
for (let userNode in node.users) {
|
|
if (node.users.hasOwnProperty(userNode)) {
|
|
handled = node.users[userNode]._flow.handleError(node,logMessage,msg,node.users[userNode]) || handled;
|
|
}
|
|
}
|
|
} else {
|
|
const candidateNodes = [];
|
|
this.catchNodes.forEach(targetCatchNode => {
|
|
if (targetCatchNode.g && targetCatchNode.scope === 'group' && !reportingNode.g) {
|
|
// Catch node inside a group, reporting node not in a group - skip it
|
|
return
|
|
}
|
|
if (Array.isArray(targetCatchNode.scope) && targetCatchNode.scope.indexOf(reportingNode.id) === -1) {
|
|
// Catch node has a scope set and it doesn't include the reporting node
|
|
return;
|
|
}
|
|
let distance = 0
|
|
if (reportingNode.g) {
|
|
// Reporting node inside a group. Calculate the distance between it and the catch node
|
|
let containingGroup = this.groups[reportingNode.g]
|
|
while (containingGroup && containingGroup.id !== targetCatchNode.g) {
|
|
distance++
|
|
containingGroup = this.groups[containingGroup.g]
|
|
}
|
|
if (!containingGroup && targetCatchNode.g && targetCatchNode.scope === 'group') {
|
|
// This catch node is in a group, but not in the same hierachy
|
|
// the reporting node is in
|
|
return
|
|
}
|
|
}
|
|
candidateNodes.push({ d: distance, n: targetCatchNode })
|
|
})
|
|
candidateNodes.sort((A,B) => {
|
|
return A.d - B.d
|
|
})
|
|
let handledByUncaught = false
|
|
candidateNodes.forEach(candidate => {
|
|
const targetCatchNode = candidate.n
|
|
if (targetCatchNode.uncaught && !handledByUncaught) {
|
|
// This node only wants errors that haven't already been handled
|
|
if (handled) {
|
|
return
|
|
}
|
|
handledByUncaught = true
|
|
}
|
|
let errorMessage;
|
|
if (msg) {
|
|
errorMessage = redUtil.cloneMessage(msg);
|
|
} else {
|
|
errorMessage = {};
|
|
}
|
|
if (errorMessage.hasOwnProperty("error")) {
|
|
errorMessage._error = errorMessage.error;
|
|
}
|
|
errorMessage.error = {
|
|
message: logMessage.toString(),
|
|
source: {
|
|
id: node.id,
|
|
type: node.type,
|
|
name: node.name,
|
|
count: count
|
|
}
|
|
};
|
|
if (logMessage.hasOwnProperty('stack')) {
|
|
errorMessage.error.stack = logMessage.stack;
|
|
}
|
|
if (logMessage.hasOwnProperty('cause')) {
|
|
errorMessage.error.cause = logMessage.cause;
|
|
}
|
|
targetCatchNode.receive(errorMessage);
|
|
handled = true;
|
|
});
|
|
}
|
|
return handled;
|
|
}
|
|
|
|
handleComplete(node,msg) {
|
|
if (this.completeNodeMap[node.id]) {
|
|
let toSend = msg;
|
|
this.completeNodeMap[node.id].forEach((completeNode,index) => {
|
|
toSend = redUtil.cloneMessage(msg);
|
|
completeNode.receive(toSend);
|
|
})
|
|
}
|
|
}
|
|
|
|
send(sendEvents) {
|
|
// onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object.
|
|
// preRoute - called once for each SendEvent object in turn
|
|
// preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed.
|
|
// postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery)
|
|
// onReceive - a node is about to receive a message
|
|
// postReceive - the message has been passed to the node's input handler
|
|
// onDone, onError - the node has completed with a message or logged an error
|
|
handleOnSend(this,sendEvents, (err, eventData) => {
|
|
if (err) {
|
|
let srcNode;
|
|
if (Array.isArray(eventData)) {
|
|
srcNode = eventData[0].source.node;
|
|
} else {
|
|
srcNode = eventData.source.node;
|
|
}
|
|
srcNode.error(err);
|
|
}
|
|
});
|
|
}
|
|
|
|
dump() {
|
|
console.log("==================")
|
|
console.log(this.TYPE, this.id);
|
|
for (var id in this.activeNodes) {
|
|
if (this.activeNodes.hasOwnProperty(id)) {
|
|
var node = this.activeNodes[id];
|
|
console.log(" ",id.padEnd(16),node.type)
|
|
if (node.wires) {
|
|
console.log(" -> ",node.wires)
|
|
}
|
|
}
|
|
}
|
|
console.log("==================")
|
|
}
|
|
|
|
}
|
|
|
|
/**
|
|
* Stop an individual node within this flow.
|
|
*
|
|
* @param {[type]} node [description]
|
|
* @param {[type]} removed [description]
|
|
* @return {[type]} [description]
|
|
*/
|
|
function stopNode(node,removed) {
|
|
Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":""));
|
|
const start = Date.now();
|
|
const closePromise = node.close(removed);
|
|
let closeTimer = null;
|
|
const closeTimeout = new Promise((resolve,reject) => {
|
|
closeTimer = setTimeout(() => {
|
|
reject("Close timed out");
|
|
}, nodeCloseTimeout);
|
|
});
|
|
return Promise.race([closePromise,closeTimeout]).then(() => {
|
|
clearTimeout(closeTimer);
|
|
var delta = Date.now() - start;
|
|
Log.trace("Stopped node "+node.type+":"+node.id+" ("+delta+"ms)" );
|
|
}).catch(err => {
|
|
clearTimeout(closeTimer);
|
|
node.error(Log._("nodes.flows.stopping-error",{message:err}));
|
|
Log.debug(err.stack);
|
|
})
|
|
}
|
|
|
|
|
|
function handleOnSend(flow,sendEvents, reportError) {
|
|
// onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object.
|
|
hooks.trigger("onSend",sendEvents,(err) => {
|
|
if (err) {
|
|
reportError(err,sendEvents);
|
|
return
|
|
} else if (err !== false) {
|
|
for (var i=0;i<sendEvents.length;i++) {
|
|
handlePreRoute(flow,sendEvents[i],reportError)
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
function handlePreRoute(flow, sendEvent, reportError) {
|
|
// preRoute - called once for each SendEvent object in turn
|
|
hooks.trigger("preRoute",sendEvent,(err) => {
|
|
if (err) {
|
|
reportError(err,sendEvent);
|
|
return;
|
|
} else if (err !== false) {
|
|
sendEvent.destination.node = flow.getNode(sendEvent.destination.id);
|
|
if (sendEvent.destination.node && typeof sendEvent.destination.node === 'object') {
|
|
if (sendEvent.cloneMessage) {
|
|
sendEvent.msg = redUtil.cloneMessage(sendEvent.msg);
|
|
}
|
|
handlePreDeliver(flow,sendEvent,reportError);
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
function deliverMessageToDestination(sendEvent) {
|
|
if (sendEvent?.destination?.node) {
|
|
try {
|
|
sendEvent.destination.node.receive(sendEvent.msg);
|
|
} catch(err) {
|
|
Log.error(`Error delivering message to node:${sendEvent.destination.node._path} [${sendEvent.destination.node.type}]`)
|
|
Log.error(err.stack)
|
|
if (detectMocha()) {
|
|
throw err;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
function handlePreDeliver(flow,sendEvent, reportError) {
|
|
// preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed.
|
|
hooks.trigger("preDeliver",sendEvent,(err) => {
|
|
if (err) {
|
|
reportError(err,sendEvent);
|
|
return;
|
|
} else if (err !== false) {
|
|
if (asyncMessageDelivery) {
|
|
setImmediate(function() {
|
|
deliverMessageToDestination(sendEvent)
|
|
})
|
|
} else {
|
|
deliverMessageToDestination(sendEvent)
|
|
}
|
|
// postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery)
|
|
hooks.trigger("postDeliver", sendEvent, function(err) {
|
|
if (err) {
|
|
reportError(err,sendEvent);
|
|
}
|
|
})
|
|
}
|
|
})
|
|
}
|
|
|
|
module.exports = {
|
|
init: function(runtime) {
|
|
nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000;
|
|
asyncMessageDelivery = !runtime.settings.runtimeSyncDelivery
|
|
Log = runtime.log;
|
|
Subflow = require("./Subflow");
|
|
Group = require("./Group").Group
|
|
},
|
|
create: function(parent,global,conf) {
|
|
return new Flow(parent,global,conf)
|
|
},
|
|
Flow: Flow
|
|
}
|