WIP: Start refactor of nodes/Flow.js

This commit is contained in:
Nick O'Leary 2019-01-11 14:53:21 +00:00
parent 30aebc4ee3
commit da756fa568
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9
2 changed files with 136 additions and 95 deletions

View File

@ -20,38 +20,42 @@ var typeRegistry = require("@node-red/registry");
var Log; var Log;
var redUtil = require("@node-red/util").util; var redUtil = require("@node-red/util").util;
var flowUtil = require("./util"); var flowUtil = require("./util");
var Node;
var nodeCloseTimeout = 15000; var nodeCloseTimeout = 15000;
function Flow(global,flow) { class Flow {
constructor(global,flow) {
this.global = global;
if (typeof flow === 'undefined') { if (typeof flow === 'undefined') {
flow = global; this.flow = global;
} else {
this.flow = flow;
}
this.activeNodes = {};
this.subflowInstanceNodes = {};
this.catchNodeMap = {};
this.statusNodeMap = {};
} }
var activeNodes = {};
var subflowInstanceNodes = {};
var catchNodeMap = {};
var statusNodeMap = {};
this.start = function(diff) { start(diff) {
var node; var node;
var newNode; var newNode;
var id; var id;
catchNodeMap = {}; this.catchNodeMap = {};
statusNodeMap = {}; this.statusNodeMap = {};
var configNodes = Object.keys(flow.configs); var configNodes = Object.keys(this.flow.configs);
var configNodeAttempts = {}; var configNodeAttempts = {};
while (configNodes.length > 0) { while (configNodes.length > 0) {
id = configNodes.shift(); id = configNodes.shift();
node = flow.configs[id]; node = this.flow.configs[id];
if (!activeNodes[id]) { if (!this.activeNodes[id]) {
var readyToCreate = true; var readyToCreate = true;
// This node doesn't exist. // This node doesn't exist.
// Check it doesn't reference another non-existent config node // Check it doesn't reference another non-existent config node
for (var prop in node) { for (var prop in node) {
if (node.hasOwnProperty(prop) && prop !== 'id' && prop !== 'wires' && prop !== '_users' && flow.configs[node[prop]]) { if (node.hasOwnProperty(prop) && prop !== 'id' && prop !== 'wires' && prop !== '_users' && this.flow.configs[node[prop]]) {
if (!activeNodes[node[prop]]) { if (!this.activeNodes[node[prop]]) {
// References a non-existent config node // References a non-existent config node
// Add it to the back of the list to try again later // Add it to the back of the list to try again later
configNodes.push(id); configNodes.push(id);
@ -67,7 +71,7 @@ function Flow(global,flow) {
if (readyToCreate) { if (readyToCreate) {
newNode = createNode(node.type,node); newNode = createNode(node.type,node);
if (newNode) { if (newNode) {
activeNodes[id] = newNode; this.activeNodes[id] = newNode;
} }
} }
} }
@ -75,31 +79,32 @@ function Flow(global,flow) {
if (diff && diff.rewired) { if (diff && diff.rewired) {
for (var j=0;j<diff.rewired.length;j++) { for (var j=0;j<diff.rewired.length;j++) {
var rewireNode = activeNodes[diff.rewired[j]]; var rewireNode = this.activeNodes[diff.rewired[j]];
if (rewireNode) { if (rewireNode) {
rewireNode.updateWires(flow.nodes[rewireNode.id].wires); rewireNode.updateWires(this.flow.nodes[rewireNode.id].wires);
} }
} }
} }
for (id in flow.nodes) { for (id in this.flow.nodes) {
if (flow.nodes.hasOwnProperty(id)) { if (this.flow.nodes.hasOwnProperty(id)) {
node = flow.nodes[id]; node = this.flow.nodes[id];
if (!node.subflow) { if (!node.subflow) {
if (!activeNodes[id]) { if (!this.activeNodes[id]) {
newNode = createNode(node.type,node); newNode = createNode(node.type,node);
if (newNode) { if (newNode) {
activeNodes[id] = newNode; this.activeNodes[id] = newNode;
} }
} }
} else { } else {
if (!subflowInstanceNodes[id]) { if (!this.subflowInstanceNodes[id]) {
try { try {
var nodes = createSubflow(flow.subflows[node.subflow]||global.subflows[node.subflow],node,flow.subflows,global.subflows,activeNodes); var subflowDefinition = this.flow.subflows[node.subflow]||this.global.subflows[node.subflow]
subflowInstanceNodes[id] = nodes.map(function(n) { return n.id}); var nodes = createSubflow(subflowDefinition,node,this.flow.subflows,this.global.subflows,this.activeNodes);
this.subflowInstanceNodes[id] = nodes.map(function(n) { return n.id});
for (var i=0;i<nodes.length;i++) { for (var i=0;i<nodes.length;i++) {
if (nodes[i]) { if (nodes[i]) {
activeNodes[nodes[i].id] = nodes[i]; this.activeNodes[nodes[i].id] = nodes[i];
} }
} }
} catch(err) { } catch(err) {
@ -110,33 +115,33 @@ function Flow(global,flow) {
} }
} }
for (id in activeNodes) { for (id in this.activeNodes) {
if (activeNodes.hasOwnProperty(id)) { if (this.activeNodes.hasOwnProperty(id)) {
node = activeNodes[id]; node = this.activeNodes[id];
if (node.type === "catch") { if (node.type === "catch") {
catchNodeMap[node.z] = catchNodeMap[node.z] || []; this.catchNodeMap[node.z] = this.catchNodeMap[node.z] || [];
catchNodeMap[node.z].push(node); this.catchNodeMap[node.z].push(node);
} else if (node.type === "status") { } else if (node.type === "status") {
statusNodeMap[node.z] = statusNodeMap[node.z] || []; this.statusNodeMap[node.z] = this.statusNodeMap[node.z] || [];
statusNodeMap[node.z].push(node); this.statusNodeMap[node.z].push(node);
} }
} }
} }
} }
this.stop = function(stopList, removedList) { stop(stopList, removedList) {
return when.promise(function(resolve) { return new Promise((resolve,reject) => {
var i; var i;
if (stopList) { if (stopList) {
for (i=0;i<stopList.length;i++) { for (i=0;i<stopList.length;i++) {
if (subflowInstanceNodes[stopList[i]]) { if (this.subflowInstanceNodes[stopList[i]]) {
// The first in the list is the instance node we already // The first in the list is the instance node we already
// know about // know about
stopList = stopList.concat(subflowInstanceNodes[stopList[i]].slice(1)) stopList = stopList.concat(this.subflowInstanceNodes[stopList[i]].slice(1))
} }
} }
} else { } else {
stopList = Object.keys(activeNodes); stopList = Object.keys(this.activeNodes);
} }
// Convert the list to a map to avoid multiple scans of the list // Convert the list to a map to avoid multiple scans of the list
var removedMap = {}; var removedMap = {};
@ -147,11 +152,11 @@ function Flow(global,flow) {
var promises = []; var promises = [];
for (i=0;i<stopList.length;i++) { for (i=0;i<stopList.length;i++) {
var node = activeNodes[stopList[i]]; var node = this.activeNodes[stopList[i]];
if (node) { if (node) {
delete activeNodes[stopList[i]]; delete this.activeNodes[stopList[i]];
if (subflowInstanceNodes[stopList[i]]) { if (this.subflowInstanceNodes[stopList[i]]) {
delete subflowInstanceNodes[stopList[i]]; delete this.subflowInstanceNodes[stopList[i]];
} }
try { try {
var removed = removedMap[stopList[i]]; var removed = removedMap[stopList[i]];
@ -188,25 +193,25 @@ function Flow(global,flow) {
}); });
} }
this.update = function(_global,_flow) { update(_global,_flow) {
global = _global; this.global = _global;
flow = _flow; this.flow = _flow;
} }
this.getNode = function(id) { getNode(id) {
return activeNodes[id]; return this.activeNodes[id];
} }
this.getActiveNodes = function() { getActiveNodes() {
return activeNodes; return this.activeNodes;
} }
this.handleStatus = function(node,statusMessage) { handleStatus(node,statusMessage) {
var targetStatusNodes = null; var targetStatusNodes = null;
var reportingNode = node; var reportingNode = node;
var handled = false; var handled = false;
while (reportingNode && !handled) { while (reportingNode && !handled) {
targetStatusNodes = statusNodeMap[reportingNode.z]; targetStatusNodes = this.statusNodeMap[reportingNode.z];
if (targetStatusNodes) { if (targetStatusNodes) {
targetStatusNodes.forEach(function(targetStatusNode) { targetStatusNodes.forEach(function(targetStatusNode) {
if (targetStatusNode.scope && targetStatusNode.scope.indexOf(node.id) === -1) { if (targetStatusNode.scope && targetStatusNode.scope.indexOf(node.id) === -1) {
@ -230,12 +235,12 @@ function Flow(global,flow) {
}); });
} }
if (!handled) { if (!handled) {
reportingNode = activeNodes[reportingNode.z]; reportingNode = this.activeNodes[reportingNode.z];
} }
} }
} }
this.handleError = function(node,logMessage,msg) { handleError(node,logMessage,msg) {
var count = 1; var count = 1;
if (msg && msg.hasOwnProperty("error") && msg.error !== null) { if (msg && msg.hasOwnProperty("error") && msg.error !== null) {
if (msg.error.hasOwnProperty("source") && msg.error.source !== null) { if (msg.error.hasOwnProperty("source") && msg.error.source !== null) {
@ -252,7 +257,7 @@ function Flow(global,flow) {
var throwingNode = node; var throwingNode = node;
var handled = false; var handled = false;
while (throwingNode && !handled) { while (throwingNode && !handled) {
targetCatchNodes = catchNodeMap[throwingNode.z]; targetCatchNodes = this.catchNodeMap[throwingNode.z];
if (targetCatchNodes) { if (targetCatchNodes) {
targetCatchNodes.forEach(function(targetCatchNode) { targetCatchNodes.forEach(function(targetCatchNode) {
if (targetCatchNode.scope && targetCatchNode.scope.indexOf(throwingNode.id) === -1) { if (targetCatchNode.scope && targetCatchNode.scope.indexOf(throwingNode.id) === -1) {
@ -284,18 +289,24 @@ function Flow(global,flow) {
}); });
} }
if (!handled) { if (!handled) {
throwingNode = activeNodes[throwingNode.z]; throwingNode = this.activeNodes[throwingNode.z];
} }
} }
return handled; return handled;
} }
} }
/**
* Create a new instance of a node
* @param {string} type The node type string
* @param {object} config The node configuration object
* @return {Node} The instance of the node
*/
function createNode(type,config) { function createNode(type,config) {
var nn = null; var newNode = null;
try { try {
var nt = typeRegistry.get(type); var nodeTypeConstructor = typeRegistry.get(type);
if (nt) { if (nodeTypeConstructor) {
var conf = clone(config); var conf = clone(config);
delete conf.credentials; delete conf.credentials;
for (var p in conf) { for (var p in conf) {
@ -304,9 +315,8 @@ function createNode(type,config) {
} }
} }
try { try {
nn = new nt(conf); newNode = new nodeTypeConstructor(conf);
} } catch (err) {
catch (err) {
Log.log({ Log.log({
level: Log.ERROR, level: Log.ERROR,
id:conf.id, id:conf.id,
@ -320,7 +330,7 @@ function createNode(type,config) {
} catch(err) { } catch(err) {
Log.error(err); Log.error(err);
} }
return nn; return newNode;
} }
function createSubflow(sf,sfn,subflows,globalSubflows,activeNodes) { function createSubflow(sf,sfn,subflows,globalSubflows,activeNodes) {
@ -342,7 +352,7 @@ function createSubflow(sf,sfn,subflows,globalSubflows,activeNodes) {
newNodes.push(node); newNodes.push(node);
} }
// Clone all of the subflow node definitions and give them new IDs // Clone all of the subflow config node definitions and give them new IDs
for (i in sf.configs) { for (i in sf.configs) {
if (sf.configs.hasOwnProperty(i)) { if (sf.configs.hasOwnProperty(i)) {
createNodeInSubflow(sf.configs[i]); createNodeInSubflow(sf.configs[i]);
@ -503,7 +513,6 @@ module.exports = {
init: function(runtime) { init: function(runtime) {
nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000; nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000;
Log = runtime.log; Log = runtime.log;
Node = require("../Node");
}, },
create: function(global,conf) { create: function(global,conf) {
return new Flow(global,conf); return new Flow(global,conf);

View File

@ -141,8 +141,11 @@ function setFlows(_config,type,muteLog,forceStart) {
return _config.rev; return _config.rev;
}); });
} else { } else {
// Clone the provided config so it can be manipulated
config = clone(_config); config = clone(_config);
// Parse the configuration
newFlowConfig = flowUtil.parseConfig(clone(config)); newFlowConfig = flowUtil.parseConfig(clone(config));
// Generate a diff to identify what has changed
diff = flowUtil.diffConfigs(activeFlowConfig,newFlowConfig); diff = flowUtil.diffConfigs(activeFlowConfig,newFlowConfig);
// Now the flows have been compared, remove any credentials from newFlowConfig // Now the flows have been compared, remove any credentials from newFlowConfig
@ -153,8 +156,14 @@ function setFlows(_config,type,muteLog,forceStart) {
} }
} }
// Allow the credential store to remove anything no longer needed
credentials.clean(config); credentials.clean(config);
// Remember whether credentials need saving or not
var credsDirty = credentials.dirty(); var credsDirty = credentials.dirty();
// Get the latest credentials and ask storage to save them (if needed)
// as well as the new flow configuration.
configSavePromise = credentials.export().then(function(creds) { configSavePromise = credentials.export().then(function(creds) {
var saveConfig = { var saveConfig = {
flows: config, flows: config,
@ -176,15 +185,20 @@ function setFlows(_config,type,muteLog,forceStart) {
}; };
activeFlowConfig = newFlowConfig; activeFlowConfig = newFlowConfig;
if (forceStart || started) { if (forceStart || started) {
return stop(type,diff,muteLog).then(function() { // Flows are running (or should be)
return context.clean(activeFlowConfig).then(function() {
start(type,diff,muteLog).then(function() { // Stop the active flows (according to deploy type and the diff)
return stop(type,diff,muteLog).then(() => {
// Once stopped, allow context to remove anything no longer needed
return context.clean(activeFlowConfig)
}).then(() => {
// Start the active flows
start(type,diff,muteLog).then(() => {
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true}); events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true});
}); });
// Return the new revision asynchronously to the actual start
return flowRevision; return flowRevision;
}); }).catch(function(err) { })
}).catch(function(err) {
})
} else { } else {
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true}); events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true});
} }
@ -273,10 +287,10 @@ function handleStatus(node,statusMessage) {
function start(type,diff,muteLog) { function start(type,diff,muteLog) {
//dumpActiveNodes();
type = type||"full"; type = type||"full";
started = true; started = true;
var i; var i;
// If there are missing types, report them, emit the necessary runtime event and return
if (activeFlowConfig.missingTypes.length > 0) { if (activeFlowConfig.missingTypes.length > 0) {
log.info(log._("nodes.flows.missing-types")); log.info(log._("nodes.flows.missing-types"));
var knownUnknowns = 0; var knownUnknowns = 0;
@ -297,8 +311,10 @@ function start(type,diff,muteLog) {
log.info(" "+settings.userDir); log.info(" "+settings.userDir);
} }
events.emit("runtime-event",{id:"runtime-state",payload:{error:"missing-types", type:"warning",text:"notification.warnings.missing-types",types:activeFlowConfig.missingTypes},retain:true}); events.emit("runtime-event",{id:"runtime-state",payload:{error:"missing-types", type:"warning",text:"notification.warnings.missing-types",types:activeFlowConfig.missingTypes},retain:true});
return when.resolve(); return Promise.resolve();
} }
// In safe mode, don't actually start anything, emit the necessary runtime event and return
if (settings.safeMode) { if (settings.safeMode) {
log.info("*****************************************************************") log.info("*****************************************************************")
log.info(log._("nodes.flows.safe-mode")); log.info(log._("nodes.flows.safe-mode"));
@ -306,6 +322,7 @@ function start(type,diff,muteLog) {
events.emit("runtime-event",{id:"runtime-state",payload:{error:"safe-mode", type:"warning",text:"notification.warnings.safe-mode"},retain:true}); events.emit("runtime-event",{id:"runtime-state",payload:{error:"safe-mode", type:"warning",text:"notification.warnings.safe-mode"},retain:true});
return Promise.resolve(); return Promise.resolve();
} }
if (!muteLog) { if (!muteLog) {
if (type !== "full") { if (type !== "full") {
log.info(log._("nodes.flows.starting-modified-"+type)); log.info(log._("nodes.flows.starting-modified-"+type));
@ -313,15 +330,22 @@ function start(type,diff,muteLog) {
log.info(log._("nodes.flows.starting-flows")); log.info(log._("nodes.flows.starting-flows"));
} }
} }
var id; var id;
if (type === "full") { if (type === "full") {
// A full start means everything should
// Check the 'global' flow is running
if (!activeFlows['global']) { if (!activeFlows['global']) {
log.debug("red/nodes/flows.start : starting flow : global"); log.debug("red/nodes/flows.start : starting flow : global");
activeFlows['global'] = Flow.create(activeFlowConfig); activeFlows['global'] = Flow.create(activeFlowConfig);
} }
// Check each flow in the active configuration
for (id in activeFlowConfig.flows) { for (id in activeFlowConfig.flows) {
if (activeFlowConfig.flows.hasOwnProperty(id)) { if (activeFlowConfig.flows.hasOwnProperty(id)) {
if (!activeFlowConfig.flows[id].disabled && !activeFlows[id]) { if (!activeFlowConfig.flows[id].disabled && !activeFlows[id]) {
// This flow is not disabled, nor is it currently active, so create it
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]); activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
log.debug("red/nodes/flows.start : starting flow : "+id); log.debug("red/nodes/flows.start : starting flow : "+id);
} else { } else {
@ -330,13 +354,18 @@ function start(type,diff,muteLog) {
} }
} }
} else { } else {
// A modified-type deploy means restarting things that have changed
// Update the global flow
activeFlows['global'].update(activeFlowConfig,activeFlowConfig); activeFlows['global'].update(activeFlowConfig,activeFlowConfig);
for (id in activeFlowConfig.flows) { for (id in activeFlowConfig.flows) {
if (activeFlowConfig.flows.hasOwnProperty(id)) { if (activeFlowConfig.flows.hasOwnProperty(id)) {
if (!activeFlowConfig.flows[id].disabled) { if (!activeFlowConfig.flows[id].disabled) {
if (activeFlows[id]) { if (activeFlows[id]) {
// This flow exists and is not disabled, so update it
activeFlows[id].update(activeFlowConfig,activeFlowConfig.flows[id]); activeFlows[id].update(activeFlowConfig,activeFlowConfig.flows[id]);
} else { } else {
// This flow didn't previously exist, so create it
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]); activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
log.debug("red/nodes/flows.start : starting flow : "+id); log.debug("red/nodes/flows.start : starting flow : "+id);
} }
@ -347,9 +376,12 @@ function start(type,diff,muteLog) {
} }
} }
// Having created or updated all flows, now start them.
for (id in activeFlows) { for (id in activeFlows) {
if (activeFlows.hasOwnProperty(id)) { if (activeFlows.hasOwnProperty(id)) {
activeFlows[id].start(diff); activeFlows[id].start(diff);
// Create a map of node id to flow id and also a subflowInstance lookup map
var activeNodes = activeFlows[id].getActiveNodes(); var activeNodes = activeFlows[id].getActiveNodes();
Object.keys(activeNodes).forEach(function(nid) { Object.keys(activeNodes).forEach(function(nid) {
activeNodesToFlow[nid] = id; activeNodesToFlow[nid] = id;
@ -376,7 +408,7 @@ function start(type,diff,muteLog) {
log.info(log._("nodes.flows.started-flows")); log.info(log._("nodes.flows.started-flows"));
} }
} }
return when.resolve(); return Promise.resolve();
} }
function stop(type,diff,muteLog) { function stop(type,diff,muteLog) {