mirror of
https://github.com/node-red/node-red.git
synced 2025-03-01 10:36:34 +00:00
Add subflow support
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
**/
|
||||
|
||||
var util = require("util");
|
||||
var clone = require("clone");
|
||||
var when = require("when");
|
||||
|
||||
var typeRegistry = require("./registry");
|
||||
@@ -25,6 +26,7 @@ var events = require("../events");
|
||||
var storage = null;
|
||||
|
||||
var nodes = {};
|
||||
var subflows = {};
|
||||
var activeConfig = [];
|
||||
var missingTypes = [];
|
||||
|
||||
@@ -40,6 +42,95 @@ events.on('type-registered',function(type) {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
function getID() {
|
||||
return (1+Math.random()*4294967295).toString(16);
|
||||
}
|
||||
|
||||
function createSubflow(sf,sfn) {
|
||||
var node_map = {};
|
||||
var newNodes = [];
|
||||
var node;
|
||||
var wires;
|
||||
var i,j,k;
|
||||
|
||||
// Clone all of the subflow node definitions and give them new IDs
|
||||
for (i=0;i<sf.nodes.length;i++) {
|
||||
node = clone(sf.nodes[i]);
|
||||
var nid = getID();
|
||||
node_map[node.id] = node;
|
||||
node.id = nid;
|
||||
newNodes.push(node);
|
||||
}
|
||||
// Update all subflow interior wiring to reflect new node IDs
|
||||
for (i=0;i<newNodes.length;i++) {
|
||||
node = newNodes[i];
|
||||
var outputs = node.wires;
|
||||
|
||||
for (j=0;j<outputs.length;j++) {
|
||||
wires = outputs[j];
|
||||
for (k=0;k<wires.length;k++) {
|
||||
outputs[j][k] = node_map[outputs[j][k]].id
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create a subflow node to accept inbound messages and route appropriately
|
||||
var Node = require("./Node");
|
||||
var subflowInstance = {
|
||||
id: sfn.id,
|
||||
name: sfn.name,
|
||||
wires: []
|
||||
}
|
||||
if (sf.in) {
|
||||
subflowInstance.wires = sf.in.map(function(n) { return n.wires.map(function(w) { return node_map[w.id].id;})})
|
||||
}
|
||||
var subflowNode = new Node(subflowInstance);
|
||||
subflowNode.on("input", function(msg) { this.send(msg);});
|
||||
|
||||
// Wire the subflow outputs
|
||||
if (sf.out) {
|
||||
for (i=0;i<sf.out.length;i++) {
|
||||
wires = sf.out[i].wires;
|
||||
for (j=0;j<wires.length;j++) {
|
||||
if (wires[j].id === sf.id) {
|
||||
node = subflowNode;
|
||||
delete subflowNode._wire;
|
||||
} else {
|
||||
node = node_map[wires[j].id];
|
||||
}
|
||||
node.wires[wires[j].port] = node.wires[wires[j].port].concat(sfn.wires[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Instantiate the nodes
|
||||
for (i=0;i<newNodes.length;i++) {
|
||||
node = newNodes[i];
|
||||
var nn = null;
|
||||
var type = node.type;
|
||||
|
||||
var m = /^subflow:(.+)$/.exec(type);
|
||||
if (!m) {
|
||||
var nt = typeRegistry.get(type);
|
||||
if (nt) {
|
||||
try {
|
||||
nn = new nt(node);
|
||||
}
|
||||
catch (err) {
|
||||
util.log("[red] "+type+" : "+err);
|
||||
}
|
||||
}
|
||||
if (nn === null) {
|
||||
util.log("[red] unknown type: "+type);
|
||||
}
|
||||
} else {
|
||||
var subflowId = m[1];
|
||||
createSubflow(subflows[subflowId],node);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the current activeConfig and creates the required node instances
|
||||
@@ -47,13 +138,15 @@ events.on('type-registered',function(type) {
|
||||
function parseConfig() {
|
||||
var i;
|
||||
var nt;
|
||||
var type;
|
||||
var subflow;
|
||||
missingTypes = [];
|
||||
|
||||
// Scan the configuration for any unknown node types
|
||||
for (i=0;i<activeConfig.length;i++) {
|
||||
var type = activeConfig[i].type;
|
||||
type = activeConfig[i].type;
|
||||
// TODO: remove workspace in next release+1
|
||||
if (type != "workspace" && type != "tab") {
|
||||
if (type != "workspace" && type != "tab" && !/^subflow($|:.+$)/.test(type)) {
|
||||
nt = typeRegistry.get(type);
|
||||
if (!nt && missingTypes.indexOf(type) == -1) {
|
||||
missingTypes.push(type);
|
||||
@@ -71,25 +164,50 @@ function parseConfig() {
|
||||
|
||||
util.log("[red] Starting flows");
|
||||
events.emit("nodes-starting");
|
||||
|
||||
for (i=0;i<activeConfig.length;i++) {
|
||||
type = activeConfig[i].type;
|
||||
if (type === "subflow") {
|
||||
subflow = activeConfig[i];
|
||||
subflow.nodes = [];
|
||||
subflow.instances = [];
|
||||
subflows[subflow.id] = subflow;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
for (i=0;i<activeConfig.length;i++) {
|
||||
if (subflows[activeConfig[i].z]) {
|
||||
subflow = subflows[activeConfig[i].z];
|
||||
subflow.nodes.push(activeConfig[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Instantiate each node in the flow
|
||||
for (i=0;i<activeConfig.length;i++) {
|
||||
var nn = null;
|
||||
// TODO: remove workspace in next release+1
|
||||
if (activeConfig[i].type != "workspace" && activeConfig[i].type != "tab") {
|
||||
nt = typeRegistry.get(activeConfig[i].type);
|
||||
if (nt) {
|
||||
try {
|
||||
nn = new nt(activeConfig[i]);
|
||||
type = activeConfig[i].type;
|
||||
|
||||
var m = /^subflow:(.+)$/.exec(type);
|
||||
if (!m) {
|
||||
// TODO: remove workspace in next release+1
|
||||
if (type != "workspace" && type != "tab" && type != "subflow" && !subflows[activeConfig[i].z]) {
|
||||
nt = typeRegistry.get(type);
|
||||
if (nt) {
|
||||
try {
|
||||
nn = new nt(activeConfig[i]);
|
||||
}
|
||||
catch (err) {
|
||||
util.log("[red] "+type+" : "+err);
|
||||
}
|
||||
}
|
||||
catch (err) {
|
||||
util.log("[red] "+activeConfig[i].type+" : "+err);
|
||||
if (nn === null) {
|
||||
util.log("[red] unknown type: "+type);
|
||||
}
|
||||
}
|
||||
// console.log(nn);
|
||||
if (nn === null) {
|
||||
util.log("[red] unknown type: "+activeConfig[i].type);
|
||||
}
|
||||
} else {
|
||||
var subflowId = m[1];
|
||||
createSubflow(subflows[subflowId],activeConfig[i]);
|
||||
}
|
||||
}
|
||||
// Clean up any orphaned credentials
|
||||
|
Reference in New Issue
Block a user