mirror of
https://github.com/node-red/node-red.git
synced 2025-03-01 10:36:34 +00:00
WIP: move all the code
This commit is contained in:
511
packages/node_modules/@node-red/runtime/nodes/flows/Flow.js
generated
vendored
Normal file
511
packages/node_modules/@node-red/runtime/nodes/flows/Flow.js
generated
vendored
Normal file
@@ -0,0 +1,511 @@
|
||||
/**
|
||||
* Copyright JS Foundation and other contributors, http://js.foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
var when = require("when");
|
||||
var clone = require("clone");
|
||||
var typeRegistry = require("@node-red/registry");
|
||||
var Log;
|
||||
var redUtil = require("../../util");
|
||||
var flowUtil = require("./util");
|
||||
var Node;
|
||||
|
||||
var nodeCloseTimeout = 15000;
|
||||
|
||||
function Flow(global,flow) {
|
||||
if (typeof flow === 'undefined') {
|
||||
flow = global;
|
||||
}
|
||||
var activeNodes = {};
|
||||
var subflowInstanceNodes = {};
|
||||
var catchNodeMap = {};
|
||||
var statusNodeMap = {};
|
||||
|
||||
this.start = function(diff) {
|
||||
var node;
|
||||
var newNode;
|
||||
var id;
|
||||
catchNodeMap = {};
|
||||
statusNodeMap = {};
|
||||
|
||||
var configNodes = Object.keys(flow.configs);
|
||||
var configNodeAttempts = {};
|
||||
while (configNodes.length > 0) {
|
||||
id = configNodes.shift();
|
||||
node = flow.configs[id];
|
||||
if (!activeNodes[id]) {
|
||||
var readyToCreate = true;
|
||||
// This node doesn't exist.
|
||||
// Check it doesn't reference another non-existent config node
|
||||
for (var prop in node) {
|
||||
if (node.hasOwnProperty(prop) && prop !== 'id' && prop !== 'wires' && prop !== '_users' && flow.configs[node[prop]]) {
|
||||
if (!activeNodes[node[prop]]) {
|
||||
// References a non-existent config node
|
||||
// Add it to the back of the list to try again later
|
||||
configNodes.push(id);
|
||||
configNodeAttempts[id] = (configNodeAttempts[id]||0)+1;
|
||||
if (configNodeAttempts[id] === 100) {
|
||||
throw new Error("Circular config node dependency detected: "+id);
|
||||
}
|
||||
readyToCreate = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (readyToCreate) {
|
||||
newNode = createNode(node.type,node);
|
||||
if (newNode) {
|
||||
activeNodes[id] = newNode;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (diff && diff.rewired) {
|
||||
for (var j=0;j<diff.rewired.length;j++) {
|
||||
var rewireNode = activeNodes[diff.rewired[j]];
|
||||
if (rewireNode) {
|
||||
rewireNode.updateWires(flow.nodes[rewireNode.id].wires);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (id in flow.nodes) {
|
||||
if (flow.nodes.hasOwnProperty(id)) {
|
||||
node = flow.nodes[id];
|
||||
if (!node.subflow) {
|
||||
if (!activeNodes[id]) {
|
||||
newNode = createNode(node.type,node);
|
||||
if (newNode) {
|
||||
activeNodes[id] = newNode;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (!subflowInstanceNodes[id]) {
|
||||
try {
|
||||
var nodes = createSubflow(flow.subflows[node.subflow]||global.subflows[node.subflow],node,flow.subflows,global.subflows,activeNodes);
|
||||
subflowInstanceNodes[id] = nodes.map(function(n) { return n.id});
|
||||
for (var i=0;i<nodes.length;i++) {
|
||||
if (nodes[i]) {
|
||||
activeNodes[nodes[i].id] = nodes[i];
|
||||
}
|
||||
}
|
||||
} catch(err) {
|
||||
console.log(err.stack)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (id in activeNodes) {
|
||||
if (activeNodes.hasOwnProperty(id)) {
|
||||
node = activeNodes[id];
|
||||
if (node.type === "catch") {
|
||||
catchNodeMap[node.z] = catchNodeMap[node.z] || [];
|
||||
catchNodeMap[node.z].push(node);
|
||||
} else if (node.type === "status") {
|
||||
statusNodeMap[node.z] = statusNodeMap[node.z] || [];
|
||||
statusNodeMap[node.z].push(node);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.stop = function(stopList, removedList) {
|
||||
return when.promise(function(resolve) {
|
||||
var i;
|
||||
if (stopList) {
|
||||
for (i=0;i<stopList.length;i++) {
|
||||
if (subflowInstanceNodes[stopList[i]]) {
|
||||
// The first in the list is the instance node we already
|
||||
// know about
|
||||
stopList = stopList.concat(subflowInstanceNodes[stopList[i]].slice(1))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
stopList = Object.keys(activeNodes);
|
||||
}
|
||||
// Convert the list to a map to avoid multiple scans of the list
|
||||
var removedMap = {};
|
||||
removedList = removedList || [];
|
||||
removedList.forEach(function(id) {
|
||||
removedMap[id] = true;
|
||||
});
|
||||
|
||||
var promises = [];
|
||||
for (i=0;i<stopList.length;i++) {
|
||||
var node = activeNodes[stopList[i]];
|
||||
if (node) {
|
||||
delete activeNodes[stopList[i]];
|
||||
if (subflowInstanceNodes[stopList[i]]) {
|
||||
delete subflowInstanceNodes[stopList[i]];
|
||||
}
|
||||
try {
|
||||
var removed = removedMap[stopList[i]];
|
||||
promises.push(
|
||||
when.promise(function(resolve, reject) {
|
||||
var start;
|
||||
var nt = node.type;
|
||||
var nid = node.id;
|
||||
var n = node;
|
||||
when.promise(function(resolve) {
|
||||
Log.trace("Stopping node "+nt+":"+nid+(removed?" removed":""));
|
||||
start = Date.now();
|
||||
resolve(n.close(removed));
|
||||
}).timeout(nodeCloseTimeout).then(function(){
|
||||
var delta = Date.now() - start;
|
||||
Log.trace("Stopped node "+nt+":"+nid+" ("+delta+"ms)" );
|
||||
resolve(delta);
|
||||
},function(err) {
|
||||
var delta = Date.now() - start;
|
||||
n.error(Log._("nodes.flows.stopping-error",{message:err}));
|
||||
Log.debug(err.stack);
|
||||
reject(err);
|
||||
});
|
||||
})
|
||||
);
|
||||
} catch(err) {
|
||||
node.error(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
when.settle(promises).then(function(results) {
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
this.update = function(_global,_flow) {
|
||||
global = _global;
|
||||
flow = _flow;
|
||||
}
|
||||
|
||||
this.getNode = function(id) {
|
||||
return activeNodes[id];
|
||||
}
|
||||
|
||||
this.getActiveNodes = function() {
|
||||
return activeNodes;
|
||||
}
|
||||
|
||||
this.handleStatus = function(node,statusMessage) {
|
||||
var targetStatusNodes = null;
|
||||
var reportingNode = node;
|
||||
var handled = false;
|
||||
while (reportingNode && !handled) {
|
||||
targetStatusNodes = statusNodeMap[reportingNode.z];
|
||||
if (targetStatusNodes) {
|
||||
targetStatusNodes.forEach(function(targetStatusNode) {
|
||||
if (targetStatusNode.scope && targetStatusNode.scope.indexOf(node.id) === -1) {
|
||||
return;
|
||||
}
|
||||
var message = {
|
||||
status: {
|
||||
text: "",
|
||||
source: {
|
||||
id: node.id,
|
||||
type: node.type,
|
||||
name: node.name
|
||||
}
|
||||
}
|
||||
};
|
||||
if (statusMessage.hasOwnProperty("text")) {
|
||||
message.status.text = statusMessage.text.toString();
|
||||
}
|
||||
targetStatusNode.receive(message);
|
||||
handled = true;
|
||||
});
|
||||
}
|
||||
if (!handled) {
|
||||
reportingNode = activeNodes[reportingNode.z];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.handleError = function(node,logMessage,msg) {
|
||||
var count = 1;
|
||||
if (msg && msg.hasOwnProperty("error") && msg.error !== null) {
|
||||
if (msg.error.hasOwnProperty("source") && msg.error.source !== null) {
|
||||
if (msg.error.source.id === node.id) {
|
||||
count = msg.error.source.count+1;
|
||||
if (count === 10) {
|
||||
node.warn(Log._("nodes.flow.error-loop"));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
var targetCatchNodes = null;
|
||||
var throwingNode = node;
|
||||
var handled = false;
|
||||
while (throwingNode && !handled) {
|
||||
targetCatchNodes = catchNodeMap[throwingNode.z];
|
||||
if (targetCatchNodes) {
|
||||
targetCatchNodes.forEach(function(targetCatchNode) {
|
||||
if (targetCatchNode.scope && targetCatchNode.scope.indexOf(throwingNode.id) === -1) {
|
||||
return;
|
||||
}
|
||||
var errorMessage;
|
||||
if (msg) {
|
||||
errorMessage = redUtil.cloneMessage(msg);
|
||||
} else {
|
||||
errorMessage = {};
|
||||
}
|
||||
if (errorMessage.hasOwnProperty("error")) {
|
||||
errorMessage._error = errorMessage.error;
|
||||
}
|
||||
errorMessage.error = {
|
||||
message: logMessage.toString(),
|
||||
source: {
|
||||
id: node.id,
|
||||
type: node.type,
|
||||
name: node.name,
|
||||
count: count
|
||||
}
|
||||
};
|
||||
if (logMessage.hasOwnProperty('stack')) {
|
||||
errorMessage.error.stack = logMessage.stack;
|
||||
}
|
||||
targetCatchNode.receive(errorMessage);
|
||||
handled = true;
|
||||
});
|
||||
}
|
||||
if (!handled) {
|
||||
throwingNode = activeNodes[throwingNode.z];
|
||||
}
|
||||
}
|
||||
return handled;
|
||||
}
|
||||
}
|
||||
|
||||
function createNode(type,config) {
|
||||
var nn = null;
|
||||
try {
|
||||
var nt = typeRegistry.get(type);
|
||||
if (nt) {
|
||||
var conf = clone(config);
|
||||
delete conf.credentials;
|
||||
for (var p in conf) {
|
||||
if (conf.hasOwnProperty(p)) {
|
||||
flowUtil.mapEnvVarProperties(conf,p);
|
||||
}
|
||||
}
|
||||
try {
|
||||
nn = new nt(conf);
|
||||
}
|
||||
catch (err) {
|
||||
Log.log({
|
||||
level: Log.ERROR,
|
||||
id:conf.id,
|
||||
type: type,
|
||||
msg: err
|
||||
});
|
||||
}
|
||||
} else {
|
||||
Log.error(Log._("nodes.flow.unknown-type", {type:type}));
|
||||
}
|
||||
} catch(err) {
|
||||
Log.error(err);
|
||||
}
|
||||
return nn;
|
||||
}
|
||||
|
||||
function createSubflow(sf,sfn,subflows,globalSubflows,activeNodes) {
|
||||
//console.log("CREATE SUBFLOW",sf.id,sfn.id);
|
||||
var nodes = [];
|
||||
var node_map = {};
|
||||
var newNodes = [];
|
||||
var node;
|
||||
var wires;
|
||||
var i,j,k;
|
||||
|
||||
var createNodeInSubflow = function(def) {
|
||||
node = clone(def);
|
||||
var nid = redUtil.generateId();
|
||||
node_map[node.id] = node;
|
||||
node._alias = node.id;
|
||||
node.id = nid;
|
||||
node.z = sfn.id;
|
||||
newNodes.push(node);
|
||||
}
|
||||
|
||||
// Clone all of the subflow node definitions and give them new IDs
|
||||
for (i in sf.configs) {
|
||||
if (sf.configs.hasOwnProperty(i)) {
|
||||
createNodeInSubflow(sf.configs[i]);
|
||||
}
|
||||
}
|
||||
// Clone all of the subflow node definitions and give them new IDs
|
||||
for (i in sf.nodes) {
|
||||
if (sf.nodes.hasOwnProperty(i)) {
|
||||
createNodeInSubflow(sf.nodes[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Look for any catch/status nodes and update their scope ids
|
||||
// Update all subflow interior wiring to reflect new node IDs
|
||||
for (i=0;i<newNodes.length;i++) {
|
||||
node = newNodes[i];
|
||||
if (node.wires) {
|
||||
var outputs = node.wires;
|
||||
for (j=0;j<outputs.length;j++) {
|
||||
wires = outputs[j];
|
||||
for (k=0;k<wires.length;k++) {
|
||||
outputs[j][k] = node_map[outputs[j][k]].id
|
||||
}
|
||||
}
|
||||
if ((node.type === 'catch' || node.type === 'status') && node.scope) {
|
||||
node.scope = node.scope.map(function(id) {
|
||||
return node_map[id]?node_map[id].id:""
|
||||
})
|
||||
} else {
|
||||
for (var prop in node) {
|
||||
if (node.hasOwnProperty(prop) && prop !== '_alias') {
|
||||
if (node_map[node[prop]]) {
|
||||
//console.log("Mapped",node.type,node.id,prop,node_map[node[prop]].id);
|
||||
node[prop] = node_map[node[prop]].id;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create a subflow node to accept inbound messages and route appropriately
|
||||
var Node = require("../Node");
|
||||
var subflowInstance = {
|
||||
id: sfn.id,
|
||||
type: sfn.type,
|
||||
z: sfn.z,
|
||||
name: sfn.name,
|
||||
wires: []
|
||||
}
|
||||
if (sf.in) {
|
||||
subflowInstance.wires = sf.in.map(function(n) { return n.wires.map(function(w) { return node_map[w.id].id;})})
|
||||
subflowInstance._originalWires = clone(subflowInstance.wires);
|
||||
}
|
||||
var subflowNode = new Node(subflowInstance);
|
||||
|
||||
subflowNode.on("input", function(msg) { this.send(msg);});
|
||||
|
||||
|
||||
subflowNode._updateWires = subflowNode.updateWires;
|
||||
|
||||
subflowNode.updateWires = function(newWires) {
|
||||
// Wire the subflow outputs
|
||||
if (sf.out) {
|
||||
var node,wires,i,j;
|
||||
// Restore the original wiring to the internal nodes
|
||||
subflowInstance.wires = clone(subflowInstance._originalWires);
|
||||
for (i=0;i<sf.out.length;i++) {
|
||||
wires = sf.out[i].wires;
|
||||
for (j=0;j<wires.length;j++) {
|
||||
if (wires[j].id != sf.id) {
|
||||
node = node_map[wires[j].id];
|
||||
if (node._originalWires) {
|
||||
node.wires = clone(node._originalWires);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var modifiedNodes = {};
|
||||
var subflowInstanceModified = false;
|
||||
|
||||
for (i=0;i<sf.out.length;i++) {
|
||||
wires = sf.out[i].wires;
|
||||
for (j=0;j<wires.length;j++) {
|
||||
if (wires[j].id === sf.id) {
|
||||
subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(newWires[i]);
|
||||
subflowInstanceModified = true;
|
||||
} else {
|
||||
node = node_map[wires[j].id];
|
||||
node.wires[wires[j].port] = node.wires[wires[j].port].concat(newWires[i]);
|
||||
modifiedNodes[node.id] = node;
|
||||
}
|
||||
}
|
||||
}
|
||||
Object.keys(modifiedNodes).forEach(function(id) {
|
||||
var node = modifiedNodes[id];
|
||||
subflowNode.instanceNodes[id].updateWires(node.wires);
|
||||
});
|
||||
if (subflowInstanceModified) {
|
||||
subflowNode._updateWires(subflowInstance.wires);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nodes.push(subflowNode);
|
||||
|
||||
// Wire the subflow outputs
|
||||
if (sf.out) {
|
||||
var modifiedNodes = {};
|
||||
for (i=0;i<sf.out.length;i++) {
|
||||
wires = sf.out[i].wires;
|
||||
for (j=0;j<wires.length;j++) {
|
||||
if (wires[j].id === sf.id) {
|
||||
// A subflow input wired straight to a subflow output
|
||||
subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(sfn.wires[i])
|
||||
subflowNode._updateWires(subflowInstance.wires);
|
||||
} else {
|
||||
node = node_map[wires[j].id];
|
||||
modifiedNodes[node.id] = node;
|
||||
if (!node._originalWires) {
|
||||
node._originalWires = clone(node.wires);
|
||||
}
|
||||
node.wires[wires[j].port] = (node.wires[wires[j].port]||[]).concat(sfn.wires[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Instantiate the nodes
|
||||
for (i=0;i<newNodes.length;i++) {
|
||||
node = newNodes[i];
|
||||
var type = node.type;
|
||||
|
||||
var m = /^subflow:(.+)$/.exec(type);
|
||||
if (!m) {
|
||||
var newNode = createNode(type,node);
|
||||
if (newNode) {
|
||||
activeNodes[node.id] = newNode;
|
||||
nodes.push(newNode);
|
||||
}
|
||||
} else {
|
||||
var subflowId = m[1];
|
||||
nodes = nodes.concat(createSubflow(subflows[subflowId]||globalSubflows[subflowId],node,subflows,globalSubflows,activeNodes));
|
||||
}
|
||||
}
|
||||
|
||||
subflowNode.instanceNodes = {};
|
||||
|
||||
nodes.forEach(function(node) {
|
||||
subflowNode.instanceNodes[node.id] = node;
|
||||
});
|
||||
return nodes;
|
||||
}
|
||||
|
||||
|
||||
module.exports = {
|
||||
init: function(runtime) {
|
||||
nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000;
|
||||
Log = runtime.log;
|
||||
Node = require("../Node");
|
||||
},
|
||||
create: function(global,conf) {
|
||||
return new Flow(global,conf);
|
||||
}
|
||||
}
|
735
packages/node_modules/@node-red/runtime/nodes/flows/index.js
generated
vendored
Normal file
735
packages/node_modules/@node-red/runtime/nodes/flows/index.js
generated
vendored
Normal file
@@ -0,0 +1,735 @@
|
||||
/**
|
||||
* Copyright JS Foundation and other contributors, http://js.foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
var clone = require("clone");
|
||||
var when = require("when");
|
||||
|
||||
var Flow = require('./Flow');
|
||||
|
||||
var typeRegistry = require("@node-red/registry");
|
||||
var deprecated = typeRegistry.deprecated;
|
||||
|
||||
|
||||
var context = require("../context")
|
||||
var credentials = require("../credentials");
|
||||
|
||||
var flowUtil = require("./util");
|
||||
var log;
|
||||
var events = require("../../events");
|
||||
var redUtil = require("../../util");
|
||||
|
||||
var storage = null;
|
||||
var settings = null;
|
||||
|
||||
var activeConfig = null;
|
||||
var activeFlowConfig = null;
|
||||
|
||||
var activeFlows = {};
|
||||
var started = false;
|
||||
var credentialsPendingReset = false;
|
||||
|
||||
var activeNodesToFlow = {};
|
||||
var subflowInstanceNodeMap = {};
|
||||
|
||||
var typeEventRegistered = false;
|
||||
|
||||
function init(runtime) {
|
||||
if (started) {
|
||||
throw new Error("Cannot init without a stop");
|
||||
}
|
||||
settings = runtime.settings;
|
||||
storage = runtime.storage;
|
||||
log = runtime.log;
|
||||
started = false;
|
||||
if (!typeEventRegistered) {
|
||||
events.on('type-registered',function(type) {
|
||||
if (activeFlowConfig && activeFlowConfig.missingTypes.length > 0) {
|
||||
var i = activeFlowConfig.missingTypes.indexOf(type);
|
||||
if (i != -1) {
|
||||
log.info(log._("nodes.flows.registered-missing", {type:type}));
|
||||
activeFlowConfig.missingTypes.splice(i,1);
|
||||
if (activeFlowConfig.missingTypes.length === 0 && started) {
|
||||
events.emit("runtime-event",{id:"runtime-state",retain: true});
|
||||
start();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
typeEventRegistered = true;
|
||||
}
|
||||
Flow.init(runtime);
|
||||
}
|
||||
|
||||
function loadFlows() {
|
||||
var config;
|
||||
return storage.getFlows().then(function(_config) {
|
||||
config = _config;
|
||||
log.debug("loaded flow revision: "+config.rev);
|
||||
return credentials.load(config.credentials).then(function() {
|
||||
events.emit("runtime-event",{id:"runtime-state",retain:true});
|
||||
return config;
|
||||
});
|
||||
}).catch(function(err) {
|
||||
if (err.code === "credentials_load_failed" && !storage.projects) {
|
||||
// project disabled, credential load failed
|
||||
credentialsPendingReset = true;
|
||||
log.warn(log._("nodes.flows.error",{message:err.toString()}));
|
||||
events.emit("runtime-event",{id:"runtime-state",payload:{type:"warning",error:err.code,text:"notification.warnings.credentials_load_failed_reset"},retain:true});
|
||||
return config;
|
||||
} else {
|
||||
activeConfig = null;
|
||||
events.emit("runtime-event",{id:"runtime-state",payload:{type:"warning",error:err.code,project:err.project,text:"notification.warnings."+err.code},retain:true});
|
||||
if (err.code === "project_not_found") {
|
||||
log.warn(log._("storage.localfilesystem.projects.project-not-found",{project:err.project}));
|
||||
} else {
|
||||
log.warn(log._("nodes.flows.error",{message:err.toString()}));
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
}
|
||||
function load(forceStart) {
|
||||
return setFlows(null,"load",false,forceStart);
|
||||
}
|
||||
|
||||
/*
|
||||
* _config - new node array configuration
|
||||
* type - full/nodes/flows/load (default full)
|
||||
* muteLog - don't emit the standard log messages (used for individual flow api)
|
||||
*/
|
||||
function setFlows(_config,type,muteLog,forceStart) {
|
||||
type = type||"full";
|
||||
|
||||
var configSavePromise = null;
|
||||
var config = null;
|
||||
var diff;
|
||||
var newFlowConfig;
|
||||
var isLoad = false;
|
||||
if (type === "load") {
|
||||
isLoad = true;
|
||||
configSavePromise = loadFlows().then(function(_config) {
|
||||
config = clone(_config.flows);
|
||||
newFlowConfig = flowUtil.parseConfig(clone(config));
|
||||
type = "full";
|
||||
return _config.rev;
|
||||
});
|
||||
} else {
|
||||
config = clone(_config);
|
||||
newFlowConfig = flowUtil.parseConfig(clone(config));
|
||||
diff = flowUtil.diffConfigs(activeFlowConfig,newFlowConfig);
|
||||
|
||||
// Now the flows have been compared, remove any credentials from newFlowConfig
|
||||
// so they don't cause false-positive diffs the next time a flow is deployed
|
||||
for (var id in newFlowConfig.allNodes) {
|
||||
if (newFlowConfig.allNodes.hasOwnProperty(id)) {
|
||||
delete newFlowConfig.allNodes[id].credentials;
|
||||
}
|
||||
}
|
||||
|
||||
credentials.clean(config);
|
||||
var credsDirty = credentials.dirty();
|
||||
configSavePromise = credentials.export().then(function(creds) {
|
||||
var saveConfig = {
|
||||
flows: config,
|
||||
credentialsDirty:credsDirty,
|
||||
credentials: creds
|
||||
}
|
||||
return storage.saveFlows(saveConfig);
|
||||
});
|
||||
}
|
||||
|
||||
return configSavePromise
|
||||
.then(function(flowRevision) {
|
||||
if (!isLoad) {
|
||||
log.debug("saved flow revision: "+flowRevision);
|
||||
}
|
||||
activeConfig = {
|
||||
flows:config,
|
||||
rev:flowRevision
|
||||
};
|
||||
activeFlowConfig = newFlowConfig;
|
||||
if (forceStart || started) {
|
||||
return stop(type,diff,muteLog).then(function() {
|
||||
return context.clean(activeFlowConfig).then(function() {
|
||||
start(type,diff,muteLog).then(function() {
|
||||
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true});
|
||||
});
|
||||
return flowRevision;
|
||||
});
|
||||
}).catch(function(err) {
|
||||
})
|
||||
} else {
|
||||
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function getNode(id) {
|
||||
var node;
|
||||
if (activeNodesToFlow[id] && activeFlows[activeNodesToFlow[id]]) {
|
||||
return activeFlows[activeNodesToFlow[id]].getNode(id);
|
||||
}
|
||||
for (var flowId in activeFlows) {
|
||||
if (activeFlows.hasOwnProperty(flowId)) {
|
||||
node = activeFlows[flowId].getNode(id);
|
||||
if (node) {
|
||||
return node;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function eachNode(cb) {
|
||||
for (var id in activeFlowConfig.allNodes) {
|
||||
if (activeFlowConfig.allNodes.hasOwnProperty(id)) {
|
||||
cb(activeFlowConfig.allNodes[id]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function getFlows() {
|
||||
return activeConfig;
|
||||
}
|
||||
|
||||
function delegateError(node,logMessage,msg) {
|
||||
var handled = false;
|
||||
if (activeFlows[node.z]) {
|
||||
handled = activeFlows[node.z].handleError(node,logMessage,msg);
|
||||
} else if (activeNodesToFlow[node.z] && activeFlows[activeNodesToFlow[node.z]]) {
|
||||
handled = activeFlows[activeNodesToFlow[node.z]].handleError(node,logMessage,msg);
|
||||
} else if (activeFlowConfig.subflows[node.z] && subflowInstanceNodeMap[node.id]) {
|
||||
subflowInstanceNodeMap[node.id].forEach(function(n) {
|
||||
handled = handled || delegateError(getNode(n),logMessage,msg);
|
||||
});
|
||||
}
|
||||
return handled;
|
||||
}
|
||||
function handleError(node,logMessage,msg) {
|
||||
var handled = false;
|
||||
if (node.z) {
|
||||
handled = delegateError(node,logMessage,msg);
|
||||
} else {
|
||||
if (activeFlowConfig.configs[node.id]) {
|
||||
activeFlowConfig.configs[node.id]._users.forEach(function(id) {
|
||||
var userNode = activeFlowConfig.allNodes[id];
|
||||
handled = handled || delegateError(userNode,logMessage,msg);
|
||||
})
|
||||
}
|
||||
}
|
||||
return handled;
|
||||
}
|
||||
|
||||
function delegateStatus(node,statusMessage) {
|
||||
if (activeFlows[node.z]) {
|
||||
activeFlows[node.z].handleStatus(node,statusMessage);
|
||||
} else if (activeNodesToFlow[node.z] && activeFlows[activeNodesToFlow[node.z]]) {
|
||||
activeFlows[activeNodesToFlow[node.z]].handleStatus(node,statusMessage);
|
||||
}
|
||||
}
|
||||
function handleStatus(node,statusMessage) {
|
||||
events.emit("node-status",{
|
||||
id: node.id,
|
||||
status:statusMessage
|
||||
});
|
||||
if (node.z) {
|
||||
delegateStatus(node,statusMessage);
|
||||
} else {
|
||||
if (activeFlowConfig.configs[node.id]) {
|
||||
activeFlowConfig.configs[node.id]._users.forEach(function(id) {
|
||||
var userNode = activeFlowConfig.allNodes[id];
|
||||
delegateStatus(userNode,statusMessage);
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
function start(type,diff,muteLog) {
|
||||
//dumpActiveNodes();
|
||||
type = type||"full";
|
||||
started = true;
|
||||
var i;
|
||||
if (activeFlowConfig.missingTypes.length > 0) {
|
||||
log.info(log._("nodes.flows.missing-types"));
|
||||
var knownUnknowns = 0;
|
||||
for (i=0;i<activeFlowConfig.missingTypes.length;i++) {
|
||||
var nodeType = activeFlowConfig.missingTypes[i];
|
||||
var info = deprecated.get(nodeType);
|
||||
if (info) {
|
||||
log.info(log._("nodes.flows.missing-type-provided",{type:activeFlowConfig.missingTypes[i],module:info.module}));
|
||||
knownUnknowns += 1;
|
||||
} else {
|
||||
log.info(" - "+activeFlowConfig.missingTypes[i]);
|
||||
}
|
||||
}
|
||||
if (knownUnknowns > 0) {
|
||||
log.info(log._("nodes.flows.missing-type-install-1"));
|
||||
log.info(" npm install <module name>");
|
||||
log.info(log._("nodes.flows.missing-type-install-2"));
|
||||
log.info(" "+settings.userDir);
|
||||
}
|
||||
events.emit("runtime-event",{id:"runtime-state",payload:{error:"missing-types", type:"warning",text:"notification.warnings.missing-types",types:activeFlowConfig.missingTypes},retain:true});
|
||||
return when.resolve();
|
||||
}
|
||||
if (!muteLog) {
|
||||
if (type !== "full") {
|
||||
log.info(log._("nodes.flows.starting-modified-"+type));
|
||||
} else {
|
||||
log.info(log._("nodes.flows.starting-flows"));
|
||||
}
|
||||
}
|
||||
var id;
|
||||
if (type === "full") {
|
||||
if (!activeFlows['global']) {
|
||||
log.debug("red/nodes/flows.start : starting flow : global");
|
||||
activeFlows['global'] = Flow.create(activeFlowConfig);
|
||||
}
|
||||
for (id in activeFlowConfig.flows) {
|
||||
if (activeFlowConfig.flows.hasOwnProperty(id)) {
|
||||
if (!activeFlowConfig.flows[id].disabled && !activeFlows[id]) {
|
||||
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
|
||||
log.debug("red/nodes/flows.start : starting flow : "+id);
|
||||
} else {
|
||||
log.debug("red/nodes/flows.start : not starting disabled flow : "+id);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
activeFlows['global'].update(activeFlowConfig,activeFlowConfig);
|
||||
for (id in activeFlowConfig.flows) {
|
||||
if (activeFlowConfig.flows.hasOwnProperty(id)) {
|
||||
if (!activeFlowConfig.flows[id].disabled) {
|
||||
if (activeFlows[id]) {
|
||||
activeFlows[id].update(activeFlowConfig,activeFlowConfig.flows[id]);
|
||||
} else {
|
||||
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
|
||||
log.debug("red/nodes/flows.start : starting flow : "+id);
|
||||
}
|
||||
} else {
|
||||
log.debug("red/nodes/flows.start : not starting disabled flow : "+id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (id in activeFlows) {
|
||||
if (activeFlows.hasOwnProperty(id)) {
|
||||
activeFlows[id].start(diff);
|
||||
var activeNodes = activeFlows[id].getActiveNodes();
|
||||
Object.keys(activeNodes).forEach(function(nid) {
|
||||
activeNodesToFlow[nid] = id;
|
||||
if (activeNodes[nid]._alias) {
|
||||
subflowInstanceNodeMap[activeNodes[nid]._alias] = subflowInstanceNodeMap[activeNodes[nid]._alias] || [];
|
||||
subflowInstanceNodeMap[activeNodes[nid]._alias].push(nid);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
events.emit("nodes-started");
|
||||
|
||||
if (credentialsPendingReset === true) {
|
||||
credentialsPendingReset = false;
|
||||
} else {
|
||||
events.emit("runtime-event",{id:"runtime-state",retain:true});
|
||||
}
|
||||
|
||||
if (!muteLog) {
|
||||
if (type !== "full") {
|
||||
log.info(log._("nodes.flows.started-modified-"+type));
|
||||
} else {
|
||||
log.info(log._("nodes.flows.started-flows"));
|
||||
}
|
||||
}
|
||||
return when.resolve();
|
||||
}
|
||||
|
||||
function stop(type,diff,muteLog) {
|
||||
if (!started) {
|
||||
return when.resolve();
|
||||
}
|
||||
type = type||"full";
|
||||
diff = diff||{
|
||||
added:[],
|
||||
changed:[],
|
||||
removed:[],
|
||||
rewired:[],
|
||||
linked:[]
|
||||
};
|
||||
if (!muteLog) {
|
||||
if (type !== "full") {
|
||||
log.info(log._("nodes.flows.stopping-modified-"+type));
|
||||
} else {
|
||||
log.info(log._("nodes.flows.stopping-flows"));
|
||||
}
|
||||
}
|
||||
started = false;
|
||||
var promises = [];
|
||||
var stopList;
|
||||
var removedList = diff.removed;
|
||||
if (type === 'nodes') {
|
||||
stopList = diff.changed.concat(diff.removed);
|
||||
} else if (type === 'flows') {
|
||||
stopList = diff.changed.concat(diff.removed).concat(diff.linked);
|
||||
}
|
||||
|
||||
for (var id in activeFlows) {
|
||||
if (activeFlows.hasOwnProperty(id)) {
|
||||
var flowStateChanged = diff && (diff.added.indexOf(id) !== -1 || diff.removed.indexOf(id) !== -1);
|
||||
log.debug("red/nodes/flows.stop : stopping flow : "+id);
|
||||
promises = promises.concat(activeFlows[id].stop(flowStateChanged?null:stopList,removedList));
|
||||
if (type === "full" || flowStateChanged || diff.removed.indexOf(id)!==-1) {
|
||||
delete activeFlows[id];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return when.promise(function(resolve,reject) {
|
||||
when.settle(promises).then(function() {
|
||||
for (id in activeNodesToFlow) {
|
||||
if (activeNodesToFlow.hasOwnProperty(id)) {
|
||||
if (!activeFlows[activeNodesToFlow[id]]) {
|
||||
delete activeNodesToFlow[id];
|
||||
}
|
||||
}
|
||||
}
|
||||
if (stopList) {
|
||||
stopList.forEach(function(id) {
|
||||
delete activeNodesToFlow[id];
|
||||
});
|
||||
}
|
||||
// Ideally we'd prune just what got stopped - but mapping stopList
|
||||
// id to the list of subflow instance nodes is something only Flow
|
||||
// can do... so cheat by wiping the map knowing it'll be rebuilt
|
||||
// in start()
|
||||
subflowInstanceNodeMap = {};
|
||||
if (!muteLog) {
|
||||
if (type !== "full") {
|
||||
log.info(log._("nodes.flows.stopped-modified-"+type));
|
||||
} else {
|
||||
log.info(log._("nodes.flows.stopped-flows"));
|
||||
}
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
function checkTypeInUse(id) {
|
||||
var nodeInfo = typeRegistry.getNodeInfo(id);
|
||||
if (!nodeInfo) {
|
||||
throw new Error(log._("nodes.index.unrecognised-id", {id:id}));
|
||||
} else {
|
||||
var inUse = {};
|
||||
var config = getFlows();
|
||||
config.flows.forEach(function(n) {
|
||||
inUse[n.type] = (inUse[n.type]||0)+1;
|
||||
});
|
||||
var nodesInUse = [];
|
||||
nodeInfo.types.forEach(function(t) {
|
||||
if (inUse[t]) {
|
||||
nodesInUse.push(t);
|
||||
}
|
||||
});
|
||||
if (nodesInUse.length > 0) {
|
||||
var msg = nodesInUse.join(", ");
|
||||
var err = new Error(log._("nodes.index.type-in-use", {msg:msg}));
|
||||
err.code = "type_in_use";
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function updateMissingTypes() {
|
||||
var subflowInstanceRE = /^subflow:(.+)$/;
|
||||
activeFlowConfig.missingTypes = [];
|
||||
|
||||
for (var id in activeFlowConfig.allNodes) {
|
||||
if (activeFlowConfig.allNodes.hasOwnProperty(id)) {
|
||||
var node = activeFlowConfig.allNodes[id];
|
||||
if (node.type !== 'tab' && node.type !== 'subflow') {
|
||||
var subflowDetails = subflowInstanceRE.exec(node.type);
|
||||
if ( (subflowDetails && !activeFlowConfig.subflows[subflowDetails[1]]) || (!subflowDetails && !typeRegistry.get(node.type)) ) {
|
||||
if (activeFlowConfig.missingTypes.indexOf(node.type) === -1) {
|
||||
activeFlowConfig.missingTypes.push(node.type);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function addFlow(flow) {
|
||||
var i,node;
|
||||
if (!flow.hasOwnProperty('nodes')) {
|
||||
throw new Error('missing nodes property');
|
||||
}
|
||||
flow.id = redUtil.generateId();
|
||||
|
||||
var tabNode = {
|
||||
type:'tab',
|
||||
label:flow.label,
|
||||
id:flow.id
|
||||
}
|
||||
if (flow.hasOwnProperty('info')) {
|
||||
tabNode.info = flow.info;
|
||||
}
|
||||
if (flow.hasOwnProperty('disabled')) {
|
||||
tabNode.disabled = flow.disabled;
|
||||
}
|
||||
|
||||
var nodes = [tabNode];
|
||||
|
||||
for (i=0;i<flow.nodes.length;i++) {
|
||||
node = flow.nodes[i];
|
||||
if (activeFlowConfig.allNodes[node.id]) {
|
||||
// TODO nls
|
||||
return when.reject(new Error('duplicate id'));
|
||||
}
|
||||
if (node.type === 'tab' || node.type === 'subflow') {
|
||||
return when.reject(new Error('invalid node type: '+node.type));
|
||||
}
|
||||
node.z = flow.id;
|
||||
nodes.push(node);
|
||||
}
|
||||
if (flow.configs) {
|
||||
for (i=0;i<flow.configs.length;i++) {
|
||||
node = flow.configs[i];
|
||||
if (activeFlowConfig.allNodes[node.id]) {
|
||||
// TODO nls
|
||||
return when.reject(new Error('duplicate id'));
|
||||
}
|
||||
if (node.type === 'tab' || node.type === 'subflow') {
|
||||
return when.reject(new Error('invalid node type: '+node.type));
|
||||
}
|
||||
node.z = flow.id;
|
||||
nodes.push(node);
|
||||
}
|
||||
}
|
||||
var newConfig = clone(activeConfig.flows);
|
||||
newConfig = newConfig.concat(nodes);
|
||||
|
||||
return setFlows(newConfig,'flows',true).then(function() {
|
||||
log.info(log._("nodes.flows.added-flow",{label:(flow.label?flow.label+" ":"")+"["+flow.id+"]"}));
|
||||
return flow.id;
|
||||
});
|
||||
}
|
||||
|
||||
function getFlow(id) {
|
||||
var flow;
|
||||
if (id === 'global') {
|
||||
flow = activeFlowConfig;
|
||||
} else {
|
||||
flow = activeFlowConfig.flows[id];
|
||||
}
|
||||
if (!flow) {
|
||||
return null;
|
||||
}
|
||||
var result = {
|
||||
id: id
|
||||
};
|
||||
if (flow.label) {
|
||||
result.label = flow.label;
|
||||
}
|
||||
if (flow.disabled) {
|
||||
result.disabled = flow.disabled;
|
||||
}
|
||||
if (flow.hasOwnProperty('info')) {
|
||||
result.info = flow.info;
|
||||
}
|
||||
if (id !== 'global') {
|
||||
result.nodes = [];
|
||||
}
|
||||
if (flow.nodes) {
|
||||
var nodeIds = Object.keys(flow.nodes);
|
||||
if (nodeIds.length > 0) {
|
||||
result.nodes = nodeIds.map(function(nodeId) {
|
||||
var node = clone(flow.nodes[nodeId]);
|
||||
if (node.type === 'link out') {
|
||||
delete node.wires;
|
||||
}
|
||||
return node;
|
||||
})
|
||||
}
|
||||
}
|
||||
if (flow.configs) {
|
||||
var configIds = Object.keys(flow.configs);
|
||||
result.configs = configIds.map(function(configId) {
|
||||
return clone(flow.configs[configId]);
|
||||
})
|
||||
if (result.configs.length === 0) {
|
||||
delete result.configs;
|
||||
}
|
||||
}
|
||||
if (flow.subflows) {
|
||||
var subflowIds = Object.keys(flow.subflows);
|
||||
result.subflows = subflowIds.map(function(subflowId) {
|
||||
var subflow = clone(flow.subflows[subflowId]);
|
||||
var nodeIds = Object.keys(subflow.nodes);
|
||||
subflow.nodes = nodeIds.map(function(id) {
|
||||
return subflow.nodes[id];
|
||||
});
|
||||
if (subflow.configs) {
|
||||
var configIds = Object.keys(subflow.configs);
|
||||
subflow.configs = configIds.map(function(id) {
|
||||
return subflow.configs[id];
|
||||
})
|
||||
}
|
||||
delete subflow.instances;
|
||||
return subflow;
|
||||
});
|
||||
if (result.subflows.length === 0) {
|
||||
delete result.subflows;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
function updateFlow(id,newFlow) {
|
||||
var label = id;
|
||||
if (id !== 'global') {
|
||||
if (!activeFlowConfig.flows[id]) {
|
||||
var e = new Error();
|
||||
e.code = 404;
|
||||
throw e;
|
||||
}
|
||||
label = activeFlowConfig.flows[id].label;
|
||||
}
|
||||
var newConfig = clone(activeConfig.flows);
|
||||
var nodes;
|
||||
|
||||
if (id === 'global') {
|
||||
// Remove all nodes whose z is not a known flow
|
||||
// When subflows can be owned by a flow, this logic will have to take
|
||||
// that into account
|
||||
newConfig = newConfig.filter(function(node) {
|
||||
return node.type === 'tab' || (node.hasOwnProperty('z') && activeFlowConfig.flows.hasOwnProperty(node.z));
|
||||
})
|
||||
|
||||
// Add in the new config nodes
|
||||
nodes = newFlow.configs||[];
|
||||
if (newFlow.subflows) {
|
||||
// Add in the new subflows
|
||||
newFlow.subflows.forEach(function(sf) {
|
||||
nodes = nodes.concat(sf.nodes||[]).concat(sf.configs||[]);
|
||||
delete sf.nodes;
|
||||
delete sf.configs;
|
||||
nodes.push(sf);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
newConfig = newConfig.filter(function(node) {
|
||||
return node.z !== id && node.id !== id;
|
||||
});
|
||||
var tabNode = {
|
||||
type:'tab',
|
||||
label:newFlow.label,
|
||||
id:id
|
||||
}
|
||||
if (newFlow.hasOwnProperty('info')) {
|
||||
tabNode.info = newFlow.info;
|
||||
}
|
||||
if (newFlow.hasOwnProperty('disabled')) {
|
||||
tabNode.disabled = newFlow.disabled;
|
||||
}
|
||||
|
||||
nodes = [tabNode].concat(newFlow.nodes||[]).concat(newFlow.configs||[]);
|
||||
nodes.forEach(function(n) {
|
||||
n.z = id;
|
||||
});
|
||||
}
|
||||
|
||||
newConfig = newConfig.concat(nodes);
|
||||
return setFlows(newConfig,'flows',true).then(function() {
|
||||
log.info(log._("nodes.flows.updated-flow",{label:(label?label+" ":"")+"["+id+"]"}));
|
||||
})
|
||||
}
|
||||
|
||||
function removeFlow(id) {
|
||||
if (id === 'global') {
|
||||
// TODO: nls + error code
|
||||
throw new Error('not allowed to remove global');
|
||||
}
|
||||
var flow = activeFlowConfig.flows[id];
|
||||
if (!flow) {
|
||||
var e = new Error();
|
||||
e.code = 404;
|
||||
throw e;
|
||||
}
|
||||
|
||||
var newConfig = clone(activeConfig.flows);
|
||||
newConfig = newConfig.filter(function(node) {
|
||||
return node.z !== id && node.id !== id;
|
||||
});
|
||||
|
||||
return setFlows(newConfig,'flows',true).then(function() {
|
||||
log.info(log._("nodes.flows.removed-flow",{label:(flow.label?flow.label+" ":"")+"["+flow.id+"]"}));
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
init: init,
|
||||
|
||||
/**
|
||||
* Load the current flow configuration from storage
|
||||
* @return a promise for the loading of the config
|
||||
*/
|
||||
load: load,
|
||||
|
||||
get:getNode,
|
||||
eachNode: eachNode,
|
||||
|
||||
/**
|
||||
* Gets the current flow configuration
|
||||
*/
|
||||
getFlows: getFlows,
|
||||
|
||||
/**
|
||||
* Sets the current active config.
|
||||
* @param config the configuration to enable
|
||||
* @param type the type of deployment to do: full (default), nodes, flows, load
|
||||
* @return a promise for the saving/starting of the new flow
|
||||
*/
|
||||
setFlows: setFlows,
|
||||
|
||||
/**
|
||||
* Starts the current flow configuration
|
||||
*/
|
||||
startFlows: start,
|
||||
|
||||
/**
|
||||
* Stops the current flow configuration
|
||||
* @return a promise for the stopping of the flow
|
||||
*/
|
||||
stopFlows: stop,
|
||||
|
||||
get started() { return started },
|
||||
|
||||
handleError: handleError,
|
||||
handleStatus: handleStatus,
|
||||
|
||||
checkTypeInUse: checkTypeInUse,
|
||||
|
||||
addFlow: addFlow,
|
||||
getFlow: getFlow,
|
||||
updateFlow: updateFlow,
|
||||
removeFlow: removeFlow,
|
||||
disableFlow:null,
|
||||
enableFlow:null
|
||||
|
||||
};
|
447
packages/node_modules/@node-red/runtime/nodes/flows/util.js
generated
vendored
Normal file
447
packages/node_modules/@node-red/runtime/nodes/flows/util.js
generated
vendored
Normal file
@@ -0,0 +1,447 @@
|
||||
/**
|
||||
* Copyright JS Foundation and other contributors, http://js.foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
var clone = require("clone");
|
||||
var redUtil = require("../../util");
|
||||
var subflowInstanceRE = /^subflow:(.+)$/;
|
||||
var typeRegistry = require("@node-red/registry");
|
||||
|
||||
function diffNodes(oldNode,newNode) {
|
||||
if (oldNode == null) {
|
||||
return true;
|
||||
}
|
||||
var oldKeys = Object.keys(oldNode).filter(function(p) { return p != "x" && p != "y" && p != "wires" });
|
||||
var newKeys = Object.keys(newNode).filter(function(p) { return p != "x" && p != "y" && p != "wires" });
|
||||
if (oldKeys.length != newKeys.length) {
|
||||
return true;
|
||||
}
|
||||
for (var i=0;i<newKeys.length;i++) {
|
||||
var p = newKeys[i];
|
||||
if (!redUtil.compareObjects(oldNode[p],newNode[p])) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
var EnvVarPropertyRE_old = /^\$\((\S+)\)$/;
|
||||
var EnvVarPropertyRE = /^\${(\S+)}$/;
|
||||
|
||||
function mapEnvVarProperties(obj,prop) {
|
||||
if (Buffer.isBuffer(obj[prop])) {
|
||||
return;
|
||||
} else if (Array.isArray(obj[prop])) {
|
||||
for (var i=0;i<obj[prop].length;i++) {
|
||||
mapEnvVarProperties(obj[prop],i);
|
||||
}
|
||||
} else if (typeof obj[prop] === 'string') {
|
||||
if (obj[prop][0] === "$" && (EnvVarPropertyRE_old.test(obj[prop]) || EnvVarPropertyRE.test(obj[prop])) ) {
|
||||
var envVar = obj[prop].substring(2,obj[prop].length-1);
|
||||
obj[prop] = process.env.hasOwnProperty(envVar)?process.env[envVar]:obj[prop];
|
||||
}
|
||||
} else {
|
||||
for (var p in obj[prop]) {
|
||||
if (obj[prop].hasOwnProperty(p)) {
|
||||
mapEnvVarProperties(obj[prop],p);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
|
||||
diffNodes: diffNodes,
|
||||
mapEnvVarProperties: mapEnvVarProperties,
|
||||
|
||||
parseConfig: function(config) {
|
||||
var flow = {};
|
||||
flow.allNodes = {};
|
||||
flow.subflows = {};
|
||||
flow.configs = {};
|
||||
flow.flows = {};
|
||||
flow.missingTypes = [];
|
||||
|
||||
config.forEach(function(n) {
|
||||
flow.allNodes[n.id] = clone(n);
|
||||
if (n.type === 'tab') {
|
||||
flow.flows[n.id] = n;
|
||||
flow.flows[n.id].subflows = {};
|
||||
flow.flows[n.id].configs = {};
|
||||
flow.flows[n.id].nodes = {};
|
||||
}
|
||||
});
|
||||
|
||||
config.forEach(function(n) {
|
||||
if (n.type === 'subflow') {
|
||||
flow.subflows[n.id] = n;
|
||||
flow.subflows[n.id].configs = {};
|
||||
flow.subflows[n.id].nodes = {};
|
||||
flow.subflows[n.id].instances = [];
|
||||
}
|
||||
});
|
||||
var linkWires = {};
|
||||
var linkOutNodes = [];
|
||||
config.forEach(function(n) {
|
||||
if (n.type !== 'subflow' && n.type !== 'tab') {
|
||||
var subflowDetails = subflowInstanceRE.exec(n.type);
|
||||
|
||||
if ( (subflowDetails && !flow.subflows[subflowDetails[1]]) || (!subflowDetails && !typeRegistry.get(n.type)) ) {
|
||||
if (flow.missingTypes.indexOf(n.type) === -1) {
|
||||
flow.missingTypes.push(n.type);
|
||||
}
|
||||
}
|
||||
var container = null;
|
||||
if (flow.flows[n.z]) {
|
||||
container = flow.flows[n.z];
|
||||
} else if (flow.subflows[n.z]) {
|
||||
container = flow.subflows[n.z];
|
||||
}
|
||||
if (n.hasOwnProperty('x') && n.hasOwnProperty('y')) {
|
||||
if (subflowDetails) {
|
||||
var subflowType = subflowDetails[1]
|
||||
n.subflow = subflowType;
|
||||
flow.subflows[subflowType].instances.push(n)
|
||||
}
|
||||
if (container) {
|
||||
container.nodes[n.id] = n;
|
||||
}
|
||||
} else {
|
||||
if (container) {
|
||||
container.configs[n.id] = n;
|
||||
} else {
|
||||
flow.configs[n.id] = n;
|
||||
flow.configs[n.id]._users = [];
|
||||
}
|
||||
}
|
||||
if (n.type === 'link in' && n.links) {
|
||||
// Ensure wires are present in corresponding link out nodes
|
||||
n.links.forEach(function(id) {
|
||||
linkWires[id] = linkWires[id]||{};
|
||||
linkWires[id][n.id] = true;
|
||||
})
|
||||
} else if (n.type === 'link out' && n.links) {
|
||||
linkWires[n.id] = linkWires[n.id]||{};
|
||||
n.links.forEach(function(id) {
|
||||
linkWires[n.id][id] = true;
|
||||
})
|
||||
linkOutNodes.push(n);
|
||||
}
|
||||
}
|
||||
});
|
||||
linkOutNodes.forEach(function(n) {
|
||||
var links = linkWires[n.id];
|
||||
var targets = Object.keys(links);
|
||||
n.wires = [targets];
|
||||
});
|
||||
|
||||
|
||||
var addedTabs = {};
|
||||
config.forEach(function(n) {
|
||||
if (n.type !== 'subflow' && n.type !== 'tab') {
|
||||
for (var prop in n) {
|
||||
if (n.hasOwnProperty(prop) && prop !== 'id' && prop !== 'wires' && prop !== 'type' && prop !== '_users' && flow.configs.hasOwnProperty(n[prop])) {
|
||||
// This property references a global config node
|
||||
flow.configs[n[prop]]._users.push(n.id)
|
||||
}
|
||||
}
|
||||
if (n.z && !flow.subflows[n.z]) {
|
||||
|
||||
if (!flow.flows[n.z]) {
|
||||
flow.flows[n.z] = {type:'tab',id:n.z};
|
||||
flow.flows[n.z].subflows = {};
|
||||
flow.flows[n.z].configs = {};
|
||||
flow.flows[n.z].nodes = {};
|
||||
addedTabs[n.z] = flow.flows[n.z];
|
||||
}
|
||||
if (addedTabs[n.z]) {
|
||||
if (n.hasOwnProperty('x') && n.hasOwnProperty('y')) {
|
||||
addedTabs[n.z].nodes[n.id] = n;
|
||||
} else {
|
||||
addedTabs[n.z].configs[n.id] = n;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
return flow;
|
||||
},
|
||||
|
||||
diffConfigs: function(oldConfig, newConfig) {
|
||||
var id;
|
||||
var node;
|
||||
var nn;
|
||||
var wires;
|
||||
var j,k;
|
||||
|
||||
if (!oldConfig) {
|
||||
oldConfig = {
|
||||
flows:{},
|
||||
allNodes:{}
|
||||
}
|
||||
}
|
||||
var changedSubflows = {};
|
||||
|
||||
var added = {};
|
||||
var removed = {};
|
||||
var changed = {};
|
||||
var wiringChanged = {};
|
||||
|
||||
var linkMap = {};
|
||||
|
||||
var changedTabs = {};
|
||||
|
||||
// Look for tabs that have been removed
|
||||
for (id in oldConfig.flows) {
|
||||
if (oldConfig.flows.hasOwnProperty(id) && (!newConfig.flows.hasOwnProperty(id))) {
|
||||
removed[id] = oldConfig.allNodes[id];
|
||||
}
|
||||
}
|
||||
|
||||
// Look for tabs that have been disabled
|
||||
for (id in oldConfig.flows) {
|
||||
if (oldConfig.flows.hasOwnProperty(id) && newConfig.flows.hasOwnProperty(id)) {
|
||||
var originalState = oldConfig.flows[id].disabled||false;
|
||||
var newState = newConfig.flows[id].disabled||false;
|
||||
if (originalState !== newState) {
|
||||
changedTabs[id] = true;
|
||||
if (originalState) {
|
||||
added[id] = oldConfig.allNodes[id];
|
||||
} else {
|
||||
removed[id] = oldConfig.allNodes[id];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (id in oldConfig.allNodes) {
|
||||
if (oldConfig.allNodes.hasOwnProperty(id)) {
|
||||
node = oldConfig.allNodes[id];
|
||||
if (node.type !== 'tab') {
|
||||
// build the map of what this node was previously wired to
|
||||
if (node.wires) {
|
||||
linkMap[node.id] = linkMap[node.id] || [];
|
||||
for (j=0;j<node.wires.length;j++) {
|
||||
wires = node.wires[j];
|
||||
for (k=0;k<wires.length;k++) {
|
||||
linkMap[node.id].push(wires[k]);
|
||||
nn = oldConfig.allNodes[wires[k]];
|
||||
if (nn) {
|
||||
linkMap[nn.id] = linkMap[nn.id] || [];
|
||||
linkMap[nn.id].push(node.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// This node has been removed
|
||||
if (removed[node.z] || !newConfig.allNodes.hasOwnProperty(id)) {
|
||||
removed[id] = node;
|
||||
// Mark the container as changed
|
||||
if (!removed[node.z] && newConfig.allNodes[removed[id].z]) {
|
||||
changed[removed[id].z] = newConfig.allNodes[removed[id].z];
|
||||
if (changed[removed[id].z].type === "subflow") {
|
||||
changedSubflows[removed[id].z] = changed[removed[id].z];
|
||||
//delete removed[id];
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (added[node.z]) {
|
||||
added[id] = node;
|
||||
} else {
|
||||
// This node has a material configuration change
|
||||
if (diffNodes(node,newConfig.allNodes[id]) || newConfig.allNodes[id].credentials) {
|
||||
changed[id] = newConfig.allNodes[id];
|
||||
if (changed[id].type === "subflow") {
|
||||
changedSubflows[id] = changed[id];
|
||||
}
|
||||
// Mark the container as changed
|
||||
if (newConfig.allNodes[changed[id].z]) {
|
||||
changed[changed[id].z] = newConfig.allNodes[changed[id].z];
|
||||
if (changed[changed[id].z].type === "subflow") {
|
||||
changedSubflows[changed[id].z] = changed[changed[id].z];
|
||||
delete changed[id];
|
||||
}
|
||||
}
|
||||
}
|
||||
// This node's wiring has changed
|
||||
if (!redUtil.compareObjects(node.wires,newConfig.allNodes[id].wires)) {
|
||||
wiringChanged[id] = newConfig.allNodes[id];
|
||||
// Mark the container as changed
|
||||
if (newConfig.allNodes[wiringChanged[id].z]) {
|
||||
changed[wiringChanged[id].z] = newConfig.allNodes[wiringChanged[id].z];
|
||||
if (changed[wiringChanged[id].z].type === "subflow") {
|
||||
changedSubflows[wiringChanged[id].z] = changed[wiringChanged[id].z];
|
||||
delete wiringChanged[id];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Look for added nodes
|
||||
for (id in newConfig.allNodes) {
|
||||
if (newConfig.allNodes.hasOwnProperty(id)) {
|
||||
node = newConfig.allNodes[id];
|
||||
// build the map of what this node is now wired to
|
||||
if (node.wires) {
|
||||
linkMap[node.id] = linkMap[node.id] || [];
|
||||
for (j=0;j<node.wires.length;j++) {
|
||||
wires = node.wires[j];
|
||||
for (k=0;k<wires.length;k++) {
|
||||
if (linkMap[node.id].indexOf(wires[k]) === -1) {
|
||||
linkMap[node.id].push(wires[k]);
|
||||
}
|
||||
nn = newConfig.allNodes[wires[k]];
|
||||
if (nn) {
|
||||
linkMap[nn.id] = linkMap[nn.id] || [];
|
||||
if (linkMap[nn.id].indexOf(node.id) === -1) {
|
||||
linkMap[nn.id].push(node.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// This node has been added
|
||||
if (!oldConfig.allNodes.hasOwnProperty(id)) {
|
||||
added[id] = node;
|
||||
// Mark the container as changed
|
||||
if (newConfig.allNodes[added[id].z]) {
|
||||
changed[added[id].z] = newConfig.allNodes[added[id].z];
|
||||
if (changed[added[id].z].type === "subflow") {
|
||||
changedSubflows[added[id].z] = changed[added[id].z];
|
||||
delete added[id];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var madeChange;
|
||||
// Loop through the nodes looking for references to changed config nodes
|
||||
// Repeat the loop if anything is marked as changed as it may need to be
|
||||
// propagated to parent nodes.
|
||||
// TODO: looping through all nodes every time is a bit inefficient - could be more targeted
|
||||
do {
|
||||
madeChange = false;
|
||||
for (id in newConfig.allNodes) {
|
||||
if (newConfig.allNodes.hasOwnProperty(id)) {
|
||||
node = newConfig.allNodes[id];
|
||||
for (var prop in node) {
|
||||
if (node.hasOwnProperty(prop) && prop != "z" && prop != "id" && prop != "wires") {
|
||||
// This node has a property that references a changed/removed node
|
||||
// Assume it is a config node change and mark this node as
|
||||
// changed.
|
||||
if (changed[node[prop]] || removed[node[prop]]) {
|
||||
if (!changed[node.id]) {
|
||||
madeChange = true;
|
||||
changed[node.id] = node;
|
||||
// This node exists within subflow template
|
||||
// Mark the template as having changed
|
||||
if (newConfig.allNodes[node.z]) {
|
||||
changed[node.z] = newConfig.allNodes[node.z];
|
||||
if (changed[node.z].type === "subflow") {
|
||||
changedSubflows[node.z] = changed[node.z];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (madeChange===true)
|
||||
|
||||
// Find any nodes that exist on a subflow template and remove from changed
|
||||
// list as the parent subflow will now be marked as containing a change
|
||||
for (id in newConfig.allNodes) {
|
||||
if (newConfig.allNodes.hasOwnProperty(id)) {
|
||||
node = newConfig.allNodes[id];
|
||||
if (newConfig.allNodes[node.z] && newConfig.allNodes[node.z].type === "subflow") {
|
||||
delete changed[node.id];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Recursively mark all instances of changed subflows as changed
|
||||
var changedSubflowStack = Object.keys(changedSubflows);
|
||||
while (changedSubflowStack.length > 0) {
|
||||
var subflowId = changedSubflowStack.pop();
|
||||
for (id in newConfig.allNodes) {
|
||||
if (newConfig.allNodes.hasOwnProperty(id)) {
|
||||
node = newConfig.allNodes[id];
|
||||
if (node.type === 'subflow:'+subflowId) {
|
||||
if (!changed[node.id]) {
|
||||
changed[node.id] = node;
|
||||
if (!changed[changed[node.id].z] && newConfig.allNodes[changed[node.id].z]) {
|
||||
changed[changed[node.id].z] = newConfig.allNodes[changed[node.id].z];
|
||||
if (newConfig.allNodes[changed[node.id].z].type === "subflow") {
|
||||
// This subflow instance is inside a subflow. Add the
|
||||
// containing subflow to the stack to mark
|
||||
changedSubflowStack.push(changed[node.id].z);
|
||||
delete changed[node.id];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var diff = {
|
||||
added:Object.keys(added),
|
||||
changed:Object.keys(changed),
|
||||
removed:Object.keys(removed),
|
||||
rewired:Object.keys(wiringChanged),
|
||||
linked:[]
|
||||
}
|
||||
|
||||
// Traverse the links of all modified nodes to mark the connected nodes
|
||||
var modifiedNodes = diff.added.concat(diff.changed).concat(diff.removed).concat(diff.rewired);
|
||||
var visited = {};
|
||||
while (modifiedNodes.length > 0) {
|
||||
node = modifiedNodes.pop();
|
||||
if (!visited[node]) {
|
||||
visited[node] = true;
|
||||
if (linkMap[node]) {
|
||||
if (!changed[node] && !added[node] && !removed[node] && !wiringChanged[node]) {
|
||||
diff.linked.push(node);
|
||||
}
|
||||
modifiedNodes = modifiedNodes.concat(linkMap[node]);
|
||||
}
|
||||
}
|
||||
}
|
||||
// console.log(diff);
|
||||
// for (id in newConfig.allNodes) {
|
||||
// console.log(
|
||||
// (added[id]?"+":(changed[id]?"!":" "))+(wiringChanged[id]?"w":" ")+(diff.linked.indexOf(id)!==-1?"~":" "),
|
||||
// id,
|
||||
// newConfig.allNodes[id].type,
|
||||
// newConfig.allNodes[id].name||newConfig.allNodes[id].label||""
|
||||
// );
|
||||
// }
|
||||
// for (id in removed) {
|
||||
// console.log(
|
||||
// "- "+(diff.linked.indexOf(id)!==-1?"~":" "),
|
||||
// id,
|
||||
// oldConfig.allNodes[id].type,
|
||||
// oldConfig.allNodes[id].name||oldConfig.allNodes[id].label||""
|
||||
// );
|
||||
// }
|
||||
|
||||
return diff;
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user