mirror of
https://github.com/node-red/node-red.git
synced 2023-10-10 13:36:53 +02:00
Merge pull request #2679 from rorysavage77/mutex-for-flow-modification
Updated flow modification methods to support mutex serialization
This commit is contained in:
commit
0c5eae2349
@ -27,6 +27,7 @@
|
||||
],
|
||||
"dependencies": {
|
||||
"ajv": "6.12.3",
|
||||
"async-mutex": "0.2.4",
|
||||
"basic-auth": "2.0.1",
|
||||
"bcryptjs": "2.4.3",
|
||||
"body-parser": "1.19.0",
|
||||
|
@ -34,6 +34,8 @@
|
||||
*/
|
||||
|
||||
var runtime;
|
||||
var Mutex = require('async-mutex').Mutex;
|
||||
const mutex = new Mutex();
|
||||
|
||||
var api = module.exports = {
|
||||
init: function(_runtime) {
|
||||
@ -64,37 +66,39 @@ var api = module.exports = {
|
||||
* @memberof @node-red/runtime_flows
|
||||
*/
|
||||
setFlows: function(opts) {
|
||||
return new Promise(function(resolve,reject) {
|
||||
return mutex.runExclusive(function() {
|
||||
return new Promise(function(resolve,reject) {
|
||||
|
||||
var flows = opts.flows;
|
||||
var deploymentType = opts.deploymentType||"full";
|
||||
runtime.log.audit({event: "flows.set",type:deploymentType}, opts.req);
|
||||
var flows = opts.flows;
|
||||
var deploymentType = opts.deploymentType||"full";
|
||||
runtime.log.audit({event: "flows.set",type:deploymentType}, opts.req);
|
||||
|
||||
var apiPromise;
|
||||
if (deploymentType === 'reload') {
|
||||
apiPromise = runtime.nodes.loadFlows(true);
|
||||
} else {
|
||||
if (flows.hasOwnProperty('rev')) {
|
||||
var currentVersion = runtime.nodes.getFlows().rev;
|
||||
if (currentVersion !== flows.rev) {
|
||||
var err;
|
||||
err = new Error();
|
||||
err.code = "version_mismatch";
|
||||
err.status = 409;
|
||||
//TODO: log warning
|
||||
return reject(err);
|
||||
var apiPromise;
|
||||
if (deploymentType === 'reload') {
|
||||
apiPromise = runtime.nodes.loadFlows(true);
|
||||
} else {
|
||||
if (flows.hasOwnProperty('rev')) {
|
||||
var currentVersion = runtime.nodes.getFlows().rev;
|
||||
if (currentVersion !== flows.rev) {
|
||||
var err;
|
||||
err = new Error();
|
||||
err.code = "version_mismatch";
|
||||
err.status = 409;
|
||||
//TODO: log warning
|
||||
return reject(err);
|
||||
}
|
||||
}
|
||||
apiPromise = runtime.nodes.setFlows(flows.flows,flows.credentials,deploymentType);
|
||||
}
|
||||
apiPromise = runtime.nodes.setFlows(flows.flows,flows.credentials,deploymentType);
|
||||
}
|
||||
apiPromise.then(function(flowId) {
|
||||
return resolve({rev:flowId});
|
||||
}).catch(function(err) {
|
||||
runtime.log.warn(runtime.log._("api.flows.error-"+(deploymentType === 'reload'?'reload':'save'),{message:err.message}));
|
||||
runtime.log.warn(err.stack);
|
||||
return reject(err);
|
||||
apiPromise.then(function(flowId) {
|
||||
return resolve({rev:flowId});
|
||||
}).catch(function(err) {
|
||||
runtime.log.warn(runtime.log._("api.flows.error-"+(deploymentType === 'reload'?'reload':'save'),{message:err.message}));
|
||||
runtime.log.warn(err.stack);
|
||||
return reject(err);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
/**
|
||||
@ -107,19 +111,23 @@ var api = module.exports = {
|
||||
* @memberof @node-red/runtime_flows
|
||||
*/
|
||||
addFlow: function(opts) {
|
||||
return new Promise(function(resolve,reject) {
|
||||
var flow = opts.flow;
|
||||
runtime.nodes.addFlow(flow).then(function(id) {
|
||||
runtime.log.audit({event: "flow.add",id:id}, opts.req);
|
||||
return resolve(id);
|
||||
}).catch(function(err) {
|
||||
runtime.log.audit({event: "flow.add",error:err.code||"unexpected_error",message:err.toString()}, opts.req);
|
||||
err.status = 400;
|
||||
return reject(err);
|
||||
return mutex.runExclusive(function() {
|
||||
return new Promise(function (resolve, reject) {
|
||||
var flow = opts.flow;
|
||||
runtime.nodes.addFlow(flow).then(function (id) {
|
||||
runtime.log.audit({event: "flow.add", id: id}, opts.req);
|
||||
return resolve(id);
|
||||
}).catch(function (err) {
|
||||
runtime.log.audit({
|
||||
event: "flow.add",
|
||||
error: err.code || "unexpected_error",
|
||||
message: err.toString()
|
||||
}, opts.req);
|
||||
err.status = 400;
|
||||
return reject(err);
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
});
|
||||
},
|
||||
|
||||
/**
|
||||
@ -145,7 +153,6 @@ var api = module.exports = {
|
||||
return reject(err);
|
||||
}
|
||||
})
|
||||
|
||||
},
|
||||
/**
|
||||
* Updates an existing flow configuration
|
||||
@ -158,33 +165,42 @@ var api = module.exports = {
|
||||
* @memberof @node-red/runtime_flows
|
||||
*/
|
||||
updateFlow: function(opts) {
|
||||
return new Promise(function (resolve,reject) {
|
||||
var flow = opts.flow;
|
||||
var id = opts.id;
|
||||
try {
|
||||
runtime.nodes.updateFlow(id,flow).then(function() {
|
||||
runtime.log.audit({event: "flow.update",id:id}, opts.req);
|
||||
return resolve(id);
|
||||
}).catch(function(err) {
|
||||
runtime.log.audit({event: "flow.update",error:err.code||"unexpected_error",message:err.toString()}, opts.req);
|
||||
err.status = 400;
|
||||
return reject(err);
|
||||
})
|
||||
} catch(err) {
|
||||
if (err.code === 404) {
|
||||
runtime.log.audit({event: "flow.update",id:id,error:"not_found"}, opts.req);
|
||||
// TODO: this swap around of .code and .status isn't ideal
|
||||
err.status = 404;
|
||||
err.code = "not_found";
|
||||
return reject(err);
|
||||
} else {
|
||||
runtime.log.audit({event: "flow.update",error:err.code||"unexpected_error",message:err.toString()}, opts.req);
|
||||
err.status = 400;
|
||||
return reject(err);
|
||||
return mutex.runExclusive(function() {
|
||||
return new Promise(function (resolve, reject) {
|
||||
var flow = opts.flow;
|
||||
var id = opts.id;
|
||||
try {
|
||||
runtime.nodes.updateFlow(id, flow).then(function () {
|
||||
runtime.log.audit({event: "flow.update", id: id}, opts.req);
|
||||
return resolve(id);
|
||||
}).catch(function (err) {
|
||||
runtime.log.audit({
|
||||
event: "flow.update",
|
||||
error: err.code || "unexpected_error",
|
||||
message: err.toString()
|
||||
}, opts.req);
|
||||
err.status = 400;
|
||||
return reject(err);
|
||||
})
|
||||
} catch (err) {
|
||||
if (err.code === 404) {
|
||||
runtime.log.audit({event: "flow.update", id: id, error: "not_found"}, opts.req);
|
||||
// TODO: this swap around of .code and .status isn't ideal
|
||||
err.status = 404;
|
||||
err.code = "not_found";
|
||||
return reject(err);
|
||||
} else {
|
||||
runtime.log.audit({
|
||||
event: "flow.update",
|
||||
error: err.code || "unexpected_error",
|
||||
message: err.toString()
|
||||
}, opts.req);
|
||||
err.status = 400;
|
||||
return reject(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
},
|
||||
/**
|
||||
* Deletes a flow
|
||||
@ -196,30 +212,42 @@ var api = module.exports = {
|
||||
* @memberof @node-red/runtime_flows
|
||||
*/
|
||||
deleteFlow: function(opts) {
|
||||
return new Promise(function (resolve,reject) {
|
||||
var id = opts.id;
|
||||
try {
|
||||
runtime.nodes.removeFlow(id).then(function() {
|
||||
runtime.log.audit({event: "flow.remove",id:id}, opts.req);
|
||||
return resolve();
|
||||
}).catch(function(err) {
|
||||
runtime.log.audit({event: "flow.remove",id:id,error:err.code||"unexpected_error",message:err.toString()}, opts.req);
|
||||
err.status = 400;
|
||||
return reject(err);
|
||||
});
|
||||
} catch(err) {
|
||||
if (err.code === 404) {
|
||||
runtime.log.audit({event: "flow.remove",id:id,error:"not_found"}, opts.req);
|
||||
// TODO: this swap around of .code and .status isn't ideal
|
||||
err.status = 404;
|
||||
err.code = "not_found";
|
||||
return reject(err);
|
||||
} else {
|
||||
runtime.log.audit({event: "flow.remove",id:id,error:err.code||"unexpected_error",message:err.toString()}, opts.req);
|
||||
err.status = 400;
|
||||
return reject(err);
|
||||
return mutex.runExclusive(function() {
|
||||
return new Promise(function (resolve, reject) {
|
||||
var id = opts.id;
|
||||
try {
|
||||
runtime.nodes.removeFlow(id).then(function () {
|
||||
runtime.log.audit({event: "flow.remove", id: id}, opts.req);
|
||||
return resolve();
|
||||
}).catch(function (err) {
|
||||
runtime.log.audit({
|
||||
event: "flow.remove",
|
||||
id: id,
|
||||
error: err.code || "unexpected_error",
|
||||
message: err.toString()
|
||||
}, opts.req);
|
||||
err.status = 400;
|
||||
return reject(err);
|
||||
});
|
||||
} catch (err) {
|
||||
if (err.code === 404) {
|
||||
runtime.log.audit({event: "flow.remove", id: id, error: "not_found"}, opts.req);
|
||||
// TODO: this swap around of .code and .status isn't ideal
|
||||
err.status = 404;
|
||||
err.code = "not_found";
|
||||
return reject(err);
|
||||
} else {
|
||||
runtime.log.audit({
|
||||
event: "flow.remove",
|
||||
id: id,
|
||||
error: err.code || "unexpected_error",
|
||||
message: err.toString()
|
||||
}, opts.req);
|
||||
err.status = 400;
|
||||
return reject(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
@ -264,5 +292,4 @@ var api = module.exports = {
|
||||
resolve(sendCredentials);
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
"dependencies": {
|
||||
"@node-red/registry": "1.1.3",
|
||||
"@node-red/util": "1.1.3",
|
||||
"async-mutex": "0.2.4",
|
||||
"clone": "2.1.2",
|
||||
"express": "4.17.1",
|
||||
"fs-extra": "8.1.0",
|
||||
|
Loading…
Reference in New Issue
Block a user