2015-10-11 21:37:11 +02:00
/ * *
2017-01-11 16:24:33 +01:00
* Copyright JS Foundation and other contributors , http : //js.foundation
2015-10-11 21:37:11 +02:00
*
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
* * /
var clone = require ( "clone" ) ;
2018-08-17 23:10:54 +02:00
var redUtil = require ( "@node-red/util" ) . util ;
2020-12-02 10:25:10 +01:00
const events = require ( "@node-red/util" ) . events ;
2015-10-11 21:37:11 +02:00
var flowUtil = require ( "./util" ) ;
2020-07-20 17:48:47 +02:00
const context = require ( '../nodes/context' ) ;
2020-07-30 18:52:39 +02:00
const hooks = require ( "../hooks" ) ;
2015-10-11 21:37:11 +02:00
2019-01-17 14:18:26 +01:00
// Subflow implementation. Left undefined at load time and assigned by
// init() via require("./Subflow"); Flow.start() calls Subflow.create().
var Subflow ;
// Runtime logger. Assigned from runtime.log by init(); used by the Flow
// logging helpers and by stopNode().
var Log ;
2017-05-15 14:05:33 +02:00
// Milliseconds stopNode() waits for node.close() before rejecting with
// "Close timed out". init() replaces this with
// runtime.settings.nodeCloseTimeout when that setting is truthy.
var nodeCloseTimeout = 15000 ;
2019-07-09 00:23:33 +02:00
// When true, handlePreDeliver() hands the message to the destination node
// through setImmediate rather than calling receive() inline. init() sets
// this to !runtime.settings.runtimeSyncDelivery.
var asyncMessageDelivery = true ;
2017-05-15 14:05:33 +02:00
2019-01-17 00:33:04 +01:00
/**
 * This class represents a flow within the runtime. It is responsible for
 * creating, starting and stopping all nodes within the flow.
 */
class Flow {

    /**
     * Create a Flow object.
     * @param {Object} parent The parent flow. This class reads `parent.id` and
     *                        calls `parent.log`, `parent.getNode` and
     *                        `parent.getSetting`.
     * @param {Object} globalFlow The global flow definition
     * @param {Object} [flow] This flow's definition. When omitted, this object
     *                        represents the global flow and uses `globalFlow`
     *                        as its own definition.
     */
    constructor(parent, globalFlow, flow) {
        this.TYPE = 'flow';
        this.parent = parent;
        this.global = globalFlow;
        if (typeof flow === 'undefined') {
            this.flow = globalFlow;
            this.isGlobalFlow = true;
        } else {
            this.flow = flow;
            this.isGlobalFlow = false;
        }
        this.id = this.flow.id || "global";
        this.activeNodes = {};
        this.subflowInstanceNodes = {};
        this.catchNodes = [];
        this.statusNodes = [];
        // Initialised here (and rebuilt by start()) so handleComplete() is
        // safe to call on a flow that has not been started yet.
        this.completeNodeMap = {};
        this.path = this.id;
        // Ensure a context exists for this flow
        this.context = context.getFlowContext(this.id, this.parent.id);
    }

    /**
     * Log a debug-level message from this flow
     * @param {*} msg The message to log
     */
    debug(msg) {
        Log.log({
            id: this.id || "global",
            level: Log.DEBUG,
            type: this.TYPE,
            msg: msg
        });
    }

    /**
     * Log an error-level message from this flow
     * @param {*} msg The message to log
     */
    error(msg) {
        Log.log({
            id: this.id || "global",
            level: Log.ERROR,
            type: this.TYPE,
            msg: msg
        });
    }

    /**
     * Log an info-level message from this flow
     * @param {*} msg The message to log
     */
    info(msg) {
        Log.log({
            id: this.id || "global",
            level: Log.INFO,
            type: this.TYPE,
            msg: msg
        });
    }

    /**
     * Log a trace-level message from this flow
     * @param {*} msg The message to log
     */
    trace(msg) {
        Log.log({
            id: this.id || "global",
            level: Log.TRACE,
            type: this.TYPE,
            msg: msg
        });
    }

    /**
     * Pass a log event up to the parent flow, stamping it with this flow's
     * path if it does not already carry one.
     * @param {Object} msg The log event; `msg.path` is set when missing
     */
    log(msg) {
        if (!msg.path) {
            msg.path = this.path;
        }
        this.parent.log(msg);
    }

    /**
     * Start this flow.
     * The `diff` argument helps define what needs to be started in the case
     * of a modified-nodes/flows type deploy.
     *
     * Config nodes are created first (deferring any that reference a config
     * node that is not yet active), then rewired nodes are updated, then the
     * regular nodes and subflow instances are created. Finally the lists of
     * catch/status/complete nodes are rebuilt from the active nodes.
     *
     * @param {Object} [diff] Deploy diff; only `diff.rewired` (an array of
     *                        node ids) is read here.
     * @throws {Error} if a config node has been deferred 100 times, which is
     *                 treated as a circular dependency
     */
    start(diff) {
        this.trace("start " + this.TYPE + " [" + this.path + "]");
        let node;
        let newNode;
        let id;
        this.catchNodes = [];
        this.statusNodes = [];
        this.completeNodeMap = {};

        const configNodes = Object.keys(this.flow.configs);
        const configNodeAttempts = {};
        while (configNodes.length > 0) {
            id = configNodes.shift();
            node = this.flow.configs[id];
            if (!this.activeNodes[id]) {
                if (node.d !== true) {
                    let readyToCreate = true;
                    // This node doesn't exist.
                    // Check it doesn't reference another non-existent config node
                    for (const prop in node) {
                        if (node.hasOwnProperty(prop) &&
                            prop !== 'id' &&
                            prop !== 'wires' &&
                            prop !== '_users' &&
                            this.flow.configs[node[prop]] &&
                            this.flow.configs[node[prop]].d !== true
                        ) {
                            if (!this.activeNodes[node[prop]]) {
                                // References a non-existent config node
                                // Add it to the back of the list to try again later
                                configNodes.push(id);
                                configNodeAttempts[id] = (configNodeAttempts[id] || 0) + 1;
                                if (configNodeAttempts[id] === 100) {
                                    throw new Error("Circular config node dependency detected: " + id);
                                }
                                readyToCreate = false;
                                break;
                            }
                        }
                    }
                    if (readyToCreate) {
                        newNode = flowUtil.createNode(this, node);
                        if (newNode) {
                            this.activeNodes[id] = newNode;
                        }
                    }
                } else {
                    this.debug("not starting disabled config node : " + id);
                }
            }
        }

        if (diff && diff.rewired) {
            for (let j = 0; j < diff.rewired.length; j++) {
                const rewireNode = this.activeNodes[diff.rewired[j]];
                if (rewireNode) {
                    rewireNode.updateWires(this.flow.nodes[rewireNode.id].wires);
                }
            }
        }

        for (id in this.flow.nodes) {
            if (this.flow.nodes.hasOwnProperty(id)) {
                node = this.flow.nodes[id];
                if (node.d !== true) {
                    if (!node.subflow) {
                        if (!this.activeNodes[id]) {
                            newNode = flowUtil.createNode(this, node);
                            if (newNode) {
                                this.activeNodes[id] = newNode;
                            }
                        }
                    } else {
                        if (!this.subflowInstanceNodes[id]) {
                            try {
                                const subflowDefinition = this.flow.subflows[node.subflow] || this.global.subflows[node.subflow];
                                // Placeholder so the instance is treated as
                                // existing while it is being created. It is
                                // left in place if creation throws.
                                this.subflowInstanceNodes[id] = true;
                                const subflow = Subflow.create(
                                    this,
                                    this.global,
                                    subflowDefinition,
                                    node
                                );
                                this.subflowInstanceNodes[id] = subflow;
                                subflow.start();
                                this.activeNodes[id] = subflow.node;
                            } catch (err) {
                                console.log(err.stack);
                            }
                        }
                    }
                } else {
                    this.debug("not starting disabled node : " + id);
                }
            }
        }

        const activeCount = Object.keys(this.activeNodes).length;
        if (activeCount > 0) {
            this.trace("------------------|--------------|-----------------");
            this.trace(" id | type | alias");
            this.trace("------------------|--------------|-----------------");
        }
        // Build the map of catch/status/complete nodes.
        for (id in this.activeNodes) {
            if (this.activeNodes.hasOwnProperty(id)) {
                node = this.activeNodes[id];
                this.trace(" " + id.padEnd(16) + " | " + node.type.padEnd(12) + " | " + (node._alias || "") + (node._zAlias ? " [zAlias:" + node._zAlias + "]" : ""));
                if (node.type === "catch") {
                    this.catchNodes.push(node);
                } else if (node.type === "status") {
                    this.statusNodes.push(node);
                } else if (node.type === "complete") {
                    if (node.scope) {
                        const completeNode = node;
                        completeNode.scope.forEach(scopedId => {
                            this.completeNodeMap[scopedId] = this.completeNodeMap[scopedId] || [];
                            this.completeNodeMap[scopedId].push(completeNode);
                        });
                    }
                }
            }
        }
        // Scoped catch nodes first, then unscoped ones, with unscoped
        // "uncaught only" catch nodes last.
        this.catchNodes.sort(function(A, B) {
            if (A.scope && !B.scope) {
                return -1;
            } else if (!A.scope && B.scope) {
                return 1;
            } else if (A.scope && B.scope) {
                return 0;
            } else if (A.uncaught && !B.uncaught) {
                return 1;
            } else if (!A.uncaught && B.uncaught) {
                return -1;
            }
            return 0;
        });
        if (activeCount > 0) {
            this.trace("------------------|--------------|-----------------");
        }
    }

    /**
     * Stop this flow.
     * The `stopList` argument helps define what needs to be stopped in the case
     * of a modified-nodes/flows type deploy.
     * @param {string[]} [stopList] Ids of the nodes to stop. Defaults to every
     *                              active node.
     * @param {string[]} [removedList] Ids of nodes that are being removed
     *                              rather than restarted. A blank "node-status"
     *                              event is emitted for each of these.
     * @return {Promise} resolves once every stopNode() call has settled
     */
    stop(stopList, removedList) {
        this.trace("stop " + this.TYPE);
        if (!stopList) {
            stopList = Object.keys(this.activeNodes);
        }
        // Convert the list to a map to avoid multiple scans of the list
        const removedMap = {};
        removedList = removedList || [];
        removedList.forEach(function(id) {
            removedMap[id] = true;
        });

        // Reorder so regular nodes are stopped before config nodes
        const nodesToStop = [];
        const configsToStop = [];
        stopList.forEach(id => {
            if (this.flow.configs[id]) {
                configsToStop.push(id);
            } else {
                nodesToStop.push(id);
            }
        });
        stopList = nodesToStop.concat(configsToStop);

        const promises = [];
        for (let i = 0; i < stopList.length; i++) {
            const nodeId = stopList[i];
            const node = this.activeNodes[nodeId];
            if (node) {
                delete this.activeNodes[nodeId];
                if (this.subflowInstanceNodes[nodeId]) {
                    delete this.subflowInstanceNodes[nodeId];
                }
                try {
                    const removed = removedMap[nodeId];
                    promises.push(stopNode(node, removed).catch(() => {}));
                } catch (err) {
                    // stopNode threw synchronously (e.g. from node.close)
                    node.error(err);
                }
                if (removedMap[nodeId]) {
                    // No `status` property: clears any status shown for the node
                    events.emit("node-status", {
                        id: node.id
                    });
                }
            }
        }
        return Promise.all(promises);
    }

    /**
     * Update the flow definition. This doesn't change anything that is running.
     * This should be called after `stop` and before `start`.
     * @param {Object} _global The new global flow definition
     * @param {Object} _flow The new definition of this flow
     */
    update(_global, _flow) {
        this.global = _global;
        this.flow = _flow;
    }

    /**
     * Get a node instance from this flow. If the node is not known to this
     * flow, pass the request up to the parent.
     * @param {String} id The id of the node to find
     * @param {Boolean} cancelBubble if true, prevents the flow from passing the request to the parent.
     *                  This stops infinite loops when the parent asked this Flow for the
     *                  node to begin with. In that case the subflow instances of
     *                  this flow are searched instead.
     * @return {Object|undefined} the node instance, or undefined if not found
     */
    getNode(id, cancelBubble) {
        if (!id) {
            return undefined;
        }
        if ((this.flow.configs && this.flow.configs[id]) || (this.flow.nodes && this.flow.nodes[id])) {
            // This is a node owned by this flow, so return whatever we have got
            // During a stop/restart, activeNodes could be null for this id
            return this.activeNodes[id];
        } else if (this.activeNodes[id]) {
            // TEMP: this is a subflow internal node within this flow
            return this.activeNodes[id];
        } else if (this.subflowInstanceNodes[id]) {
            return this.subflowInstanceNodes[id];
        } else if (cancelBubble) {
            // The node could be inside one of this flow's subflows
            for (const sfId in this.subflowInstanceNodes) {
                if (this.subflowInstanceNodes.hasOwnProperty(sfId)) {
                    const instance = this.subflowInstanceNodes[sfId];
                    // start() stores `true` as a placeholder while an instance
                    // is being created and leaves it there if creation fails;
                    // skip anything that cannot be queried.
                    if (instance && typeof instance.getNode === 'function') {
                        const node = instance.getNode(id, cancelBubble);
                        if (node) {
                            return node;
                        }
                    }
                }
            }
        } else {
            // Node not found inside this flow - ask the parent
            return this.parent.getNode(id);
        }
        return undefined;
    }

    /**
     * Get all of the nodes instantiated within this flow
     * @return {Object} map of node id to node instance (the live map, not a copy)
     */
    getActiveNodes() {
        return this.activeNodes;
    }

    /**
     * Get a flow setting value. This currently automatically defers to the parent
     * flow which, as defined in ./index.js returns `process.env[key]`.
     * This lays the groundwork for Subflow to have instance-specific settings
     * @param {string} key The setting name
     * @return {*} whatever the parent flow returns for `key`
     */
    getSetting(key) {
        return this.parent.getSetting(key);
    }

    /**
     * Handle a status event from a node within this flow.
     * @param {Node} node The original node that triggered the event
     * @param {Object} statusMessage The status object. When the event is not
     *                       muted, a non-string `text` property is converted
     *                       to a string in place.
     * @param {Node} reportingNode The node emitting the status event.
     *                       This could be a subflow instance node when the status
     *                       is being delegated up. Defaults to `node`.
     * @param {boolean} muteStatusEvent If true, the "node-status" event is
     *                       not emitted
     * @return {boolean} true if the status was delivered to at least one
     *                       Status node, or was delegated to the users of a
     *                       global config node
     */
    handleStatus(node, statusMessage, reportingNode, muteStatusEvent) {
        if (!reportingNode) {
            reportingNode = node;
        }
        if (!muteStatusEvent) {
            if (statusMessage.hasOwnProperty("text") && typeof statusMessage.text !== "string") {
                try {
                    statusMessage.text = statusMessage.text.toString();
                } catch (e) {
                    // null/undefined (or an object without toString): there is
                    // nothing sensible to convert, so leave the value as it is.
                }
            }
            events.emit("node-status", {
                id: node.id,
                status: statusMessage
            });
        }

        let handled = false;

        if (this.id === 'global' && node.users) {
            // This is a global config node
            // Delegate status to any nodes using this config node
            for (const userNode in node.users) {
                if (node.users.hasOwnProperty(userNode)) {
                    node.users[userNode]._flow.handleStatus(node, statusMessage, node.users[userNode], true);
                }
            }
            handled = true;
        } else {
            this.statusNodes.forEach(function(targetStatusNode) {
                if (targetStatusNode.scope && targetStatusNode.scope.indexOf(reportingNode.id) === -1) {
                    return;
                }
                const message = {
                    status: clone(statusMessage)
                };
                if (statusMessage.hasOwnProperty("text")) {
                    try {
                        message.status.text = statusMessage.text.toString();
                    } catch (e) {
                        // text cannot be converted (e.g. null): keep the
                        // cloned value rather than failing the delivery.
                    }
                }
                message.status.source = {
                    id: node.id,
                    type: node.type,
                    name: node.name
                };
                targetStatusNode.receive(message);
                handled = true;
            });
        }
        return handled;
    }

    /**
     * Handle an error event from a node within this flow. The error is
     * delivered to the matching Catch nodes of this flow; the return value
     * tells the caller whether anything handled it.
     * @param {Node} node The node that raised the error
     * @param {*} logMessage The error; its `toString()` becomes `error.message`
     *                       and its own `stack` property, if any, is copied
     * @param {Object} [msg] The message being processed when the error occurred
     * @param {Node} [reportingNode] The node reporting the error. Defaults to
     *                       `node`; used when matching Catch node scopes.
     * @return {boolean} true if the error was handled. false if no Catch node
     *                       took it, or if the same node has re-raised the
     *                       error enough times for the loop counter to hit 10.
     */
    handleError(node, logMessage, msg, reportingNode) {
        if (!reportingNode) {
            reportingNode = node;
        }
        // Loop detection: a message that already carries an error raised by
        // this same node bumps the counter it carries.
        let count = 1;
        if (msg && msg.hasOwnProperty("error") && msg.error) {
            if (msg.error.hasOwnProperty("source") && msg.error.source) {
                if (msg.error.source.id === node.id) {
                    count = msg.error.source.count + 1;
                    if (count === 10) {
                        node.warn(Log._("nodes.flow.error-loop"));
                        return false;
                    }
                }
            }
        }
        let handled = false;

        if (this.id === 'global' && node.users) {
            // This is a global config node
            // Delegate the error to any nodes using this config node
            for (const userNode in node.users) {
                if (node.users.hasOwnProperty(userNode)) {
                    node.users[userNode]._flow.handleError(node, logMessage, msg, node.users[userNode]);
                }
            }
            handled = true;
        } else {
            let handledByUncaught = false;

            this.catchNodes.forEach(function(targetCatchNode) {
                if (targetCatchNode.scope && targetCatchNode.scope.indexOf(reportingNode.id) === -1) {
                    return;
                }
                if (!targetCatchNode.scope && targetCatchNode.uncaught && !handledByUncaught) {
                    if (handled) {
                        // This has been handled by a !uncaught catch node
                        return;
                    }
                    // This is an uncaught error
                    handledByUncaught = true;
                }
                let errorMessage;
                if (msg) {
                    errorMessage = redUtil.cloneMessage(msg);
                } else {
                    errorMessage = {};
                }
                if (errorMessage.hasOwnProperty("error")) {
                    // Preserve whatever the message already had in `error`
                    errorMessage._error = errorMessage.error;
                }
                errorMessage.error = {
                    message: logMessage.toString(),
                    source: {
                        id: node.id,
                        type: node.type,
                        name: node.name,
                        count: count
                    }
                };
                if (logMessage.hasOwnProperty('stack')) {
                    errorMessage.error.stack = logMessage.stack;
                }
                targetCatchNode.receive(errorMessage);
                handled = true;
            });
        }
        return handled;
    }

    /**
     * Handle a node signalling that it has finished with a message. Each
     * Complete node scoped to `node` receives its own clone of `msg`.
     * @param {Node} node The node that completed
     * @param {Object} msg The message it completed with
     */
    handleComplete(node, msg) {
        const completeNodes = this.completeNodeMap[node.id];
        if (completeNodes) {
            completeNodes.forEach(completeNode => {
                completeNode.receive(redUtil.cloneMessage(msg));
            });
        }
    }

    /**
     * Route the messages a node has sent. Errors raised by the send pipeline
     * are logged against the source node of the affected event(s).
     * @param {Object[]} sendEvents The SendEvent objects produced by node.send
     */
    send(sendEvents) {
        // onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object.
        // preRoute - called once for each SendEvent object in turn
        // preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed.
        // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery)
        // onReceive - a node is about to receive a message
        // postReceive - the message has been passed to the node's input handler
        // onDone, onError - the node has completed with a message or logged an error
        handleOnSend(this, sendEvents, (err, eventData) => {
            if (err) {
                let srcNode;
                if (Array.isArray(eventData)) {
                    srcNode = eventData[0].source.node;
                } else {
                    srcNode = eventData.source.node;
                }
                srcNode.error(err);
            }
        });
    }

    /**
     * Debugging aid: print this flow's active nodes and their wires to the console.
     */
    dump() {
        console.log("==================");
        console.log(this.TYPE, this.id);
        for (const id in this.activeNodes) {
            if (this.activeNodes.hasOwnProperty(id)) {
                const node = this.activeNodes[id];
                console.log(" ", id.padEnd(16), node.type);
                if (node.wires) {
                    console.log(" -> ", node.wires);
                }
            }
        }
        console.log("==================");
    }

}
2019-01-17 00:33:04 +01:00
/**
 * Stop an individual node within this flow.
 *
 * Calls `node.close(removed)` and races the result against a timer of
 * `nodeCloseTimeout` milliseconds. A failure or timeout is reported through
 * `node.error` and the debug log rather than being propagated.
 *
 * @param {Object} node The node to stop. Must provide `close(removed)`,
 *                      `error(msg)`, `type` and `id`.
 * @param {boolean} removed Passed through to `node.close`; also noted in the
 *                      trace output
 * @return {Promise} settles once the node has closed, failed to close or
 *                      timed out
 */
function stopNode(node, removed) {
    Log.trace("Stopping node " + node.type + ":" + node.id + (removed ? " removed" : ""));
    const start = Date.now();
    const closePromise = node.close(removed);
    let closeTimer = null;
    const closeTimeout = new Promise((resolve, reject) => {
        closeTimer = setTimeout(() => {
            reject("Close timed out");
        }, nodeCloseTimeout);
    });
    return Promise.race([closePromise, closeTimeout]).then(() => {
        clearTimeout(closeTimer);
        const delta = Date.now() - start;
        Log.trace("Stopped node " + node.type + ":" + node.id + " (" + delta + "ms)");
    }).catch(err => {
        clearTimeout(closeTimer);
        node.error(Log._("nodes.flows.stopping-error", {message: err}));
        // The timeout rejects with a plain string and a node may reject with
        // anything (including nothing), so only use `stack` when it exists.
        Log.debug(err && err.stack ? err.stack : String(err));
    });
}
2020-07-30 18:52:39 +02:00
/**
 * First stage of the send pipeline: run the "onSend" hooks over the whole
 * batch, then pass each event on to handlePreRoute.
 *
 * @param {Object} flow The flow the messages were sent from
 * @param {Object[]} sendEvents SendEvent objects exactly as the node passed
 *                      them to node.send; several may share one message object
 * @param {Function} reportError Called as `reportError(err, sendEvents)` when
 *                      the hook completes with a truthy error
 */
function handleOnSend(flow, sendEvents, reportError) {
    const afterOnSend = (hookResult) => {
        if (hookResult) {
            // A hook failed: report against the whole batch and stop
            reportError(hookResult, sendEvents);
            return;
        }
        if (hookResult === false) {
            // A hook halted the send without raising an error
            return;
        }
        for (let idx = 0; idx < sendEvents.length; idx += 1) {
            handlePreRoute(flow, sendEvents[idx], reportError);
        }
    };
    hooks.trigger("onSend", sendEvents, afterOnSend);
}
/**
 * Second stage of the send pipeline: run the "preRoute" hooks for a single
 * SendEvent, resolve its destination node and, if one is found, pass the
 * event on to handlePreDeliver.
 *
 * @param {Object} flow The flow used to look up the destination node
 * @param {Object} sendEvent The event to route. `destination.node` is set
 *                      from `destination.id`, and `msg` is replaced by a clone
 *                      when `cloneMessage` is set.
 * @param {Function} reportError Called as `reportError(err, sendEvent)` when
 *                      the hook completes with a truthy error
 */
function handlePreRoute(flow, sendEvent, reportError) {
    // preRoute - called once for each SendEvent object in turn
    // The hook id must be exactly "preRoute" (no surrounding whitespace),
    // matching the other hook ids used in this file.
    hooks.trigger("preRoute", sendEvent, (err) => {
        if (err) {
            reportError(err, sendEvent);
            return;
        } else if (err !== false) {
            sendEvent.destination.node = flow.getNode(sendEvent.destination.id);
            if (sendEvent.destination.node) {
                if (sendEvent.cloneMessage) {
                    sendEvent.msg = redUtil.cloneMessage(sendEvent.msg);
                }
                handlePreDeliver(flow, sendEvent, reportError);
            }
            // No destination node: the event is dropped silently
        }
    });
}
/**
 * Final stage of the send pipeline: run the "preDeliver" hooks, hand the
 * message to the destination node, then run the "postDeliver" hooks.
 *
 * Delivery goes through setImmediate when `asyncMessageDelivery` is set and
 * is made inline otherwise. "postDeliver" is triggered straight after the
 * delivery has been dispatched in either mode.
 *
 * @param {Object} flow The flow the message is being routed in (not used here)
 * @param {Object} sendEvent The event to deliver; `destination.node` and `msg`
 *                      are read at delivery time
 * @param {Function} reportError Called as `reportError(err, sendEvent)` when
 *                      either hook completes with a truthy error
 */
function handlePreDeliver(flow, sendEvent, reportError) {
    // Delivery step shared by the async and sync paths. The destination is
    // looked up when this runs, not when it is scheduled.
    const deliver = () => {
        if (sendEvent.destination.node) {
            sendEvent.destination.node.receive(sendEvent.msg);
        }
    };

    // preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed.
    hooks.trigger("preDeliver", sendEvent, (preErr) => {
        if (preErr) {
            reportError(preErr, sendEvent);
            return;
        }
        if (preErr === false) {
            // A hook halted delivery without raising an error
            return;
        }

        if (asyncMessageDelivery) {
            setImmediate(deliver);
        } else {
            deliver();
        }

        // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery)
        hooks.trigger("postDeliver", sendEvent, (postErr) => {
            if (postErr) {
                reportError(postErr, sendEvent);
            }
        });
    });
}
2015-10-11 21:37:11 +02:00
module . exports = {
2018-04-23 15:24:51 +02:00
init : function ( runtime ) {
nodeCloseTimeout = runtime . settings . nodeCloseTimeout || 15000 ;
2019-07-09 00:23:33 +02:00
asyncMessageDelivery = ! runtime . settings . runtimeSyncDelivery
2018-04-23 15:24:51 +02:00
Log = runtime . log ;
2019-01-16 17:27:19 +01:00
Subflow = require ( "./Subflow" ) ;
2017-05-15 14:05:33 +02:00
} ,
2019-01-16 17:27:19 +01:00
create : function ( parent , global , conf ) {
return new Flow ( parent , global , conf ) ;
} ,
Flow : Flow
2015-10-11 21:37:11 +02:00
}