Add add/update/delete flow apis

This commit is contained in:
Nick O'Leary 2015-12-09 21:51:46 +00:00
parent fd2e47ed73
commit c4b1795396
8 changed files with 305 additions and 68 deletions

View File

@ -26,7 +26,64 @@ module.exports = {
},
get: function(req,res) {
var id = req.params.id;
log.audit({event: "flow.get"},req);
res.json(redNodes.getFlow(id));
var flow = redNodes.getFlow(id);
if (flow) {
log.audit({event: "flow.get",id:id},req);
res.json(flow);
} else {
log.audit({event: "flow.get",id:id,error:"not_found"},req);
res.status(404).end();
}
},
post: function(req,res) {
var flow = req.body;
redNodes.addFlow(flow).then(function(id) {
log.audit({event: "flow.add",id:id},req);
res.json({id:id});
}).otherwise(function(err) {
log.audit({event: "flow.add",error:err.code||"unexpected_error",message:err.toString()},req);
res.status(400).json({error:err.code||"unexpected_error", message:err.toString()});
})
},
put: function(req,res) {
var id = req.params.id;
var flow = req.body;
try {
redNodes.updateFlow(id,flow).then(function() {
log.audit({event: "flow.update",id:id},req);
res.json({id:id});
}).otherwise(function(err) {
console.log(err.stack);
log.audit({event: "flow.update",error:err.code||"unexpected_error",message:err.toString()},req);
res.status(400).json({error:err.code||"unexpected_error", message:err.toString()});
})
} catch(err) {
if (err.code === 404) {
log.audit({event: "flow.update",id:id,error:"not_found"},req);
res.status(404).end();
} else {
console.log(err.stack);
log.audit({event: "flow.update",error:err.code||"unexpected_error",message:err.toString()},req);
res.status(400).json({error:err.code||"unexpected_error", message:err.toString()});
}
}
},
delete: function(req,res) {
var id = req.params.id;
try {
redNodes.removeFlow(id).then(function() {
log.audit({event: "flow.remove",id:id},req);
res.status(204).end();
})
} catch(err) {
if (err.code === 404) {
log.audit({event: "flow.remove",id:id,error:"not_found"},req);
res.status(404).end();
} else {
log.audit({event: "flow.remove",id:id,error:err.code||"unexpected_error",message:err.toString()},req);
res.status(400).json({error:err.code||"unexpected_error", message:err.toString()});
}
}
}
}

View File

@ -106,6 +106,9 @@ function init(_server,runtime) {
adminApp.post("/flows",needsPermission("flows.write"),flows.post);
adminApp.get("/flow/:id",needsPermission("flows.read"),flow.get);
adminApp.post("/flow",needsPermission("flows.write"),flow.post);
adminApp.delete("/flow/:id",needsPermission("flows.write"),flow.delete);
adminApp.put("/flow/:id",needsPermission("flows.write"),flow.put);
// Nodes
adminApp.get("/nodes",needsPermission("nodes.read"),nodes.getAll);

View File

@ -121,6 +121,7 @@ module.exports = {
var nodeType = node.type;
var newCreds = node.credentials;
if (newCreds) {
delete node.credentials;
var savedCredentials = credentialCache[nodeID] || {};
var dashedType = nodeType.replace(/\s+/g, '-');
var definition = credentialsDef[dashedType];
@ -145,7 +146,6 @@ module.exports = {
}
}
credentialCache[nodeID] = savedCredentials;
delete node.credentials;
}
},

View File

@ -47,7 +47,7 @@ function Flow(global,flow) {
}
}
}
if (diff) {
if (diff && diff.rewired) {
for (var j=0;j<diff.rewired.length;j++) {
var rewireNode = activeNodes[diff.rewired[j]];
if (rewireNode) {
@ -262,7 +262,6 @@ function mapEnvVarProperties(obj,prop) {
}
function createNode(type,config) {
// console.log("CREATE",type,config.id);
var nn = null;
var nt = typeRegistry.get(type);
if (nt) {

View File

@ -200,6 +200,7 @@ function handleStatus(node,statusMessage) {
function start(type,diff) {
//dumpActiveNodes();
type = type||"full";
started = true;
var i;
@ -231,14 +232,18 @@ function start(type,diff) {
}
var id;
if (!diff) {
activeFlows['_GLOBAL_'] = Flow.create(activeFlowConfig);
if (!activeFlows['global']) {
activeFlows['global'] = Flow.create(activeFlowConfig);
}
for (id in activeFlowConfig.flows) {
if (activeFlowConfig.flows.hasOwnProperty(id)) {
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
if (!activeFlows[id]) {
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
}
}
}
} else {
activeFlows['_GLOBAL_'].update(activeFlowConfig,activeFlowConfig);
activeFlows['global'].update(activeFlowConfig,activeFlowConfig);
for (id in activeFlowConfig.flows) {
if (activeFlowConfig.flows.hasOwnProperty(id)) {
if (activeFlows[id]) {
@ -352,7 +357,34 @@ function checkTypeInUse(id) {
}
}
function updateMissingTypes() {
var subflowInstanceRE = /^subflow:(.+)$/;
activeFlowConfig.missingTypes = [];
for (var id in activeFlowConfig.allNodes) {
if (activeFlowConfig.allNodes.hasOwnProperty(id)) {
var node = activeFlowConfig.allNodes[id];
if (node.type !== 'tab' && node.type !== 'subflow') {
var subflowDetails = subflowInstanceRE.exec(node.type);
if ( (subflowDetails && !activeFlowConfig.subflows[subflowDetails[1]]) || (!subflowDetails && !typeRegistry.get(node.type)) ) {
if (activeFlowConfig.missingTypes.indexOf(node.type) === -1) {
activeFlowConfig.missingTypes.push(node.type);
}
}
}
}
}
}
// function dumpActiveNodes() {
// console.log("--------")
// for (var i in activeFlowConfig.allNodes) {
// console.log(i,activeFlowConfig.allNodes[i].type,activeFlowConfig.allNodes[i].z)
// }
// console.log("--------")
// }
function addFlow(flow) {
//dumpActiveNodes();
/*
{
id:'',
@ -364,69 +396,214 @@ function addFlow(flow) {
// flow.id should not exist - it will be assigned by the runtime
// all flow.{subflows|configs|nodes}.z will be set to flow.id
// all nodes will have new ids assigned if there is a clash
// check all known types - fail if otherwise?
//
// resolves with generated flow id
return when.promise(function(resolve,reject) {
var i,id,node;
var i,id,node;
flow.id = redUtil.generateId();
flow.id = redUtil.generateId();
for (i=0;i<flow.nodes.length;i++) {
node = flow.nodes[i];
for (i=0;i<flow.nodes.length;i++) {
node = flow.nodes[i];
if (activeFlowConfig.allNodes[node.id]) {
// TODO nls
return when.reject(new Error('duplicate id'));
}
node.z = flow.id;
}
if (flow.configs) {
for (i=0;i<flow.configs.length;i++) {
node = flow.configs[i];
if (activeFlowConfig.allNodes[node.id]) {
// TODO nls
return reject(new Error('duplicate id'));
return when.reject(new Error('duplicate id'));
}
node.z = flow.id;
}
var tabNode = {
type:'tab',
label:flow.label,
id:flow.id
}
var tabNode = {
type:'tab',
label:flow.label,
id:flow.id
}
var nodes = [tabNode].concat(flow.nodes||[]).concat(flow.configs||[]);
var credentialSavePromise;
var credentialsChanged = false;
nodes.forEach(function(node) {
if (node.credentials) {
credentials.extract(node);
credentialsChanged = true;
}
var nodes = [tabNode].concat(flow.nodes);
var parsedConfig = flowUtil.parseConfig(clone(nodes));
// TODO: handle unknown type
for (id in parsedConfig.flows[flow.id]) {
if (parsedConfig.flows[flow.id].hasOwnProperty(id)) {
activeFlowConfig.allNodes[id] = parsedConfig.flows[flow.id][id];
});
if (credentialsChanged) {
credentialSavePromise = credentials.save();
} else {
credentialSavePromise = when.resolve();
}
var parsedConfig = flowUtil.parseConfig(clone(nodes));
parsedConfig.missingTypes.forEach(function(type) {
if (activeFlowConfig.missingTypes.indexOf(type) == -1) {
activeFlowConfig.missingTypes.push(type);
}
})
activeFlowConfig.allNodes[tabNode.id] = tabNode;
for (id in parsedConfig.flows[flow.id].nodes) {
if (parsedConfig.flows[flow.id].nodes.hasOwnProperty(id)) {
activeFlowConfig.allNodes[id] = parsedConfig.flows[flow.id].nodes[id];
}
}
if (parsedConfig.flows[flow.id].configs) {
for (id in parsedConfig.flows[flow.id].configs) {
if (parsedConfig.flows[flow.id].configs.hasOwnProperty(id)) {
activeFlowConfig.allNodes[id] = parsedConfig.flows[flow.id].configs[id];
}
}
activeFlowConfig.flows[flow.id] = parsedConfig.flows[flow.id];
}
activeConfig = activeConfig.concat(nodes);
// TODO: extract creds
// TODO: save config
start("flows",{added:flow.nodes.map(function(n) { return n.id})}).then(function() {
// console.log(activeFlowConfig);
resolve(flow.id);
activeFlowConfig.flows[flow.id] = parsedConfig.flows[flow.id];
activeConfig = activeConfig.concat(nodes);
// TODO: extract creds
return credentialSavePromise.then(function() {
return storage.saveFlows(activeConfig).then(function() {
return start("flows",{added:flow.nodes.map(function(n) { return n.id})}).then(function() {
//dumpActiveNodes();
// console.log(activeFlowConfig);
return flow.id;
})
})
})
});
}
function getFlow(id) {
var flow = activeFlowConfig.flows[id];
var flow;
if (id === 'global') {
flow = activeFlowConfig;
} else {
flow = activeFlowConfig.flows[id];
}
if (!flow) {
return null;
}
var result = {
id: id,
label: flow.label,
nodes: []
id: id
};
for (var i=0;i<activeConfig.length;i++) {
if (activeConfig[i].z === id && activeConfig[i].type != 'tab') {
result.nodes.push(activeConfig[i]);
if (flow.label) {
result.label = flow.label;
}
if (flow.nodes) {
var nodeIds = Object.keys(flow.nodes);
if (nodeIds.length > 0) {
result.nodes = nodeIds.map(function(nodeId) {
return clone(flow.nodes[nodeId]);
})
}
}
if (flow.configs) {
var configIds = Object.keys(flow.configs);
result.configs = configIds.map(function(configId) {
return clone(flow.configs[configId]);
})
}
if (flow.subflows) {
var subflowIds = Object.keys(flow.subflows);
result.subflows = subflowIds.map(function(subflowId) {
var subflow = clone(flow.subflows[subflowId]);
var nodeIds = Object.keys(subflow.nodes);
subflow.nodes = nodeIds.map(function(id) {
return subflow.nodes[id];
});
if (subflow.configs) {
var configIds = Object.keys(subflow.configs);
subflow.configs = configIds.map(function(id) {
return subflow.configs[id];
})
}
delete subflow.instances;
return subflow;
});
}
return result;
}
function updateFlow(id,newFlow) {
if (id === 'global') {
// TODO: handle global update
throw new Error('not allowed to update global');
}
var flow = activeFlowConfig.flows[id];
if (!flow) {
var e = new Error();
e.code = 404;
throw e;
}
var newConfig = clone(activeConfig);
newConfig = newConfig.filter(function(node) {
return node.z !== id && node.id !== id;
});
var tabNode = {
type:'tab',
label:newFlow.label,
id:id
}
var nodes = [tabNode].concat(newFlow.nodes||[]).concat(newFlow.configs||[]);
nodes.forEach(function(n) {
n.z = id;
});
newConfig = newConfig.concat(nodes);
return setConfig(newConfig,'flows');
// filter activeConfig to remove nodes
}
function removeFlow(id) {
if (id === 'global') {
// TODO: nls + error code
throw new Error('not allowed to remove global');
}
var flow = activeFlowConfig.flows[id];
if (!flow) {
var e = new Error();
e.code = 404;
throw e;
}
var diff = {
removed: [id].concat(Object.keys(flow.nodes)).concat(Object.keys(flow.configs)),
linked:[],
changed:[]
}
delete activeFlowConfig.flows[id];
diff.removed.forEach(function(id) {
delete activeFlowConfig.allNodes[id];
});
activeConfig = activeConfig.filter(function(node) {
return node.z !== id && node.id !== id;
});
var missingTypeCount = activeFlowConfig.missingTypes.length;
updateMissingTypes();
return credentials.clean(activeConfig).then(function() {
storage.saveFlows(activeConfig).then(function() {
stop("flows",diff).then(function() {
if (missingTypeCount > 0 && activeFlowConfig.missingTypes.length === 0) {
return start();
}
//dumpActiveNodes();
});
});
})
}
module.exports = {
init: init,
@ -470,11 +647,10 @@ module.exports = {
checkTypeInUse: checkTypeInUse,
addFlow: addFlow,
getFlow: getFlow,
updateFlow:null,
removeFlow:null,
updateFlow: updateFlow,
removeFlow: removeFlow,
disableFlow:null,
enableFlow:null

View File

@ -76,29 +76,28 @@ module.exports = {
if (flow.missingTypes.indexOf(n.type) === -1) {
flow.missingTypes.push(n.type);
}
} else {
var container = null;
if (flow.flows[n.z]) {
container = flow.flows[n.z];
} else if (flow.subflows[n.z]) {
container = flow.subflows[n.z];
}
var container = null;
if (flow.flows[n.z]) {
container = flow.flows[n.z];
} else if (flow.subflows[n.z]) {
container = flow.subflows[n.z];
}
if (n.hasOwnProperty('x') && n.hasOwnProperty('y')) {
if (subflowDetails) {
var subflowType = subflowDetails[1]
n.subflow = subflowType;
flow.subflows[subflowType].instances.push(n)
}
if (n.hasOwnProperty('x') && n.hasOwnProperty('y')) {
if (subflowDetails) {
var subflowType = subflowDetails[1]
n.subflow = subflowType;
flow.subflows[subflowType].instances.push(n)
}
if (container) {
container.nodes[n.id] = n;
}
if (container) {
container.nodes[n.id] = n;
}
} else {
if (container) {
container.configs[n.id] = n;
} else {
if (container) {
container.configs[n.id] = n;
} else {
flow.configs[n.id] = n;
flow.configs[n.id]._users = [];
}
flow.configs[n.id] = n;
flow.configs[n.id]._users = [];
}
}
}

View File

@ -531,6 +531,9 @@ describe('flows/index', function() {
storage.getFlows = function() {
return when.resolve(originalConfig);
}
storage.setFlows = function() {
return when.resolve();
}
flows.init({},storage);
flows.load().then(function() {
return flows.startFlows();
@ -539,7 +542,8 @@ describe('flows/index', function() {
label:'new flow',
nodes:[
{id:"t2-1",x:10,y:10,z:"t1",type:"test",wires:[]},
{id:"t2-2",x:10,y:10,z:"t1",type:"test",wires:[]},
{id:"t2-2",x:10,y:10,z:"t1",type:"test",wires:[]}
,
{id:"t2-3",z:"t1",type:"test"}
]
}).then(function(id) {

View File

@ -137,8 +137,7 @@ describe('flows/util', function() {
];
var parsedConfig = flowUtil.parseConfig(originalConfig);
parsedConfig.missingTypes.should.eql(['missing']);
var expectedConfig = {"allNodes":{"t1":{"id":"t1","type":"tab"},"t1-1":{"id":"t1-1","x":10,"y":10,"z":"t1","type":"sf1","wires":[]},"t1-2":{"id":"t1-2","x":10,"y":10,"z":"t1","type":"missing","wires":[]}},"subflows":{},"configs":{},"flows":{"t1":{"id":"t1","type":"tab","subflows":{},"configs":{},"nodes":{"t1-1":{"id":"t1-1","x":10,"y":10,"z":"t1","type":"sf1","wires":[]}}}},"missingTypes":["missing"]};
var expectedConfig = {"allNodes":{"t1":{"id":"t1","type":"tab"},"t1-1":{"id":"t1-1","x":10,"y":10,"z":"t1","type":"sf1","wires":[]},"t1-2":{"id":"t1-2","x":10,"y":10,"z":"t1","type":"missing","wires":[]}},"subflows":{},"configs":{},"flows":{"t1":{"id":"t1","type":"tab","subflows":{},"configs":{},"nodes":{"t1-1":{"id":"t1-1","x":10,"y":10,"z":"t1","type":"sf1","wires":[]},'t1-2': { id: 't1-2', x: 10, y: 10, z: 't1', type: 'missing', wires: [] }}}},"missingTypes":["missing"]};
redUtil.compareObjects(parsedConfig,expectedConfig).should.be.true;
});