2015-10-11 20:37:11 +01:00
/ * *
2017-01-11 15:24:33 +00:00
* Copyright JS Foundation and other contributors , http : //js.foundation
2015-10-11 20:37:11 +01:00
*
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
* * /
var clone = require ( "clone" ) ;
2018-08-17 22:10:54 +01:00
var redUtil = require ( "@node-red/util" ) . util ;
2020-12-02 09:25:10 +00:00
const events = require ( "@node-red/util" ) . events ;
2015-10-11 20:37:11 +01:00
var flowUtil = require ( "./util" ) ;
2020-07-20 16:48:47 +01:00
const context = require ( '../nodes/context' ) ;
2021-04-12 20:30:31 +01:00
const hooks = require ( "@node-red/util" ) . hooks ;
2021-08-30 08:00:58 +09:00
const credentials = require ( "../nodes/credentials" ) ;
2015-10-11 20:37:11 +01:00
2019-01-17 13:18:26 +00:00
var Subflow ;
var Log ;
2017-05-15 13:05:33 +01:00
var nodeCloseTimeout = 15000 ;
2019-07-08 23:23:33 +01:00
var asyncMessageDelivery = true ;
2017-05-15 13:05:33 +01:00
2019-01-16 23:33:04 +00:00
/ * *
* This class represents a flow within the runtime . It is responsible for
* creating , starting and stopping all nodes within the flow .
* /
2019-01-11 14:53:21 +00:00
class Flow {
2019-01-16 23:33:04 +00:00
/ * *
* Create a Flow object .
* @ param { [ type ] } parent The parent flow
* @ param { [ type ] } globalFlow The global flow definition
* @ param { [ type ] } flow This flow ' s definition
* /
2019-01-16 16:27:19 +00:00
constructor ( parent , globalFlow , flow ) {
this . TYPE = 'flow' ;
this . parent = parent ;
this . global = globalFlow ;
2019-01-11 14:53:21 +00:00
if ( typeof flow === 'undefined' ) {
2019-01-16 16:27:19 +00:00
this . flow = globalFlow ;
this . isGlobalFlow = true ;
2019-01-11 14:53:21 +00:00
} else {
this . flow = flow ;
2019-01-16 16:27:19 +00:00
this . isGlobalFlow = false ;
2019-01-11 14:53:21 +00:00
}
2019-01-16 16:27:19 +00:00
this . id = this . flow . id || "global" ;
2019-01-11 14:53:21 +00:00
this . activeNodes = { } ;
this . subflowInstanceNodes = { } ;
2019-01-16 16:27:19 +00:00
this . catchNodes = [ ] ;
this . statusNodes = [ ] ;
2020-01-17 16:53:01 +00:00
this . path = this . id ;
2020-03-27 23:47:12 +00:00
// Ensure a context exists for this flow
this . context = context . getFlowContext ( this . id , this . parent . id ) ;
2015-10-11 20:37:11 +01:00
}
2019-01-16 23:33:04 +00:00
/ * *
* Log a debug - level message from this flow
* @ param { [ type ] } msg [ description ]
* @ return { [ type ] } [ description ]
* /
2019-01-16 16:27:19 +00:00
debug ( msg ) {
Log . log ( {
id : this . id || "global" ,
level : Log . DEBUG ,
type : this . TYPE ,
msg : msg
} )
}
2019-03-20 13:37:33 +00:00
/ * *
* Log an error - level message from this flow
* @ param { [ type ] } msg [ description ]
* @ return { [ type ] } [ description ]
* /
error ( msg ) {
Log . log ( {
id : this . id || "global" ,
level : Log . ERROR ,
type : this . TYPE ,
msg : msg
} )
}
2019-01-16 23:33:04 +00:00
/ * *
* Log a info - level message from this flow
* @ param { [ type ] } msg [ description ]
* @ return { [ type ] } [ description ]
* /
2020-09-02 19:33:12 +01:00
info ( msg ) {
2019-01-16 16:27:19 +00:00
Log . log ( {
id : this . id || "global" ,
level : Log . INFO ,
type : this . TYPE ,
msg : msg
} )
}
2019-01-16 23:33:04 +00:00
/ * *
* Log a trace - level message from this flow
* @ param { [ type ] } msg [ description ]
* @ return { [ type ] } [ description ]
* /
2019-01-16 16:27:19 +00:00
trace ( msg ) {
Log . log ( {
id : this . id || "global" ,
level : Log . TRACE ,
type : this . TYPE ,
msg : msg
} )
}
2020-09-02 19:33:12 +01:00
/ * *
* [ log description ]
* @ param { [ type ] } msg [ description ]
* @ return { [ type ] } [ description ]
* /
log ( msg ) {
if ( ! msg . path ) {
msg . path = this . path ;
}
this . parent . log ( msg ) ;
}
2019-01-16 16:27:19 +00:00
2019-01-16 23:33:04 +00:00
/ * *
* Start this flow .
* The ` diff ` argument helps define what needs to be started in the case
* of a modified - nodes / flows type deploy .
* @ param { [ type ] } msg [ description ]
* @ return { [ type ] } [ description ]
* /
2019-01-11 14:53:21 +00:00
start ( diff ) {
2020-01-17 16:53:01 +00:00
this . trace ( "start " + this . TYPE + " [" + this . path + "]" ) ;
2015-10-11 20:37:11 +01:00
var node ;
2015-11-24 22:38:42 +00:00
var newNode ;
2015-10-11 20:37:11 +01:00
var id ;
2019-01-16 16:27:19 +00:00
this . catchNodes = [ ] ;
this . statusNodes = [ ] ;
2019-07-08 23:23:33 +01:00
this . completeNodeMap = { } ;
2016-04-27 10:31:19 +01:00
2019-01-11 14:53:21 +00:00
var configNodes = Object . keys ( this . flow . configs ) ;
2016-04-27 10:31:19 +01:00
var configNodeAttempts = { } ;
2016-07-06 17:22:45 +01:00
while ( configNodes . length > 0 ) {
2016-04-27 10:31:19 +01:00
id = configNodes . shift ( ) ;
2019-01-11 14:53:21 +00:00
node = this . flow . configs [ id ] ;
if ( ! this . activeNodes [ id ] ) {
2019-06-17 22:46:34 +01:00
if ( node . d !== true ) {
var readyToCreate = true ;
// This node doesn't exist.
// Check it doesn't reference another non-existent config node
for ( var prop in node ) {
if ( node . hasOwnProperty ( prop ) &&
prop !== 'id' &&
prop !== 'wires' &&
prop !== '_users' &&
this . flow . configs [ node [ prop ] ] &&
this . flow . configs [ node [ prop ] ] . d !== true
) {
if ( ! this . activeNodes [ node [ prop ] ] ) {
// References a non-existent config node
// Add it to the back of the list to try again later
configNodes . push ( id ) ;
configNodeAttempts [ id ] = ( configNodeAttempts [ id ] || 0 ) + 1 ;
if ( configNodeAttempts [ id ] === 100 ) {
throw new Error ( "Circular config node dependency detected: " + id ) ;
}
readyToCreate = false ;
break ;
2016-04-27 10:31:19 +01:00
}
}
}
2019-06-17 22:46:34 +01:00
if ( readyToCreate ) {
newNode = flowUtil . createNode ( this , node ) ;
if ( newNode ) {
this . activeNodes [ id ] = newNode ;
}
2015-11-24 22:38:42 +00:00
}
2019-06-17 22:46:34 +01:00
} else {
this . debug ( "not starting disabled config node : " + id ) ;
2015-10-11 20:37:11 +01:00
}
}
2016-04-27 12:37:20 +01:00
}
2016-04-27 10:31:19 +01:00
2015-12-09 21:51:46 +00:00
if ( diff && diff . rewired ) {
2015-10-11 20:37:11 +01:00
for ( var j = 0 ; j < diff . rewired . length ; j ++ ) {
2019-01-11 14:53:21 +00:00
var rewireNode = this . activeNodes [ diff . rewired [ j ] ] ;
2015-10-11 20:37:11 +01:00
if ( rewireNode ) {
2019-01-11 14:53:21 +00:00
rewireNode . updateWires ( this . flow . nodes [ rewireNode . id ] . wires ) ;
2015-10-11 20:37:11 +01:00
}
}
}
2019-01-11 14:53:21 +00:00
for ( id in this . flow . nodes ) {
if ( this . flow . nodes . hasOwnProperty ( id ) ) {
node = this . flow . nodes [ id ] ;
2019-06-14 22:12:47 +01:00
if ( node . d !== true ) {
if ( ! node . subflow ) {
if ( ! this . activeNodes [ id ] ) {
newNode = flowUtil . createNode ( this , node ) ;
if ( newNode ) {
this . activeNodes [ id ] = newNode ;
}
2015-11-24 22:38:42 +00:00
}
2019-06-14 22:12:47 +01:00
} else {
if ( ! this . subflowInstanceNodes [ id ] ) {
try {
var subflowDefinition = this . flow . subflows [ node . subflow ] || this . global . subflows [ node . subflow ]
// console.log("NEED TO CREATE A SUBFLOW",id,node.subflow);
this . subflowInstanceNodes [ id ] = true ;
var subflow = Subflow . create (
this ,
this . global ,
subflowDefinition ,
node
) ;
this . subflowInstanceNodes [ id ] = subflow ;
subflow . start ( ) ;
this . activeNodes [ id ] = subflow . node ;
// this.subflowInstanceNodes[id] = nodes.map(function(n) { return n.id});
// for (var i=0;i<nodes.length;i++) {
// if (nodes[i]) {
// this.activeNodes[nodes[i].id] = nodes[i];
// }
// }
} catch ( err ) {
console . log ( err . stack )
}
2015-10-11 20:37:11 +01:00
}
}
2019-06-17 22:46:34 +01:00
} else {
this . debug ( "not starting disabled node : " + id ) ;
2015-10-11 20:37:11 +01:00
}
}
}
2019-01-16 16:27:19 +00:00
var activeCount = Object . keys ( this . activeNodes ) . length ;
if ( activeCount > 0 ) {
this . trace ( "------------------|--------------|-----------------" ) ;
this . trace ( " id | type | alias" ) ;
this . trace ( "------------------|--------------|-----------------" ) ;
}
2019-07-08 23:23:33 +01:00
// Build the map of catch/status/complete nodes.
2019-01-11 14:53:21 +00:00
for ( id in this . activeNodes ) {
if ( this . activeNodes . hasOwnProperty ( id ) ) {
node = this . activeNodes [ id ] ;
2020-01-17 16:53:01 +00:00
this . trace ( " " + id . padEnd ( 16 ) + " | " + node . type . padEnd ( 12 ) + " | " + ( node . _alias || "" ) + ( node . _zAlias ? " [zAlias:" + node . _zAlias + "]" : "" ) ) ;
2015-10-11 20:37:11 +01:00
if ( node . type === "catch" ) {
2019-01-16 16:27:19 +00:00
this . catchNodes . push ( node ) ;
2015-10-11 20:37:11 +01:00
} else if ( node . type === "status" ) {
2019-01-16 16:27:19 +00:00
this . statusNodes . push ( node ) ;
2019-07-08 23:23:33 +01:00
} else if ( node . type === "complete" ) {
if ( node . scope ) {
node . scope . forEach ( id => {
this . completeNodeMap [ id ] = this . completeNodeMap [ id ] || [ ] ;
this . completeNodeMap [ id ] . push ( node ) ;
} )
}
2015-10-11 20:37:11 +01:00
}
}
}
2019-02-05 14:28:20 +00:00
this . catchNodes . sort ( function ( A , B ) {
if ( A . scope && ! B . scope ) {
return - 1 ;
} else if ( ! A . scope && B . scope ) {
return 1 ;
} else if ( A . scope && B . scope ) {
return 0 ;
} else if ( A . uncaught && ! B . uncaught ) {
return 1 ;
} else if ( ! A . uncaught && B . uncaught ) {
return - 1 ;
}
return 0 ;
} ) ;
2019-01-16 16:27:19 +00:00
if ( activeCount > 0 ) {
this . trace ( "------------------|--------------|-----------------" ) ;
}
// this.dump();
2015-10-11 20:37:11 +01:00
}
2019-01-16 23:33:04 +00:00
/ * *
* Stop this flow .
* The ` stopList ` argument helps define what needs to be stopped in the case
* of a modified - nodes / flows type deploy .
* @ param { [ type ] } stopList [ description ]
* @ param { [ type ] } removedList [ description ]
* @ return { [ type ] } [ description ]
* /
2019-01-11 14:53:21 +00:00
stop ( stopList , removedList ) {
2019-01-17 13:18:26 +00:00
this . trace ( "stop " + this . TYPE ) ;
var i ;
if ( ! stopList ) {
stopList = Object . keys ( this . activeNodes ) ;
}
2019-03-15 09:13:32 +00:00
// this.trace(" stopList: "+stopList.join(","))
2019-01-17 13:18:26 +00:00
// Convert the list to a map to avoid multiple scans of the list
var removedMap = { } ;
removedList = removedList || [ ] ;
removedList . forEach ( function ( id ) {
removedMap [ id ] = true ;
} ) ;
2017-04-21 23:36:21 +01:00
2021-02-19 20:44:01 +00:00
let nodesToStop = [ ] ;
let configsToStop = [ ] ;
stopList . forEach ( id => {
if ( this . flow . configs [ id ] ) {
configsToStop . push ( id ) ;
} else {
nodesToStop . push ( id ) ;
}
} ) ;
stopList = nodesToStop . concat ( configsToStop ) ;
2021-03-31 23:10:54 +01:00
2019-01-17 13:18:26 +00:00
var promises = [ ] ;
for ( i = 0 ; i < stopList . length ; i ++ ) {
var node = this . activeNodes [ stopList [ i ] ] ;
if ( node ) {
delete this . activeNodes [ stopList [ i ] ] ;
if ( this . subflowInstanceNodes [ stopList [ i ] ] ) {
delete this . subflowInstanceNodes [ stopList [ i ] ] ;
2020-08-28 16:36:11 +01:00
}
try {
var removed = removedMap [ stopList [ i ] ] ;
promises . push ( stopNode ( node , removed ) . catch ( ( ) => { } ) ) ;
} catch ( err ) {
node . error ( err ) ;
2015-10-11 20:37:11 +01:00
}
2020-11-17 13:29:13 +00:00
if ( removedMap [ stopList [ i ] ] ) {
events . emit ( "node-status" , {
id : node . id
} ) ;
}
2015-10-11 20:37:11 +01:00
}
2019-01-17 13:18:26 +00:00
}
return Promise . all ( promises ) ;
2015-10-11 20:37:11 +01:00
}
2019-01-16 23:33:04 +00:00
/ * *
* Update the flow definition . This doesn ' t change anything that is running .
* This should be called after ` stop ` and before ` start ` .
* @ param { [ type ] } _global [ description ]
* @ param { [ type ] } _flow [ description ]
* @ return { [ type ] } [ description ]
* /
2019-01-11 14:53:21 +00:00
update ( _global , _flow ) {
this . global = _global ;
this . flow = _flow ;
2015-10-11 20:37:11 +01:00
}
2019-01-16 23:33:04 +00:00
/ * *
* Get a node instance from this flow . If the node is not known to this
* flow , pass the request up to the parent .
2019-01-29 21:49:20 +00:00
* @ param { String } id [ description ]
* @ param { Boolean } cancelBubble if true , prevents the flow from passing the request to the parent
* This stops infinite loops when the parent asked this Flow for the
* node to begin with .
2019-01-16 23:33:04 +00:00
* @ return { [ type ] } [ description ]
* /
2019-01-29 21:49:20 +00:00
getNode ( id , cancelBubble ) {
2019-01-16 16:27:19 +00:00
if ( ! id ) {
return undefined ;
}
// console.log((new Error().stack).toString().split("\n").slice(1,3).join("\n"))
2021-06-04 15:08:03 +09:00
if ( ( this . flow . configs && this . flow . configs [ id ] ) || ( this . flow . nodes && this . flow . nodes [ id ] && this . flow . nodes [ id ] . type . substring ( 0 , 8 ) != "subflow:" ) ) {
2019-01-16 16:27:19 +00:00
// This is a node owned by this flow, so return whatever we have got
// During a stop/restart, activeNodes could be null for this id
return this . activeNodes [ id ] ;
} else if ( this . activeNodes [ id ] ) {
2021-06-04 15:08:03 +09:00
// TEMP: this is a subflow internal node within this flow or subflow instance node
2019-01-16 16:27:19 +00:00
return this . activeNodes [ id ] ;
2021-03-31 23:10:54 +01:00
} else if ( this . subflowInstanceNodes [ id ] ) {
return this . subflowInstanceNodes [ id ] ;
2019-03-12 14:40:47 +00:00
} else if ( cancelBubble ) {
2019-03-12 14:25:36 +00:00
// The node could be inside one of this flow's subflows
var node ;
for ( var sfId in this . subflowInstanceNodes ) {
if ( this . subflowInstanceNodes . hasOwnProperty ( sfId ) ) {
node = this . subflowInstanceNodes [ sfId ] . getNode ( id , cancelBubble ) ;
if ( node ) {
return node ;
}
}
}
2019-03-12 14:40:47 +00:00
} else {
// Node not found inside this flow - ask the parent
2019-01-29 21:49:20 +00:00
return this . parent . getNode ( id ) ;
}
return undefined ;
2015-10-11 20:37:11 +01:00
}
2021-08-19 21:15:13 +09:00
/ * *
* Get a group node instance
* @ param { String } id
* @ return { Node } group node
* /
getGroupNode ( id ) {
const groups = this . global . groups ;
return groups [ id ] ;
}
2019-01-16 23:33:04 +00:00
/ * *
* Get all of the nodes instantiated within this flow
* @ return { [ type ] } [ description ]
* /
2019-01-11 14:53:21 +00:00
getActiveNodes ( ) {
return this . activeNodes ;
2015-10-11 20:37:11 +01:00
}
2021-08-30 08:00:58 +09:00
/ * !
* Get value of environment variable defined in group node .
* @ param { String } group - group node
* @ param { String } name - name of variable
* @ return { Object } object containing the value in val property or null if not defined
* /
getGroupEnvSetting ( node , group , name ) {
if ( group ) {
2022-01-26 09:25:30 +09:00
if ( name === "NR_GROUP_NAME" ) {
2022-01-25 21:04:43 +09:00
return [ {
val : group . name
} , null ] ;
}
2022-01-26 09:25:30 +09:00
if ( name === "NR_GROUP_ID" ) {
2022-01-25 21:04:43 +09:00
return [ {
val : group . id
} , null ] ;
}
2021-08-30 08:00:58 +09:00
if ( group . credentials === undefined ) {
group . credentials = credentials . get ( group . id ) || { } ;
}
if ( ! name . startsWith ( "$parent." ) ) {
if ( group . env ) {
if ( ! group . _env ) {
const envs = group . env ;
const entries = envs . map ( ( env ) => {
if ( env . type === "cred" ) {
const cred = group . credentials ;
if ( cred . hasOwnProperty ( env . name ) ) {
env . value = cred [ env . name ] ;
}
}
return [ env . name , env ] ;
} ) ;
group . _env = Object . fromEntries ( entries ) ;
}
const env = group . _env [ name ] ;
if ( env ) {
let value = env . value ;
const type = env . type ;
if ( ( type !== "env" ) ||
( value !== name ) ) {
if ( type === "env" ) {
value = value . replace ( new RegExp ( "\\${" + name + "}" , "g" ) , "${$parent." + name + "}" ) ;
}
if ( type === "bool" ) {
const val
= ( ( value === "true" ) ||
( value === true ) ) ;
2021-11-27 19:29:57 +09:00
return [ {
2021-08-30 08:00:58 +09:00
val : val
2021-11-27 19:29:57 +09:00
} , null ] ;
2021-08-30 08:00:58 +09:00
}
if ( type === "cred" ) {
2021-11-27 19:29:57 +09:00
return [ {
2021-08-30 08:00:58 +09:00
val : value
2021-11-27 19:29:57 +09:00
} , null ] ;
2021-08-30 08:00:58 +09:00
}
try {
var val = redUtil . evaluateNodeProperty ( value , type , node , null , null ) ;
2021-11-27 19:29:57 +09:00
return [ {
2021-08-30 08:00:58 +09:00
val : val
2021-11-27 19:29:57 +09:00
} , null ] ;
2021-08-30 08:00:58 +09:00
}
catch ( e ) {
this . error ( e ) ;
2021-11-27 19:29:57 +09:00
return [ null , null ] ;
2021-08-30 08:00:58 +09:00
}
}
}
}
}
else {
name = name . substring ( 8 ) ;
}
if ( group . g ) {
const parent = this . getGroupNode ( group . g ) ;
return this . getGroupEnvSetting ( node , parent , name ) ;
}
}
2021-11-27 19:29:57 +09:00
return [ null , name ] ;
2021-08-30 08:00:58 +09:00
}
2019-01-16 23:33:04 +00:00
/ * *
* Get a flow setting value . This currently automatically defers to the parent
* flow which , as defined in . / index . js returns ` process.env[key] ` .
* This lays the groundwork for Subflow to have instance - specific settings
* @ param { [ type ] } key [ description ]
* @ return { [ type ] } [ description ]
* /
2019-01-16 22:38:04 +00:00
getSetting ( key ) {
2022-01-25 20:46:01 +09:00
const flow = this . flow ;
2022-01-26 09:25:30 +09:00
if ( key === "NR_FLOW_NAME" ) {
2022-01-25 20:46:01 +09:00
return flow . label ;
}
2022-01-26 09:25:30 +09:00
if ( key === "NR_FLOW_ID" ) {
2022-01-25 20:46:01 +09:00
return flow . id ;
}
2021-09-01 22:26:31 +09:00
if ( flow . credentials === undefined ) {
flow . credentials = credentials . get ( flow . id ) || { } ;
}
if ( flow . env ) {
if ( ! key . startsWith ( "$parent." ) ) {
if ( ! flow . _env ) {
const envs = flow . env ;
const entries = envs . map ( ( env ) => {
if ( env . type === "cred" ) {
const cred = flow . credentials ;
if ( cred . hasOwnProperty ( env . name ) ) {
env . value = cred [ env . name ] ;
}
}
return [ env . name , env ]
} ) ;
flow . _env = Object . fromEntries ( entries ) ;
}
const env = flow . _env [ key ] ;
if ( env ) {
let value = env . value ;
const type = env . type ;
if ( ( type !== "env" ) || ( value !== key ) ) {
if ( type === "env" ) {
value = value . replace ( new RegExp ( "\\${" + key + "}" , "g" ) , "${$parent." + key + "}" ) ;
}
try {
if ( type === "bool" ) {
const val = ( ( value === "true" ) ||
( value === true ) ) ;
return val ;
}
if ( type === "cred" ) {
return value ;
}
var val = redUtil . evaluateNodeProperty ( value , type , null , null , null ) ;
return val ;
}
catch ( e ) {
this . error ( e ) ;
}
}
}
}
2021-11-27 19:29:57 +09:00
else {
key = key . substring ( 8 ) ;
}
2021-08-19 21:15:13 +09:00
}
2019-01-16 22:38:04 +00:00
return this . parent . getSetting ( key ) ;
}
2019-01-16 23:33:04 +00:00
/ * *
2019-01-25 13:35:02 +00:00
* Handle a status event from a node within this flow .
2019-01-25 15:46:39 +00:00
* @ param { Node } node The original node that triggered the event
* @ param { Object } statusMessage The status object
* @ param { Node } reportingNode The node emitting the status event .
* This could be a subflow instance node when the status
* is being delegated up .
* @ param { boolean } muteStatusEvent Whether to emit the status event
* @ return { [ type ] } [ description ]
2019-01-16 23:33:04 +00:00
* /
2019-01-25 15:46:39 +00:00
handleStatus ( node , statusMessage , reportingNode , muteStatusEvent ) {
2019-01-25 13:35:02 +00:00
if ( ! reportingNode ) {
reportingNode = node ;
}
2019-01-25 15:46:39 +00:00
if ( ! muteStatusEvent ) {
2021-02-05 11:36:26 +00:00
if ( statusMessage . hasOwnProperty ( "text" ) && typeof ( statusMessage . text !== "string" ) ) {
try {
statusMessage . text = statusMessage . text . toString ( ) ;
}
catch ( e ) { }
}
2019-01-25 13:35:02 +00:00
events . emit ( "node-status" , {
id : node . id ,
status : statusMessage
} ) ;
}
let handled = false ;
2019-01-16 16:27:19 +00:00
2019-01-25 15:46:39 +00:00
if ( this . id === 'global' && node . users ) {
// This is a global config node
// Delegate status to any nodes using this config node
for ( let userNode in node . users ) {
if ( node . users . hasOwnProperty ( userNode ) ) {
node . users [ userNode ] . _flow . handleStatus ( node , statusMessage , node . users [ userNode ] , true ) ;
2019-01-16 16:27:19 +00:00
}
2015-10-11 20:37:11 +01:00
}
2019-01-16 16:27:19 +00:00
handled = true ;
2019-01-25 15:46:39 +00:00
} else {
this . statusNodes . forEach ( function ( targetStatusNode ) {
if ( targetStatusNode . scope && targetStatusNode . scope . indexOf ( reportingNode . id ) === - 1 ) {
return ;
}
var message = {
2019-03-04 10:23:10 +00:00
status : clone ( statusMessage )
}
2019-01-25 15:46:39 +00:00
if ( statusMessage . hasOwnProperty ( "text" ) ) {
message . status . text = statusMessage . text . toString ( ) ;
}
2019-03-04 10:23:10 +00:00
message . status . source = {
id : node . id ,
type : node . type ,
name : node . name
}
2019-01-25 15:46:39 +00:00
targetStatusNode . receive ( message ) ;
handled = true ;
} ) ;
}
2019-01-25 13:35:02 +00:00
return handled ;
2015-10-11 20:37:11 +01:00
}
2019-01-16 23:33:04 +00:00
/ * *
* Handle an error event from a node within this flow . If there are no Catch
* nodes within this flow , pass the event to the parent flow .
2019-01-25 13:35:02 +00:00
* @ param { [ type ] } node [ description ]
* @ param { [ type ] } logMessage [ description ]
* @ param { [ type ] } msg [ description ]
* @ param { [ type ] } reportingNode [ description ]
* @ return { [ type ] } [ description ]
2019-01-16 23:33:04 +00:00
* /
2019-01-25 13:35:02 +00:00
handleError ( node , logMessage , msg , reportingNode ) {
if ( ! reportingNode ) {
reportingNode = node ;
}
2019-01-16 16:27:19 +00:00
// console.log("HE",logMessage);
2015-10-11 20:37:11 +01:00
var count = 1 ;
2020-12-14 18:18:50 +08:00
if ( msg && msg . hasOwnProperty ( "error" ) && msg . error ) {
if ( msg . error . hasOwnProperty ( "source" ) && msg . error . source ) {
2015-10-11 20:37:11 +01:00
if ( msg . error . source . id === node . id ) {
count = msg . error . source . count + 1 ;
if ( count === 10 ) {
node . warn ( Log . _ ( "nodes.flow.error-loop" ) ) ;
2017-05-03 13:42:38 +01:00
return false ;
2015-10-11 20:37:11 +01:00
}
}
}
}
2019-01-25 15:46:39 +00:00
let handled = false ;
if ( this . id === 'global' && node . users ) {
// This is a global config node
// Delegate status to any nodes using this config node
for ( let userNode in node . users ) {
if ( node . users . hasOwnProperty ( userNode ) ) {
node . users [ userNode ] . _flow . handleError ( node , logMessage , msg , node . users [ userNode ] ) ;
2015-10-11 20:37:11 +01:00
}
}
2019-01-16 16:27:19 +00:00
handled = true ;
2019-01-25 15:46:39 +00:00
} else {
2019-02-05 14:28:20 +00:00
var handledByUncaught = false ;
2019-01-25 15:46:39 +00:00
this . catchNodes . forEach ( function ( targetCatchNode ) {
if ( targetCatchNode . scope && targetCatchNode . scope . indexOf ( reportingNode . id ) === - 1 ) {
return ;
}
2019-02-05 14:28:20 +00:00
if ( ! targetCatchNode . scope && targetCatchNode . uncaught && ! handledByUncaught ) {
if ( handled ) {
// This has been handled by a !uncaught catch node
return ;
}
// This is an uncaught error
handledByUncaught = true ;
}
2019-01-25 15:46:39 +00:00
var errorMessage ;
if ( msg ) {
errorMessage = redUtil . cloneMessage ( msg ) ;
} else {
errorMessage = { } ;
}
if ( errorMessage . hasOwnProperty ( "error" ) ) {
errorMessage . _error = errorMessage . error ;
}
errorMessage . error = {
message : logMessage . toString ( ) ,
source : {
id : node . id ,
type : node . type ,
name : node . name ,
count : count
}
} ;
if ( logMessage . hasOwnProperty ( 'stack' ) ) {
errorMessage . error . stack = logMessage . stack ;
}
targetCatchNode . receive ( errorMessage ) ;
handled = true ;
} ) ;
}
2019-01-16 16:27:19 +00:00
return handled ;
2015-10-11 20:37:11 +01:00
}
2019-07-08 23:23:33 +01:00
handleComplete ( node , msg ) {
if ( this . completeNodeMap [ node . id ] ) {
let toSend = msg ;
this . completeNodeMap [ node . id ] . forEach ( ( completeNode , index ) => {
toSend = redUtil . cloneMessage ( msg ) ;
completeNode . receive ( toSend ) ;
} )
}
}
2020-07-30 17:52:39 +01:00
send ( sendEvents ) {
// onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object.
// preRoute - called once for each SendEvent object in turn
// preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed.
// postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery)
// onReceive - a node is about to receive a message
// postReceive - the message has been passed to the node's input handler
// onDone, onError - the node has completed with a message or logged an error
handleOnSend ( this , sendEvents , ( err , eventData ) => {
if ( err ) {
let srcNode ;
if ( Array . isArray ( eventData ) ) {
srcNode = eventData [ 0 ] . source . node ;
} else {
srcNode = eventData . source . node ;
}
srcNode . error ( err ) ;
}
} ) ;
2020-07-20 16:48:47 +01:00
}
2019-01-16 16:27:19 +00:00
dump ( ) {
console . log ( "==================" )
console . log ( this . TYPE , this . id ) ;
for ( var id in this . activeNodes ) {
if ( this . activeNodes . hasOwnProperty ( id ) ) {
var node = this . activeNodes [ id ] ;
console . log ( " " , id . padEnd ( 16 ) , node . type )
if ( node . wires ) {
console . log ( " -> " , node . wires )
2015-10-11 20:37:11 +01:00
}
}
}
2019-01-16 16:27:19 +00:00
console . log ( "==================" )
2015-10-11 20:37:11 +01:00
}
2019-01-26 23:15:20 +09:00
2015-10-11 20:37:11 +01:00
}
2019-01-16 23:33:04 +00:00
/ * *
* Stop an individual node within this flow .
*
* @ param { [ type ] } node [ description ]
* @ param { [ type ] } removed [ description ]
* @ return { [ type ] } [ description ]
* /
function stopNode ( node , removed ) {
2019-01-17 13:18:26 +00:00
Log . trace ( "Stopping node " + node . type + ":" + node . id + ( removed ? " removed" : "" ) ) ;
const start = Date . now ( ) ;
const closePromise = node . close ( removed ) ;
2020-03-31 19:25:20 +01:00
let closeTimer = null ;
2019-01-17 13:18:26 +00:00
const closeTimeout = new Promise ( ( resolve , reject ) => {
2020-04-02 23:24:18 +01:00
closeTimer = setTimeout ( ( ) => {
2019-01-17 13:18:26 +00:00
reject ( "Close timed out" ) ;
} , nodeCloseTimeout ) ;
} ) ;
return Promise . race ( [ closePromise , closeTimeout ] ) . then ( ( ) => {
2020-03-31 19:25:20 +01:00
clearTimeout ( closeTimer ) ;
2019-01-17 13:18:26 +00:00
var delta = Date . now ( ) - start ;
Log . trace ( "Stopped node " + node . type + ":" + node . id + " (" + delta + "ms)" ) ;
} ) . catch ( err => {
2020-03-31 19:25:20 +01:00
clearTimeout ( closeTimer ) ;
2019-01-17 13:18:26 +00:00
node . error ( Log . _ ( "nodes.flows.stopping-error" , { message : err } ) ) ;
Log . debug ( err . stack ) ;
2019-01-16 23:33:04 +00:00
} )
}
2020-07-30 17:52:39 +01:00
function handleOnSend ( flow , sendEvents , reportError ) {
// onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object.
hooks . trigger ( "onSend" , sendEvents , ( err ) => {
if ( err ) {
reportError ( err , sendEvents ) ;
return
} else if ( err !== false ) {
for ( var i = 0 ; i < sendEvents . length ; i ++ ) {
handlePreRoute ( flow , sendEvents [ i ] , reportError )
}
}
} ) ;
}
function handlePreRoute ( flow , sendEvent , reportError ) {
// preRoute - called once for each SendEvent object in turn
hooks . trigger ( "preRoute" , sendEvent , ( err ) => {
if ( err ) {
reportError ( err , sendEvent ) ;
return ;
} else if ( err !== false ) {
sendEvent . destination . node = flow . getNode ( sendEvent . destination . id ) ;
if ( sendEvent . destination . node ) {
if ( sendEvent . cloneMessage ) {
sendEvent . msg = redUtil . cloneMessage ( sendEvent . msg ) ;
}
handlePreDeliver ( flow , sendEvent , reportError ) ;
}
}
} )
}
function handlePreDeliver ( flow , sendEvent , reportError ) {
// preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed.
hooks . trigger ( "preDeliver" , sendEvent , ( err ) => {
if ( err ) {
reportError ( err , sendEvent ) ;
return ;
} else if ( err !== false ) {
2020-09-29 17:39:29 +01:00
if ( asyncMessageDelivery ) {
setImmediate ( function ( ) {
if ( sendEvent . destination . node ) {
sendEvent . destination . node . receive ( sendEvent . msg ) ;
}
} )
} else {
2020-07-30 17:52:39 +01:00
if ( sendEvent . destination . node ) {
sendEvent . destination . node . receive ( sendEvent . msg ) ;
}
2020-09-29 17:39:29 +01:00
}
2020-07-30 17:52:39 +01:00
// postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery)
hooks . trigger ( "postDeliver" , sendEvent , function ( err ) {
if ( err ) {
reportError ( err , sendEvent ) ;
}
} )
}
} )
}
2015-10-11 20:37:11 +01:00
module . exports = {
2018-04-23 14:24:51 +01:00
init : function ( runtime ) {
nodeCloseTimeout = runtime . settings . nodeCloseTimeout || 15000 ;
2019-07-08 23:23:33 +01:00
asyncMessageDelivery = ! runtime . settings . runtimeSyncDelivery
2018-04-23 14:24:51 +01:00
Log = runtime . log ;
2019-01-16 16:27:19 +00:00
Subflow = require ( "./Subflow" ) ;
2017-05-15 13:05:33 +01:00
} ,
2019-01-16 16:27:19 +00:00
create : function ( parent , global , conf ) {
return new Flow ( parent , global , conf ) ;
} ,
Flow : Flow
2015-10-11 20:37:11 +01:00
}