diff --git a/package.json b/package.json index b215610dd..ff285309b 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ ], "dependencies": { "ajv": "6.12.3", + "async-mutex": "0.2.4", "basic-auth": "2.0.1", "bcryptjs": "2.4.3", "body-parser": "1.19.0", diff --git a/packages/node_modules/@node-red/runtime/lib/api/flows.js b/packages/node_modules/@node-red/runtime/lib/api/flows.js index 277099ecc..6cbda197a 100644 --- a/packages/node_modules/@node-red/runtime/lib/api/flows.js +++ b/packages/node_modules/@node-red/runtime/lib/api/flows.js @@ -34,6 +34,8 @@ */ var runtime; +var Mutex = require('async-mutex').Mutex; +const mutex = new Mutex(); var api = module.exports = { init: function(_runtime) { @@ -64,37 +66,39 @@ var api = module.exports = { * @memberof @node-red/runtime_flows */ setFlows: function(opts) { - return new Promise(function(resolve,reject) { + return mutex.runExclusive(function() { + return new Promise(function(resolve,reject) { - var flows = opts.flows; - var deploymentType = opts.deploymentType||"full"; - runtime.log.audit({event: "flows.set",type:deploymentType}, opts.req); + var flows = opts.flows; + var deploymentType = opts.deploymentType||"full"; + runtime.log.audit({event: "flows.set",type:deploymentType}, opts.req); - var apiPromise; - if (deploymentType === 'reload') { - apiPromise = runtime.nodes.loadFlows(true); - } else { - if (flows.hasOwnProperty('rev')) { - var currentVersion = runtime.nodes.getFlows().rev; - if (currentVersion !== flows.rev) { - var err; - err = new Error(); - err.code = "version_mismatch"; - err.status = 409; - //TODO: log warning - return reject(err); + var apiPromise; + if (deploymentType === 'reload') { + apiPromise = runtime.nodes.loadFlows(true); + } else { + if (flows.hasOwnProperty('rev')) { + var currentVersion = runtime.nodes.getFlows().rev; + if (currentVersion !== flows.rev) { + var err; + err = new Error(); + err.code = "version_mismatch"; + err.status = 409; + //TODO: log warning + return reject(err); + } } + apiPromise = runtime.nodes.setFlows(flows.flows,flows.credentials,deploymentType); } - apiPromise = runtime.nodes.setFlows(flows.flows,flows.credentials,deploymentType); - } - apiPromise.then(function(flowId) { - return resolve({rev:flowId}); - }).catch(function(err) { - runtime.log.warn(runtime.log._("api.flows.error-"+(deploymentType === 'reload'?'reload':'save'),{message:err.message})); - runtime.log.warn(err.stack); - return reject(err); + apiPromise.then(function(flowId) { + return resolve({rev:flowId}); + }).catch(function(err) { + runtime.log.warn(runtime.log._("api.flows.error-"+(deploymentType === 'reload'?'reload':'save'),{message:err.message})); + runtime.log.warn(err.stack); + return reject(err); + }); }); - }); + }); }, /** @@ -107,19 +111,23 @@ var api = module.exports = { * @memberof @node-red/runtime_flows */ addFlow: function(opts) { - return new Promise(function(resolve,reject) { - var flow = opts.flow; - runtime.nodes.addFlow(flow).then(function(id) { - runtime.log.audit({event: "flow.add",id:id}, opts.req); - return resolve(id); - }).catch(function(err) { - runtime.log.audit({event: "flow.add",error:err.code||"unexpected_error",message:err.toString()}, opts.req); - err.status = 400; - return reject(err); + return mutex.runExclusive(function() { + return new Promise(function (resolve, reject) { + var flow = opts.flow; + runtime.nodes.addFlow(flow).then(function (id) { + runtime.log.audit({event: "flow.add", id: id}, opts.req); + return resolve(id); + }).catch(function (err) { + runtime.log.audit({ + event: "flow.add", + error: err.code || "unexpected_error", + message: err.toString() + }, opts.req); + err.status = 400; + return reject(err); + }) }) - }) - - + }); }, /** @@ -145,7 +153,6 @@ var api = module.exports = { return reject(err); } }) - }, /** * Updates an existing flow configuration @@ -158,33 +165,42 @@ var api = module.exports = { * @memberof @node-red/runtime_flows */ updateFlow: function(opts) { - return new Promise(function (resolve,reject) { - var flow = opts.flow; - var id = opts.id; - try { - runtime.nodes.updateFlow(id,flow).then(function() { - runtime.log.audit({event: "flow.update",id:id}, opts.req); - return resolve(id); - }).catch(function(err) { - runtime.log.audit({event: "flow.update",error:err.code||"unexpected_error",message:err.toString()}, opts.req); - err.status = 400; - return reject(err); - }) - } catch(err) { - if (err.code === 404) { - runtime.log.audit({event: "flow.update",id:id,error:"not_found"}, opts.req); - // TODO: this swap around of .code and .status isn't ideal - err.status = 404; - err.code = "not_found"; - return reject(err); - } else { - runtime.log.audit({event: "flow.update",error:err.code||"unexpected_error",message:err.toString()}, opts.req); - err.status = 400; - return reject(err); + return mutex.runExclusive(function() { + return new Promise(function (resolve, reject) { + var flow = opts.flow; + var id = opts.id; + try { + runtime.nodes.updateFlow(id, flow).then(function () { + runtime.log.audit({event: "flow.update", id: id}, opts.req); + return resolve(id); + }).catch(function (err) { + runtime.log.audit({ + event: "flow.update", + error: err.code || "unexpected_error", + message: err.toString() + }, opts.req); + err.status = 400; + return reject(err); + }) + } catch (err) { + if (err.code === 404) { + runtime.log.audit({event: "flow.update", id: id, error: "not_found"}, opts.req); + // TODO: this swap around of .code and .status isn't ideal + err.status = 404; + err.code = "not_found"; + return reject(err); + } else { + runtime.log.audit({ + event: "flow.update", + error: err.code || "unexpected_error", + message: err.toString() + }, opts.req); + err.status = 400; + return reject(err); + } } - } + }); }); - }, /** * Deletes a flow @@ -196,30 +212,42 @@ var api = module.exports = { * @memberof @node-red/runtime_flows */ deleteFlow: function(opts) { - return new Promise(function (resolve,reject) { - var id = opts.id; - try { - runtime.nodes.removeFlow(id).then(function() { - runtime.log.audit({event: "flow.remove",id:id}, opts.req); - return resolve(); - }).catch(function(err) { - runtime.log.audit({event: "flow.remove",id:id,error:err.code||"unexpected_error",message:err.toString()}, opts.req); - err.status = 400; - return reject(err); - }); - } catch(err) { - if (err.code === 404) { - runtime.log.audit({event: "flow.remove",id:id,error:"not_found"}, opts.req); - // TODO: this swap around of .code and .status isn't ideal - err.status = 404; - err.code = "not_found"; - return reject(err); - } else { - runtime.log.audit({event: "flow.remove",id:id,error:err.code||"unexpected_error",message:err.toString()}, opts.req); - err.status = 400; - return reject(err); + return mutex.runExclusive(function() { + return new Promise(function (resolve, reject) { + var id = opts.id; + try { + runtime.nodes.removeFlow(id).then(function () { + runtime.log.audit({event: "flow.remove", id: id}, opts.req); + return resolve(); + }).catch(function (err) { + runtime.log.audit({ + event: "flow.remove", + id: id, + error: err.code || "unexpected_error", + message: err.toString() + }, opts.req); + err.status = 400; + return reject(err); + }); + } catch (err) { + if (err.code === 404) { + runtime.log.audit({event: "flow.remove", id: id, error: "not_found"}, opts.req); + // TODO: this swap around of .code and .status isn't ideal + err.status = 404; + err.code = "not_found"; + return reject(err); + } else { + runtime.log.audit({ + event: "flow.remove", + id: id, + error: err.code || "unexpected_error", + message: err.toString() + }, opts.req); + err.status = 400; + return reject(err); + } } - } + }); }); }, @@ -264,5 +292,4 @@ var api = module.exports = { resolve(sendCredentials); }) } - } diff --git a/packages/node_modules/@node-red/runtime/package.json b/packages/node_modules/@node-red/runtime/package.json index 8346bbdef..4b33cd70e 100644 --- a/packages/node_modules/@node-red/runtime/package.json +++ b/packages/node_modules/@node-red/runtime/package.json @@ -18,6 +18,7 @@ "dependencies": { "@node-red/registry": "1.1.3", "@node-red/util": "1.1.3", + "async-mutex": "0.2.4", "clone": "2.1.2", "express": "4.17.1", "fs-extra": "8.1.0",