Add deployment types in runtime

- removes ui option as it needs work
This commit is contained in:
Nick O'Leary 2015-01-08 22:34:26 +00:00
parent 89fff339d5
commit cf1371bfdf
11 changed files with 839 additions and 415 deletions

View File

@ -32,14 +32,13 @@
<div id="header">
<span class="logo"><img src="node-red.png"> <span>Node-RED</span></span>
<div class="pull-right">
<span class="deploy-button-group button-group">
<a id="btn-deploy" class="action-deploy disabled" href="#"><i id="btn-icn-deploy" class="fa fa-download"></i> Deploy</a>
<a id="btn-deploy-options" data-toggle="dropdown" class="" href="#"><i class="fa fa-caret-down"></i></a>
<ul class="dropdown-menu" role="menu">
<li>Foo</li>
</ul>
</span>
<a id="btn-sidemenu" class="button dropdown-toggle" data-toggle="dropdown" href="#"><i class="fa fa-bars"></i></a>
<ul class="header-toolbar">
<li><span class="deploy-button-group button-group">
<a id="btn-deploy" class="action-deploy disabled" href="#"><i id="btn-icn-deploy" class="fa fa-download"></i> <span>Deploy</span></a>
<!-- <a id="btn-deploy-options" data-toggle="dropdown" class="" href="#"><i class="fa fa-caret-down"></i></a> -->
</span></li>
<li><a id="btn-sidemenu" class="button" data-toggle="dropdown" href="#"><i class="fa fa-bars"></i></a></li>
<ul>
</div>
</div>
<div id="main-container" class="sidebar-closed">

View File

@ -15,6 +15,14 @@
**/
var RED = (function() {
var deploymentTypes = {
"full":"Deploy",
"nodes":"Deploy changed nodes",
"flows":"Deploy changed flows"
}
var deploymentType = "full";
function hideDropTarget() {
$("#dropTarget").hide();
RED.keyboard.remove(/* ESCAPE */ 27);
@ -82,7 +90,10 @@ var RED = (function() {
url:"flows",
type: "POST",
data: JSON.stringify(nns),
contentType: "application/json; charset=utf-8"
contentType: "application/json; charset=utf-8",
headers: {
"Node-RED-Deployment-Type":deploymentType
}
}).done(function(data,textStatus,xhr) {
RED.notify("Successfully deployed","success");
RED.nodes.eachNode(function(node) {
@ -280,6 +291,12 @@ var RED = (function() {
dialog.modal();
}
function changeDeploymentType(type) {
deploymentType = type;
$("#btn-deploy span").text(deploymentTypes[type]);
}
$(function() {
RED.menu.init({id:"btn-sidemenu",
options: [
@ -312,6 +329,15 @@ var RED = (function() {
{id:"btn-help",icon:"fa fa-question",label:"Help...", href:"http://nodered.org/docs"}
]
});
//RED.menu.init({id:"btn-deploy-options",
// options: [
// {id:"btn-deploy-select",label:"Select deployment type"},
// {id:"btn-deploy-full",icon:null,label:"Full deploy",tip:"Deploys all nodes",onselect:function() { changeDeploymentType("full")}},
// {id:"btn-deploy-node",icon:null,label:"Deploy changed nodes",tip:"Deploys all nodes that have been changed",onselect:function() { changeDeploymentType("nodes")}},
// {id:"btn-deploy-flow",icon:null,label:"Deploy changed flows",tip:"Deploys all nodes in flows that contain changes",onselect:function() { changeDeploymentType("flows")}}
// ]
//});
RED.keyboard.add(/* ? */ 191,{shift:true},function(){showHelp();d3.event.preventDefault();});
loadSettings();

View File

@ -394,7 +394,7 @@ RED.nodes = (function() {
var wires = links.filter(function(d) { return d.source === p });
for (var i=0;i<wires.length;i++) {
var w = wires[i];
if (w.target.id != p.id) {
if (w.target.type != "subflow") {
nIn.wires.push({id:w.target.id})
}
}

View File

@ -47,8 +47,8 @@ RED.menu = (function() {
item = $('<li></li>');
var link = $('<a '+(opt.id?'id="'+opt.id+'" ':'')+'tabindex="-1" href="#">'+
(opt.toggle?'<i class="fa fa-check pull-right"></i>':'')+
(opt.icon?'<i class="'+opt.icon+'"></i> ':'')+
opt.label+
(opt.icon!==undefined?'<i class="'+(opt.icon?opt.icon:'" style="display: inline-block;"')+'"></i> ':"")+
'<span class="menu-label">'+opt.label+'</span>'+
'</a>').appendTo(item);
menuItems[opt.id] = opt;
@ -67,6 +67,11 @@ RED.menu = (function() {
setState();
} else if (opt.href) {
link.attr("target","_blank").attr("href",opt.href);
} else if (!opt.options) {
item.addClass("disabled");
link.click(function(event) {
event.preventDefault();
});
}
if (opt.options) {
item.addClass("dropdown-submenu pull-left");
@ -79,6 +84,17 @@ RED.menu = (function() {
if (opt.disabled) {
item.addClass("disabled");
}
if (opt.tip) {
item.popover({
placement:"left",
trigger: "hover",
delay: { show: 350, hide: 20 },
html: true,
container:'body',
content: opt.tip
});
}
}
@ -88,8 +104,8 @@ RED.menu = (function() {
function createMenu(options) {
var button = $("#"+options.id);
var topMenu = $("<ul/>",{class:"dropdown-menu"}).insertAfter(button);
var topMenu = $("<ul/>",{id:options.id+"-submenu", class:"dropdown-menu pull-right"}).insertAfter(button);
for (var i=0;i<options.options.length;i++) {
var opt = options.options[i];

View File

@ -74,6 +74,19 @@ span.logo span {
span.logo img {
height: 18px;
}
#header ul.header-toolbar {
padding: 0;
margin: 0;
list-style: none;
}
#header ul.header-toolbar > li {
display: inline-block;
padding: 0;
margin: 0;
position: relative;
}
.button {
-webkit-user-select: none;
-khtml-user-select: none;
@ -83,6 +96,8 @@ span.logo img {
}
#header .button {
min-width: 20px;
text-align: center;
line-height: 22px;
display: inline-block;
font-size: 14px;
@ -90,7 +105,7 @@ span.logo img {
text-decoration: none;
border-radius: 3px;
color: #ccc;
margin: auto 10px;
margin: auto 5px;
vertical-align: middle;
}
#header .button:not(.disabled):hover {

View File

@ -28,11 +28,13 @@ module.exports = {
},
post: function(req,res) {
var flows = req.body;
redNodes.setFlows(flows).then(function() {
var deploymentType = req.get("Node-RED-Deployment-Type")||"full";
redNodes.setFlows(flows,deploymentType).then(function() {
res.send(204);
}).otherwise(function(err) {
util.log("[red] Error saving flows : "+err);
res.send(500,err.message);
console.log(err.stack);
});
}
}

638
red/nodes/Flow.js Normal file
View File

@ -0,0 +1,638 @@
/**
* Copyright 2014 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
var util = require("util");
var when = require("when");
var clone = require("clone");
var typeRegistry = require("./registry");
var credentials = require("./credentials");
var redUtil = require("../util");
var events = require("../events");
function getID() {
return (1+Math.random()*4294967295).toString(16);
}
function createNode(type,config) {
var nn = null;
var nt = typeRegistry.get(type);
if (nt) {
try {
nn = new nt(clone(config));
}
catch (err) {
util.log("[red] "+type+" : "+err);
}
} else {
util.log("[red] unknown type: "+type);
}
return nn;
}
function createSubflow(sf,sfn,subflows) {
//console.log("CREATE SUBFLOW",sf.config.id,sfn.id);
var nodes = [];
var node_map = {};
var newNodes = [];
var node;
var wires;
var i,j,k;
// Clone all of the subflow node definitions and give them new IDs
for (i=0;i<sf.nodes.length;i++) {
node = clone(sf.nodes[i].config);
var nid = getID();
node_map[node.id] = node;
node._alias = node.id;
node.id = nid;
newNodes.push(node);
}
// Update all subflow interior wiring to reflect new node IDs
for (i=0;i<newNodes.length;i++) {
node = newNodes[i];
var outputs = node.wires;
for (j=0;j<outputs.length;j++) {
wires = outputs[j];
for (k=0;k<wires.length;k++) {
outputs[j][k] = node_map[outputs[j][k]].id
}
}
}
// Create a subflow node to accept inbound messages and route appropriately
var Node = require("./Node");
var subflowInstance = {
id: sfn.id,
type: sfn.type,
name: sfn.name,
wires: []
}
if (sf.config.in) {
subflowInstance.wires = sf.config.in.map(function(n) { return n.wires.map(function(w) { return node_map[w.id].id;})})
subflowInstance._originalWires = clone(subflowInstance.wires);
}
var subflowNode = new Node(subflowInstance);
subflowNode.on("input", function(msg) { this.send(msg);});
subflowNode._updateWires = subflowNode.updateWires;
subflowNode.updateWires = function(newWires) {
// Wire the subflow outputs
if (sf.config.out) {
var node,wires,i,j;
// Restore the original wiring to the internal nodes
subflowInstance.wires = clone(subflowInstance._originalWires);
for (i=0;i<sf.config.out.length;i++) {
wires = sf.config.out[i].wires;
for (j=0;j<wires.length;j++) {
if (wires[j].id != sf.config.id) {
node = node_map[wires[j].id];
if (node._originalWires) {
node.wires = clone(node._originalWires);
}
}
}
}
var modifiedNodes = {};
var subflowInstanceModified = false;
for (i=0;i<sf.config.out.length;i++) {
wires = sf.config.out[i].wires;
for (j=0;j<wires.length;j++) {
if (wires[j].id === sf.config.id) {
subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(newWires[i]);
subflowInstanceModified = true;
} else {
node = node_map[wires[j].id];
node.wires[wires[j].port] = node.wires[wires[j].port].concat(newWires[i]);
modifiedNodes[node.id] = node;
}
}
}
}
Object.keys(modifiedNodes).forEach(function(id) {
var node = modifiedNodes[id];
subflowNode.instanceNodes[id].updateWires(node.wires);
});
if (subflowInstanceModified) {
subflowNode._updateWires(subflowInstance.wires);
}
}
nodes.push(subflowNode);
// Wire the subflow outputs
if (sf.config.out) {
var modifiedNodes = {};
for (i=0;i<sf.config.out.length;i++) {
wires = sf.config.out[i].wires;
for (j=0;j<wires.length;j++) {
if (wires[j].id === sf.config.id) {
// A subflow input wired straight to a subflow output
subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(sfn.wires[i])
subflowNode._updateWires(subflowInstance.wires);
} else {
node = node_map[wires[j].id];
modifiedNodes[node.id] = node;
if (!node._originalWires) {
node._originalWires = clone(node.wires);
}
node.wires[wires[j].port] = node.wires[wires[j].port].concat(sfn.wires[i]);
}
}
}
}
// Instantiate the nodes
for (i=0;i<newNodes.length;i++) {
node = newNodes[i];
var type = node.type;
var m = /^subflow:(.+)$/.exec(type);
if (!m) {
nodes.push(createNode(type,node));
} else {
var subflowId = m[1];
nodes = nodes.concat(createSubflow(subflows[subflowId],node,subflows));
}
}
subflowNode.instanceNodes = {};
nodes.forEach(function(node) {
subflowNode.instanceNodes[node.id] = node;
});
return nodes;
}
function diffNodeConfigs(oldNode,newNode) {
if (oldNode == null) {
return true;
} else {
for (var p in newNode) {
if (newNode.hasOwnProperty(p) && p != "x" && p != "y" && p != "wires") {
if (!redUtil.compareObjects(oldNode[p],newNode[p])) {
return true;
}
}
}
}
return false;
}
var subflowInstanceRE = /^subflow:(.+)$/;
function Flow(config) {
this.activeNodes = {};
this.subflowInstanceNodes = {};
this.parseConfig(config);
}
Flow.prototype.parseConfig = function(config) {
var i;
this.config = config;
this.allNodes = {};
this.nodes = {};
this.subflows = {};
var unknownTypes = {};
for (i=0;i<this.config.length;i++) {
var nodeConfig = this.config[i];
var nodeType = nodeConfig.type;
if (nodeType == "subflow") {
this.subflows[nodeConfig.id] = {
type: "subflow",
config: nodeConfig,
nodes: []
}
}
}
//console.log("Known subflows:",Object.keys(this.subflows));
for (i=0;i<this.config.length;i++) {
var nodeConfig = this.config[i];
this.allNodes[nodeConfig.id] = nodeConfig;
var nodeType = nodeConfig.type;
if (nodeConfig.credentials) {
credentials.extract(nodeConfig);
delete nodeConfig.credentials;
}
if (nodeType != "tab" && nodeType != "subflow") {
var m = subflowInstanceRE.exec(nodeType);
if ((m && !this.subflows[m[1]]) || (!m && !typeRegistry.get(nodeType))) {
// This is an unknown subflow or an unknown type
unknownTypes[nodeType] = true;
} else {
var nodeInfo = {
type: nodeType,
config:nodeConfig
}
if (m) {
nodeInfo.subflow = m[1];
}
if (this.subflows[nodeConfig.z]) {
this.subflows[nodeConfig.z].nodes.push(nodeInfo);
} else {
this.nodes[nodeConfig.id] = nodeInfo;
}
}
}
}
//console.log("NODES");
//for (i in this.nodes) {
// if (this.nodes.hasOwnProperty(i)) {
// console.log(" ",i,this.nodes[i].type,this.nodes[i].config.name||"");
// }
//}
//console.log("SUBFLOWS");
//for (i in this.subflows) {
// if (this.subflows.hasOwnProperty(i)) {
// console.log(" ",i,this.subflows[i].type,this.subflows[i].config.name||"");
// for (var j=0;j<this.subflows[i].nodes.length;j++) {
// console.log(" ",this.subflows[i].nodes[j].config.id,this.subflows[i].nodes[j].type,this.subflows[i].nodes[j].config.name||"");
// }
// }
//}
this.missingTypes = Object.keys(unknownTypes);
}
Flow.prototype.start = function() {
if (this.missingTypes.length > 0) {
throw new Error("missing types");
}
events.emit("nodes-starting");
for (var id in this.nodes) {
if (this.nodes.hasOwnProperty(id)) {
var node = this.nodes[id];
if (!node.subflow) {
if (!this.activeNodes[id]) {
this.activeNodes[id] = createNode(node.type,node.config);
//console.log(id,"created");
} else {
//console.log(id,"already running");
}
} else {
if (!this.subflowInstanceNodes[id]) {
var nodes = createSubflow(this.subflows[node.subflow],node.config,this.subflows);
this.subflowInstanceNodes[id] = nodes.map(function(n) { return n.id});
for (var i=0;i<nodes.length;i++) {
this.activeNodes[nodes[i].id] = nodes[i];
}
//console.log(id,"(sf) created");
} else {
//console.log(id,"(sf) already running");
}
}
}
}
credentials.clean(this.config);
events.emit("nodes-started");
}
Flow.prototype.stop = function(nodeList) {
nodeList = nodeList || Object.keys(this.activeNodes);
var flow = this;
return when.promise(function(resolve) {
events.emit("nodes-stopping");
var promises = [];
for (var i=0;i<nodeList.length;i++) {
var node = flow.activeNodes[nodeList[i]];
if (node) {
try {
var p = node.close();
if (p) {
promises.push(p);
}
} catch(err) {
node.error(err);
}
delete flow.subflowInstanceNodes[nodeList[i]];
delete flow.activeNodes[nodeList[i]];
}
}
when.settle(promises).then(function() {
events.emit("nodes-stopped");
resolve();
});
});
}
Flow.prototype.getMissingTypes = function() {
return this.missingTypes;
}
Flow.prototype.typeRegistered = function(type) {
if (this.missingTypes.length > 0) {
var i = this.missingTypes.indexOf(type);
if (i != -1) {
this.missingTypes.splice(i,1);
if (this.missingTypes.length === 0) {
this.start();
}
}
}
}
Flow.prototype.getNode = function(id) {
return this.activeNodes[id];
}
Flow.prototype.getFlow = function() {
//console.log(this.config);
return this.config;
}
Flow.prototype.applyConfig = function(config,type) {
var diff = this.diffFlow(config);
//console.log(diff);
//var diff = {
// deleted:[]
// changed:[]
// linked:[]
// wiringChanged: []
//}
var nodesToStop = [];
var nodesToCreate = [];
var nodesToRewire = diff.wiringChanged;
if (type == "nodes") {
nodesToStop = diff.deleted.concat(diff.changed);
nodesToCreate = diff.changed;
} else if (type == "flows") {
nodesToStop = diff.deleted.concat(diff.changed).concat(diff.linked);
nodesToCreate = diff.changed.concat(diff.linked);
}
var activeNodesToStop = [];
for (var i=0;i<nodesToStop.length;i++) {
var id = nodesToStop[i];
if (this.subflowInstanceNodes[id]) {
activeNodesToStop = activeNodesToStop.concat(this.subflowInstanceNodes[id]);
} else if (this.activeNodes[id]) {
activeNodesToStop.push(id);
}
}
//console.log(activeNodesToStop);
var flow = this;
return this.stop(activeNodesToStop).then(function() {
flow.parseConfig(config);
for (var i=0;i<nodesToRewire.length;i++) {
var node = flow.activeNodes[nodesToRewire[i]];
if (node) {
node.updateWires(flow.allNodes[node.id].wires);
}
}
flow.start();
})
}
Flow.prototype.diffFlow = function(config) {
var flow = this;
var configNodes = {};
var changedNodes = {};
var deletedNodes = {};
var linkChangedNodes = {};
var activeLinks = {};
var newLinks = {};
var changedSubflowStack = [];
var changedSubflows = {};
var buildNodeLinks = function(nodeLinks,n,nodes) {
nodeLinks[n.id] = nodeLinks[n.id] || [];
if (n.wires) {
for (var j=0;j<n.wires.length;j++) {
var wires = n.wires[j];
for (var k=0;k<wires.length;k++) {
nodeLinks[n.id].push(wires[k]);
var nn = nodes[wires[k]];
if (nn) {
nodeLinks[nn.id] = nodeLinks[nn.id] || [];
nodeLinks[nn.id].push(n.id);
}
}
}
}
}
config.forEach(function(node) {
configNodes[node.id] = node;
});
config.forEach(function(node) {
var changed = false;
if (node.credentials) {
changed = true;
delete node.credentials;
} else {
changed = diffNodeConfigs(flow.allNodes[node.id],node);
if (!changed) {
if (configNodes[node.z] && configNodes[node.z].type == "subflow") {
var originalNode = flow.allNodes[node.id];
if (originalNode && !redUtil.compareObjects(originalNode.wires,node.wires)) {
// This is a node in a subflow whose wiring has changed. Mark subflow as changed
changed = true;
}
}
}
}
if (changed) {
changedNodes[node.id] = node;
}
});
this.config.forEach(function(node) {
if (!configNodes[node.id]) {
deletedNodes[node.id] = node;
}
buildNodeLinks(activeLinks,node,flow.allNodes);
});
this.config.forEach(function(node) {
for (var prop in node) {
if (node.hasOwnProperty(prop) && prop != "z") {
// This node has a property that references a changed node
// Assume it is a config node change and mark this node as
// changed.
if (changedNodes[node[prop]]) {
changedNodes[node.id] = node;
}
}
}
});
var checkSubflowMembership = function(nodes,id) {
var node = nodes[id];
if (node) {
if (node.type == "subflow") {
changedSubflows[id] = node;
changedSubflowStack.push(id);
} else if (nodes[node.z] && nodes[node.z].type == "subflow") {
if (!changedSubflows[node.z]) {
changedSubflows[node.z] = nodes[node.z];
changedSubflowStack.push(node.z);
}
}
}
};
Object.keys(changedNodes).forEach(function(n) { checkSubflowMembership(configNodes,n)});
Object.keys(deletedNodes).forEach(function(n) { checkSubflowMembership(flow.allNodes,n)});
while (changedSubflowStack.length > 0) {
var subflowId = changedSubflowStack.pop();
config.forEach(function(node) {
if (node.type == "subflow:"+subflowId) {
if (!changedNodes[node.id]) {
changedNodes[node.id] = node;
checkSubflowMembership(configNodes,node.id);
}
}
});
}
config.forEach(function(node) {
buildNodeLinks(newLinks,node,configNodes);
});
var markLinkedNodes = function(linkChanged,changedNodes,linkMap,allNodes) {
var stack = Object.keys(changedNodes);
var visited = {};
while(stack.length > 0) {
var id = stack.pop();
var linkedNodes = linkMap[id];
if (linkedNodes) {
for (var i=0;i<linkedNodes.length;i++) {
linkedNodeId = linkedNodes[i];
if (changedNodes[linkedNodeId] || linkChanged[linkedNodeId]) {
// Do nothing - this linked node is already marked as changed, so will get done
} else {
linkChanged[linkedNodeId] = true;
stack.push(linkedNodeId);
}
}
}
}
}
markLinkedNodes(linkChangedNodes,changedNodes,newLinks,configNodes);
markLinkedNodes(linkChangedNodes,deletedNodes,activeLinks,flow.allNodes);
var modifiedLinkNodes = {};
config.forEach(function(node) {
if (!changedNodes[node.id]) {
// only concerned about unchanged nodes whose wiring may have changed
var newNodeLinks = newLinks[node.id];
var oldNodeLinks = activeLinks[node.id];
var newLinkMap = {};
newNodeLinks.forEach(function(l) { newLinkMap[l] = (newLinkMap[l]||0)+1;});
var oldLinkMap = {};
oldNodeLinks.forEach(function(l) { oldLinkMap[l] = (oldLinkMap[l]||0)+1;});
newNodeLinks.forEach(function(link) {
if (newLinkMap[link] != oldLinkMap[link]) {
modifiedLinkNodes[node.id] = node;
modifiedLinkNodes[link] = configNodes[link];
linkChangedNodes[node.id] = node;
linkChangedNodes[link] = configNodes[link];
}
});
oldNodeLinks.forEach(function(link) {
if (newLinkMap[link] != oldLinkMap[link]) {
modifiedLinkNodes[node.id] = node;
modifiedLinkNodes[link] = configNodes[link];
linkChangedNodes[node.id] = node;
linkChangedNodes[link] = configNodes[link];
}
});
}
});
markLinkedNodes(linkChangedNodes,modifiedLinkNodes,newLinks,configNodes);
//config.forEach(function(n) {
// console.log((changedNodes[n.id]!=null)?"[C]":"[ ]",(linkChangedNodes[n.id]!=null)?"[L]":"[ ]","[ ]",n.id,n.type,n.name);
//});
//Object.keys(deletedNodes).forEach(function(id) {
// var n = flow.allNodes[id];
// console.log("[ ] [ ] [D]",n.id,n.type);
//});
var diff = {
deleted: Object.keys(deletedNodes),
changed: Object.keys(changedNodes),
linked: Object.keys(linkChangedNodes),
wiringChanged: []
}
config.forEach(function(n) {
if (!configNodes[n.z] || configNodes[n.z].type != "subflow") {
var originalNode = flow.allNodes[n.id];
if (originalNode && !redUtil.compareObjects(originalNode.wires,n.wires)) {
diff.wiringChanged.push(n.id);
}
}
});
return diff;
}
module.exports = Flow;

View File

@ -25,12 +25,19 @@ var comms = require("../comms");
function Node(n) {
this.id = n.id;
flows.add(this);
this.type = n.type;
if (n.name) {
this.name = n.name;
}
this.wires = n.wires || [];
flows.add(this);
this.updateWires(n.wires);
}
util.inherits(Node, EventEmitter);
Node.prototype.updateWires = function(wires) {
this.wires = wires || [];
delete this._wire;
var wc = 0;
this.wires.forEach(function(w) {
@ -40,14 +47,16 @@ function Node(n) {
if (wc === 0) {
// With nothing wired to the node, no-op send
this.send = function(msg) {}
} else if (this.wires.length === 1 && this.wires[0].length === 1) {
// Single wire, so we can shortcut the send when
// a single message is sent
this._wire = this.wires[0][0];
} else {
this.send = Node.prototype.send;
if (this.wires.length === 1 && this.wires[0].length === 1) {
// Single wire, so we can shortcut the send when
// a single message is sent
this._wire = this.wires[0][0];
}
}
}
util.inherits(Node, EventEmitter);
}
Node.prototype._on = Node.prototype.on;
@ -75,6 +84,7 @@ Node.prototype.close = function() {};
Node.prototype.send = function(msg) {
console.log(this.id,this.type,this.wires);
var msgSent = false;
var node;

View File

@ -111,15 +111,18 @@ module.exports = {
/**
* Deletes any credentials for nodes that no longer exist
* @param getNode a function that can return a node for a given id
* @param config a flow config
* @return a promise for the saving of credentials to storage
*/
clean: function (getNode) {
clean: function (config) {
var existingIds = {};
config.forEach(function(n) {
existingIds[n.id] = true;
});
var deletedCredentials = false;
for (var c in credentialCache) {
if (credentialCache.hasOwnProperty(c)) {
var n = getNode(c);
if (!n) {
if (!existingIds[c]) {
deletedCredentials = true;
delete credentialCache[c];
}
@ -164,7 +167,6 @@ module.exports = {
var dashedType = nodeType.replace(/\s+/g, '-');
var definition = credentialsDef[dashedType];
if (!definition) {
util.log('Credential Type ' + nodeType + ' is not registered.');
return;

View File

@ -20,234 +20,27 @@ var when = require("when");
var typeRegistry = require("./registry");
var credentials = require("./credentials");
var Flow = require("./Flow");
var log = require("../log");
var events = require("../events");
var redUtil = require("../util");
var storage = null;
var activeFlow = null;
var nodes = {};
var subflows = {};
var activeConfig = [];
var activeConfigNodes = {};
var missingTypes = [];
events.on('type-registered',function(type) {
if (missingTypes.length > 0) {
var i = missingTypes.indexOf(type);
if (i != -1) {
missingTypes.splice(i,1);
util.log("[red] Missing type registered: "+type);
if (missingTypes.length === 0) {
parseConfig();
}
}
if (activeFlow) {
if (activeFlow.typeRegistered(type)) {
util.log("[red] Missing type registered: "+type);
}
}
});
function getID() {
return (1+Math.random()*4294967295).toString(16);
}
function createSubflow(sf,sfn) {
var node_map = {};
var newNodes = [];
var node;
var wires;
var i,j,k;
// Clone all of the subflow node definitions and give them new IDs
for (i=0;i<sf.nodes.length;i++) {
node = clone(sf.nodes[i]);
var nid = getID();
node_map[node.id] = node;
node.id = nid;
newNodes.push(node);
}
// Update all subflow interior wiring to reflect new node IDs
for (i=0;i<newNodes.length;i++) {
node = newNodes[i];
var outputs = node.wires;
for (j=0;j<outputs.length;j++) {
wires = outputs[j];
for (k=0;k<wires.length;k++) {
outputs[j][k] = node_map[outputs[j][k]].id
}
}
}
// Create a subflow node to accept inbound messages and route appropriately
var Node = require("./Node");
var subflowInstance = {
id: sfn.id,
type: sfn.type,
name: sfn.name,
wires: []
}
if (sf.in) {
subflowInstance.wires = sf.in.map(function(n) { return n.wires.map(function(w) { return node_map[w.id].id;})})
}
var subflowNode = new Node(subflowInstance);
subflowNode.on("input", function(msg) { this.send(msg);});
// Wire the subflow outputs
if (sf.out) {
for (i=0;i<sf.out.length;i++) {
wires = sf.out[i].wires;
for (j=0;j<wires.length;j++) {
if (wires[j].id === sf.id) {
node = subflowNode;
delete subflowNode._wire;
} else {
node = node_map[wires[j].id];
}
node.wires[wires[j].port] = node.wires[wires[j].port].concat(sfn.wires[i]);
}
}
}
// Instantiate the nodes
for (i=0;i<newNodes.length;i++) {
node = newNodes[i];
var nn = null;
var type = node.type;
var m = /^subflow:(.+)$/.exec(type);
if (!m) {
var nt = typeRegistry.get(type);
if (nt) {
try {
nn = new nt(node);
}
catch (err) {
util.log("[red] "+type+" : "+err);
}
}
if (nn === null) {
util.log("[red] unknown type: "+type);
}
} else {
var subflowId = m[1];
createSubflow(subflows[subflowId],node);
}
}
}
/**
* Parses the current activeConfig and creates the required node instances
*/
function parseConfig() {
var i;
var nt;
var type;
var subflow;
missingTypes = [];
activeConfigNodes = {};
// Scan the configuration for any unknown node types
for (i=0;i<activeConfig.length;i++) {
type = activeConfig[i].type;
// TODO: remove workspace in next release+1
if (type != "workspace" && type != "tab" && !/^subflow($|:.+$)/.test(type)) {
nt = typeRegistry.get(type);
if (!nt && missingTypes.indexOf(type) == -1) {
missingTypes.push(type);
}
}
}
// Abort if there are any missing types
if (missingTypes.length > 0) {
util.log("[red] Waiting for missing types to be registered:");
for (i=0;i<missingTypes.length;i++) {
util.log("[red] - "+missingTypes[i]);
}
return;
}
util.log("[red] Starting flows");
events.emit("nodes-starting");
for (i=0;i<activeConfig.length;i++) {
type = activeConfig[i].type;
if (type === "subflow") {
subflow = activeConfig[i];
subflow.nodes = [];
subflow.instances = [];
subflows[subflow.id] = subflow;
}
}
for (i=0;i<activeConfig.length;i++) {
if (subflows[activeConfig[i].z]) {
subflow = subflows[activeConfig[i].z];
subflow.nodes.push(activeConfig[i]);
}
}
var subflowNameRE = /^subflow:(.+)$/;
// Instantiate each node in the flow
for (i=0;i<activeConfig.length;i++) {
activeConfigNodes[activeConfig[i].id] = activeConfig[i];
var nn = null;
type = activeConfig[i].type;
var m = subflowNameRE.exec(type);
if (!m) {
// TODO: remove workspace in next release+1
if (type != "workspace" && type != "tab" && type != "subflow" && !subflows[activeConfig[i].z]) {
nt = typeRegistry.get(type);
if (nt) {
try {
nn = new nt(activeConfig[i]);
}
catch (err) {
util.log("[red] "+type+" : "+err);
}
}
if (nn === null) {
util.log("[red] unknown type: "+type);
}
}
} else {
var subflowId = m[1];
createSubflow(subflows[subflowId],activeConfig[i]);
}
}
// Clean up any orphaned credentials
credentials.clean(flowNodes.get);
events.emit("nodes-started");
}
/**
* Stops the current activeConfig
*/
function stopFlows() {
if (activeConfig&&activeConfig.length > 0) {
util.log("[red] Stopping flows");
}
return flowNodes.clear();
}
function diffNodes(oldNode,newNode) {
if (oldNode == null) {
return true;
} else {
for (var p in newNode) {
if (newNode.hasOwnProperty(p) && p != "x" && p != "y") {
if (!redUtil.compareObjects(oldNode[p],newNode[p])) {
return true;
break;
}
}
}
}
return false;
}
var flowNodes = module.exports = {
init: function(_storage) {
@ -261,13 +54,12 @@ var flowNodes = module.exports = {
load: function() {
return storage.getFlows().then(function(flows) {
return credentials.load().then(function() {
activeConfig = flows;
if (activeConfig && activeConfig.length > 0) {
parseConfig();
}
activeFlow = new Flow(flows);
flowNodes.startFlows();
});
}).otherwise(function(err) {
util.log("[red] Error loading flows : "+err);
console.log(err.stack);
});
},
@ -276,7 +68,7 @@ var flowNodes = module.exports = {
* @param n the node to add
*/
add: function(n) {
nodes[n.id] = n;
//console.log("ADDED NODE:",n.id,n.type,n.name||"");
n.on("log",log.log);
},
@ -286,196 +78,114 @@ var flowNodes = module.exports = {
* @return the node
*/
get: function(i) {
return nodes[i];
return activeFlow.getNode(i);
},
/**
* Stops all active nodes and clears the active set
* @return a promise for the stopping of all active nodes
*/
clear: function() {
return when.promise(function(resolve) {
events.emit("nodes-stopping");
var promises = [];
for (var n in nodes) {
if (nodes.hasOwnProperty(n)) {
try {
var p = nodes[n].close();
if (p) {
promises.push(p);
}
} catch(err) {
nodes[n].error(err);
}
}
}
when.settle(promises).then(function() {
events.emit("nodes-stopped");
nodes = {};
resolve();
});
});
},
/**
* Provides an iterator over the active set of nodes
* @param cb a function to be called for each node in the active set
*/
each: function(cb) {
for (var n in nodes) {
if (nodes.hasOwnProperty(n)) {
cb(nodes[n]);
}
}
},
//clear: function(nodesToStop) {
// var stopList;
// if (nodesToStop == null) {
// stopList = Object.keys(nodes);
// } else {
// stopList = Object.keys(nodesToStop);
// }
// console.log(stopList);
// return when.promise(function(resolve) {
// events.emit("nodes-stopping");
// var promises = [];
// console.log("running nodes:",Object.keys(nodes).length);
// for (var i=0;i<stopList.length;i++) {
// var node = nodes[stopList[i]];
// if (node) {
// try {
// var p = node.close();
// if (p) {
// promises.push(p);
// }
// } catch(err) {
// node.error(err);
// }
// delete nodes[stopList[i]];
// delete activeConfigNodes[stopList[i]];
// }
// }
// console.log("running nodes:",Object.keys(nodes).length);
// when.settle(promises).then(function() {
// events.emit("nodes-stopped");
// resolve();
// });
// });
//},
/**
* @return the active configuration
*/
getFlows: function() {
return activeConfig;
return activeFlow.getFlow();
},
/**
* Sets the current active config.
* @param config the configuration to enable
* @param type the type of deployment to do: full (default), nodes, flows
* @return a promise for the starting of the new flow
*/
setFlows: function (config) {
//TODO: identify the deleted nodes
var newNodes = {};
var changedNodeStack = [];
var changedSubflowsObj = {};
var visited = {};
setFlows: function (config,type) {
var nodeLinks = {};
var changedNodes = {};
var linkChangedNodes = {};
var deletedNodes = [];
type = type||"full";
for (var i=0; i<config.length; i++) {
var node = config[i];
newNodes[node.id] = node;
var changed = false;
// Extract any credential updates
var credentialsChanged = false;
var credentialSavePromise = null;
config.forEach(function(node) {
if (node.credentials) {
credentials.extract(node);
changed = true;
delete node.credentials;
} else {
// Check for any changes on the node
changed = diffNodes(activeConfigNodes[node.id],node);
}
if (changed) {
changedNodes[node.id] = node;
changedNodeStack.push(node.id);
// If this is part of a subflow, mark the subflow as changed
if (activeConfigNodes[node.z] && activeConfigNodes[node.z].type == "subflow") {
changedSubflowsObj[node.z] = true;
}
credentialsChanged = true;
}
});
if (credentialsChanged) {
credentialsSavePromise = credentials.save();
} else {
credentialSavePromise = when.resolve();
}
activeConfig.forEach(function(n) {
if (!newNodes[n.id]) {
deletedNodes.push(n.id);
// Upstream nodes will be flagged as changed as their wires
// array will be different.
// Need to flag downstream nodes
if (n.wires) {
for (var j=0;j<n.wires.length;j++) {
var wires = n.wires[j];
for (var k=0;k<wires.length;k++) {
var nn = newNodes[wires[k]];
if (nn) {
linkChangedNodes[wires[k]] = nn;
}
}
}
}
}
});
// For all changed subflows, mark any instances as changed
// If the subflow instance exists in a subflow, recursively mark them all
var changedSubflows = Object.keys(changedSubflowsObj);
while (changedSubflows.length > 0) {
var newChangedSubflowsObj = {};
changedSubflows.forEach(function(id) {
changedNodes[id] = newNodes[id];
});
var subflowNameRE = /^subflow:(.*)$/;
config.forEach(function(n) {
var m = subflowNameRE.exec(n.type);
if (m && changedSubflowsObj[m[1]]) {
// This is an instance of a changed subflow
changedNodes[n.id] = n;
if (activeConfigNodes[n.z] && activeConfigNodes[n.z].type == "subflow" && !changedNodes[n.z]) {
// This instance is itself in a subflow that has not yet been dealt with
newChangedSubflowsObj[n.z] = true;
}
}
});
changedSubflowsObj = newChangedSubflowsObj;
changedSubflows = Object.keys(newChangedSubflowsObj);
if (type=="full") {
return credentialSavePromise
.then(function() { return storage.saveFlows(config);})
.then(function() { return flowNodes.stopFlows(); })
.then(function() { activeFlow = new Flow(config); flowNodes.startFlows();});
} else {
return credentialSavePromise
.then(function() { return storage.saveFlows(config);})
.then(function() { return activeFlow.applyConfig(config,type); });
}
// Build the list of what each node is connected to
config.forEach(function(n) {
nodeLinks[n.id] = nodeLinks[n.id] || [];
if (n.wires) {
for (var j=0;j<n.wires.length;j++) {
var wires = n.wires[j];
for (var k=0;k<wires.length;k++) {
nodeLinks[n.id].push(wires[k]);
var nn = newNodes[wires[k]];
if (nn) {
nodeLinks[nn.id] = nodeLinks[nn.id] || [];
nodeLinks[nn.id].push(n.id);
}
}
}
}
});
// For all of the changed nodes, mark all downstream and upstream
// nodes as linkChanged, and add them to the stack to propagate
while(changedNodeStack.length > 0) {
var nid = changedNodeStack.pop();
var n = newNodes[nid];
if (!visited[nid]) {
visited[nid] = true;
if (nodeLinks[nid]) {
nodeLinks[nid].forEach(function(id) {
var nn = newNodes[id];
if (!changedNodes[id]) {
linkChangedNodes[id] = nn;
changedNodeStack.push(nn.id);
}
});
}
}
}
config.forEach(function(n) {
if (changedNodes[n.id]|| linkChangedNodes[n.id]) {
console.log(changedNodes[n.id]!=null,linkChangedNodes[n.id]!=null,n.id,n.type,n.name);
}
});
deletedNodes.forEach(function(n) {
console.log("Deleted:",n);
});
return credentials.save()
.then(function() { return storage.saveFlows(config);})
.then(function() { return stopFlows();})
.then(function () {
activeConfig = config;
parseConfig();
});
},
stopFlows: stopFlows
startFlows: function() {
util.log("[red] Starting flows");
try {
activeFlow.start();
} catch(err) {
var missingTypes = activeFlow.getMissingTypes();
if (missingTypes.length > 0) {
util.log("[red] Waiting for missing types to be registered:");
for (i=0;i<missingTypes.length;i++) {
util.log("[red] - "+missingTypes[i]);
}
}
}
},
stopFlows: function() {
util.log("[red] Stopping flows");
return activeFlow.stop();
}
};
var activeFlow = null;

View File

@ -39,8 +39,13 @@ function registerType(type,constructor,opts) {
*/
function createNode(node,def) {
Node.call(node,def);
var creds = credentials.get(node.id);
var id = node.id;
if (def._alias) {
id = def._alias;
}
var creds = credentials.get(id);
if (creds) {
//console.log("Attaching credentials to ",node.id);
node.credentials = creds;
}
}
@ -57,7 +62,8 @@ function checkTypeInUse(id) {
throw new Error("Unrecognised id: "+id);
} else {
var inUse = {};
flows.each(function(n) {
var config = flows.getFlows();
config.forEach(function(n) {
inUse[n.type] = (inUse[n.type]||0)+1;
});
var nodesInUse = [];