From ff627fd1284014004e3c6fde7615a2f2d185c412 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Thu, 9 Aug 2018 14:39:20 +0100 Subject: [PATCH] Fix localfilesystem clean handling --- red/runtime/nodes/context/localfilesystem.js | 133 ++++++--- .../nodes/context/localfilesystem_spec.js | 275 ++++++++++++------ 2 files changed, 279 insertions(+), 129 deletions(-) diff --git a/red/runtime/nodes/context/localfilesystem.js b/red/runtime/nodes/context/localfilesystem.js index 33ca11430..828179ac5 100644 --- a/red/runtime/nodes/context/localfilesystem.js +++ b/red/runtime/nodes/context/localfilesystem.js @@ -19,12 +19,16 @@ * * Configuration options: * { - * base: "contexts", // the base directory to use - * // default: "contexts" + * base: "context", // the base directory to use + * // default: "context" * dir: "/path/to/storage", // the directory to create the base directory in * // default: settings.userDir - * cache: true // whether to cache contents in memory + * cache: true, // whether to cache contents in memory * // default: true + * flushInterval: 30 // if cache is enabled, the minimum interval + * // between writes to storage, in seconds. This + * can be used to reduce wear on underlying storage. + * default: 30 seconds * } * * @@ -61,7 +65,7 @@ function getStoragePath(storageBaseDir, scope) { } function getBasePath(config) { - var base = config.base || "contexts"; + var base = config.base || "context"; var storageBaseDir; if (!config.dir) { if(config.settings && config.settings.userDir){ @@ -102,12 +106,30 @@ function loadFile(storagePath){ }); } +function listFiles(storagePath) { + var promises = []; + return fs.readdir(storagePath).then(function(files) { + files.forEach(function(file) { + promises.push(fs.readdir(path.join(storagePath,file)).then(function(subdirFiles) { + return subdirFiles.map(subfile => path.join(file,subfile)); + })) + }); + return Promise.all(promises); + }).then(dirs => dirs.reduce((acc, val) => acc.concat(val), [])); +} + function LocalFileSystem(config){ this.config = config; this.storageBaseDir = getBasePath(this.config); if (config.hasOwnProperty('cache')?config.cache:true) { this.cache = MemoryStore({}); } + this.pendingWrites = {}; + if (config.hasOwnProperty('flushInterval')) { + this.flushInterval = Math.max(0,config.flushInterval) * 1000; + } else { + this.flushInterval = 30000; + } } LocalFileSystem.prototype.open = function(){ @@ -115,25 +137,17 @@ LocalFileSystem.prototype.open = function(){ if (this.cache) { var scopes = []; var promises = []; - var subdirs = []; - var subdirPromises = []; - return fs.readdir(self.storageBaseDir).then(function(dirs){ - dirs.forEach(function(fn) { - var p = getStoragePath(self.storageBaseDir ,fn)+".json"; - scopes.push(fn); - promises.push(loadFile(p)); - subdirs.push(path.join(self.storageBaseDir,fn)); - subdirPromises.push(fs.readdir(path.join(self.storageBaseDir,fn))); - }) - return Promise.all(subdirPromises); - }).then(function(dirs) { - dirs.forEach(function(files,i) { - files.forEach(function(fn) { - if (fn !== 'flow.json' && fn !== 'global.json') { - scopes.push(fn.substring(0,fn.length-5)+":"+scopes[i]); - promises.push(loadFile(path.join(subdirs[i],fn))) - } - }); + return listFiles(self.storageBaseDir).then(function(files) { + files.forEach(function(file) { + var parts = file.split("/"); + if (parts[0] === 'global') { + scopes.push("global"); + } else if (parts[1] === 'flow.json') { + scopes.push(parts[0]) + } else { + scopes.push(parts[1].substring(0,parts[1].length-5)+":"+parts[0]); + } + promises.push(loadFile(path.join(self.storageBaseDir,file))); }) return Promise.all(promises); }).then(function(res) { @@ -149,13 +163,32 @@ LocalFileSystem.prototype.open = function(){ }else{ return Promise.reject(err); } + }).then(function() { + self._flushPendingWrites = function() { + var scopes = Object.keys(self.pendingWrites); + self.pendingWrites = {}; + var promises = []; + var newContext = self.cache._export(); + scopes.forEach(function(scope) { + var storagePath = getStoragePath(self.storageBaseDir,scope); + var context = newContext[scope]; + promises.push(fs.outputFile(storagePath + ".json", JSON.stringify(context, undefined, 4), "utf8")); + }); + delete self._pendingWriteTimeout; + return Promise.all(promises); + } }); } else { - return Promise.resolve(); + return fs.ensureDir(self.storageBaseDir); } } LocalFileSystem.prototype.close = function(){ + if (this.cache && this._flushPendingWrites) { + clearTimeout(this._pendingWriteTimeout); + delete this._pendingWriteTimeout; + return this._flushPendingWrites(); + } return Promise.resolve(); } @@ -191,10 +224,14 @@ LocalFileSystem.prototype.set = function(scope, key, value, callback) { var storagePath = getStoragePath(this.storageBaseDir ,scope); if (this.cache) { this.cache.set(scope,key,value,callback); - // With cache enabled, no need to re-read the file prior to writing. - var newContext = this.cache._export()[scope]; - fs.outputFile(storagePath + ".json", JSON.stringify(newContext, undefined, 4), "utf8").catch(function(err) { - }); + this.pendingWrites[scope] = true; + if (this._pendingWriteTimeout) { + // there's a pending write which will handle this + return; + } else { + var self = this; + this._pendingWriteTimeout = setTimeout(function() { self._flushPendingWrites.call(self)}, this.flushInterval); + } } else if (callback && typeof callback !== 'function') { throw new Error("Callback must be a function"); } else { @@ -254,36 +291,44 @@ LocalFileSystem.prototype.delete = function(scope){ cachePromise = Promise.resolve(); } var that = this; + delete this.pendingWrites[scope]; return cachePromise.then(function() { var storagePath = getStoragePath(that.storageBaseDir,scope); return fs.remove(storagePath + ".json"); }); } -LocalFileSystem.prototype.clean = function(activeNodes){ +LocalFileSystem.prototype.clean = function(_activeNodes) { + var activeNodes = {}; + _activeNodes.forEach(function(node) { activeNodes[node] = true }); var self = this; var cachePromise; if (this.cache) { - cachePromise = this.cache.clean(activeNodes); + cachePromise = this.cache.clean(_activeNodes); } else { cachePromise = Promise.resolve(); } - return cachePromise.then(function() { - return fs.readdir(self.storageBaseDir).then(function(dirs){ - return Promise.all(dirs.reduce(function(result, item){ - if(item !== "global" && activeNodes.indexOf(item) === -1){ - result.push(fs.remove(path.join(self.storageBaseDir,item))); - } - return result; - },[])); - }).catch(function(err){ - if(err.code == 'ENOENT') { - return Promise.resolve(); - }else{ - return Promise.reject(err); + return cachePromise.then(() => listFiles(self.storageBaseDir)).then(function(files) { + var promises = []; + files.forEach(function(file) { + var parts = file.split("/"); + var removePromise; + if (parts[0] === 'global') { + // never clean global + return; + } else if (!activeNodes[parts[0]]) { + // Flow removed - remove the whole dir + removePromise = fs.remove(path.join(self.storageBaseDir,parts[0])); + } else if (parts[1] !== 'flow.json' && !activeNodes[parts[1].substring(0,parts[1].length-5)]) { + // Node removed - remove the context file + removePromise = fs.remove(path.join(self.storageBaseDir,file)); + } + if (removePromise) { + promises.push(removePromise); } }); - }); + return Promise.all(promises) + }) } module.exports = function(config){ diff --git a/test/red/runtime/nodes/context/localfilesystem_spec.js b/test/red/runtime/nodes/context/localfilesystem_spec.js index 0d5e7062e..24ab3dc7e 100644 --- a/test/red/runtime/nodes/context/localfilesystem_spec.js +++ b/test/red/runtime/nodes/context/localfilesystem_spec.js @@ -21,27 +21,29 @@ var LocalFileSystem = require('../../../../../red/runtime/nodes/context/localfil var resourcesDir = path.resolve(path.join(__dirname,"..","resources","context")); +var defaultContextBase = "context"; + describe('localfilesystem',function() { - var context; before(function() { return fs.remove(resourcesDir); }); - beforeEach(function() { - context = LocalFileSystem({dir: resourcesDir, cache: false}); - return context.open(); - }); + describe('#get/set',function() { + var context; + beforeEach(function() { + context = LocalFileSystem({dir: resourcesDir, cache: false}); + return context.open(); + }); - afterEach(function() { - return context.clean([]).then(function(){ - return context.close().then(function(){ + afterEach(function() { + return context.clean([]).then(function(){ + return context.close(); + }).then(function(){ return fs.remove(resourcesDir); }); }); - }); - describe('#get/set',function() { it('should store property',function(done) { context.get("nodeX","foo",function(err, value){ should.not.exist(value); @@ -348,7 +350,7 @@ describe('localfilesystem',function() { }); it('should handle empty context file', function (done) { - fs.outputFile(path.join(resourcesDir,"contexts","nodeX","flow.json"),"",function(){ + fs.outputFile(path.join(resourcesDir,defaultContextBase,"nodeX","flow.json"),"",function(){ context.get("nodeX", "foo", function (err, value) { should.not.exist(value); context.set("nodeX", "foo", "test", function (err) { @@ -362,7 +364,7 @@ describe('localfilesystem',function() { }); it('should throw an error when reading corrupt context file', function (done) { - fs.outputFile(path.join(resourcesDir, "contexts", "nodeX", "flow.json"),"{abc",function(){ + fs.outputFile(path.join(resourcesDir, defaultContextBase, "nodeX", "flow.json"),"{abc",function(){ context.get("nodeX", "foo", function (err, value) { should.exist(err); done(); @@ -372,6 +374,20 @@ describe('localfilesystem',function() { }); describe('#keys',function() { + var context; + beforeEach(function() { + context = LocalFileSystem({dir: resourcesDir, cache: false}); + return context.open(); + }); + + afterEach(function() { + return context.clean([]).then(function(){ + return context.close(); + }).then(function(){ + return fs.remove(resourcesDir); + }); + }); + it('should enumerate context keys', function(done) { context.keys("nodeX",function(err, value){ value.should.be.an.Array(); @@ -436,6 +452,20 @@ describe('localfilesystem',function() { }); describe('#delete',function() { + var context; + beforeEach(function() { + context = LocalFileSystem({dir: resourcesDir, cache: false}); + return context.open(); + }); + + afterEach(function() { + return context.clean([]).then(function(){ + return context.close(); + }).then(function(){ + return fs.remove(resourcesDir); + }); + }); + it('should delete context',function(done) { context.get("nodeX","foo",function(err, value){ should.not.exist(value); @@ -466,64 +496,36 @@ describe('localfilesystem',function() { }); describe('#clean',function() { - it('should clean unnecessary context',function(done) { - context.get("nodeX","foo",function(err, value){ - should.not.exist(value); - context.get("nodeY","foo",function(err, value){ - should.not.exist(value); - context.set("nodeX","foo","testX",function(err){ - context.set("nodeY","foo","testY",function(err){ - context.get("nodeX","foo",function(err, value){ - value.should.be.equal("testX"); - context.get("nodeY","foo",function(err, value){ - value.should.be.equal("testY"); - context.clean([]).then(function(){ - context.get("nodeX","foo",function(err, value){ - should.not.exist(value); - context.get("nodeY","foo",function(err, value){ - should.not.exist(value); - done(); - }); - }); - }); - }); - }); - }); - }); + var context; + var contextGet; + var contextSet; + beforeEach(function() { + context = LocalFileSystem({dir: resourcesDir, cache: false}); + contextGet = function(scope,key) { + return new Promise((res,rej) => { + context.get(scope,key, function(err,value) { + if (err) { + rej(err); + } else { + res(value); + } + }) }); - }); + } + contextSet = function(scope,key,value) { + return new Promise((res,rej) => { + context.set(scope,key,value, function(err) { + if (err) { + rej(err); + } else { + res(); + } + }) + }); + } + return context.open(); }); - it('should not clean active context',function(done) { - context.get("nodeX","foo",function(err, value){ - should.not.exist(value); - context.get("nodeY","foo",function(err, value){ - should.not.exist(value); - context.set("nodeX","foo","testX",function(err){ - context.set("nodeY","foo","testY",function(err){ - context.get("nodeX","foo",function(err, value){ - value.should.be.equal("testX"); - context.get("nodeY","foo",function(err, value){ - value.should.be.equal("testY"); - context.clean(["nodeX"]).then(function(){ - context.get("nodeX","foo",function(err, value){ - value.should.be.equal("testX"); - context.get("nodeY","foo",function(err, value){ - should.not.exist(value); - done(); - }); - }); - }); - }); - }); - }); - }); - }); - }); - }); - }); - - describe('#if cache is enabled',function() { afterEach(function() { return context.clean([]).then(function(){ return context.close().then(function(){ @@ -531,23 +533,101 @@ describe('localfilesystem',function() { }); }); }); + it('should clean unnecessary context',function(done) { + contextSet("global","foo","testGlobal").then(function() { + return contextSet("nodeX:flow1","foo","testX"); + }).then(function() { + return contextSet("nodeY:flow2","foo","testY"); + }).then(function() { + return contextGet("nodeX:flow1","foo"); + }).then(function(value) { + value.should.be.equal("testX"); + }).then(function() { + return contextGet("nodeY:flow2","foo"); + }).then(function(value) { + value.should.be.equal("testY"); + }).then(function() { + return context.clean([]) + }).then(function() { + return contextGet("nodeX:flow1","foo"); + }).then(function(value) { + should.not.exist(value); + }).then(function() { + return contextGet("nodeY:flow2","foo"); + }).then(function(value) { + should.not.exist(value); + }).then(function() { + return contextGet("global","foo"); + }).then(function(value) { + value.should.eql("testGlobal"); + }).then(done).catch(done); + }); + + it('should not clean active context',function(done) { + contextSet("global","foo","testGlobal").then(function() { + return contextSet("nodeX:flow1","foo","testX"); + }).then(function() { + return contextSet("nodeY:flow2","foo","testY"); + }).then(function() { + return contextGet("nodeX:flow1","foo"); + }).then(function(value) { + value.should.be.equal("testX"); + }).then(function() { + return contextGet("nodeY:flow2","foo"); + }).then(function(value) { + value.should.be.equal("testY"); + }).then(function() { + return context.clean(["flow1","nodeX"]) + }).then(function() { + return contextGet("nodeX:flow1","foo"); + }).then(function(value) { + value.should.be.equal("testX"); + }).then(function() { + return contextGet("nodeY:flow2","foo"); + }).then(function(value) { + should.not.exist(value); + }).then(function() { + return contextGet("global","foo"); + }).then(function(value) { + value.should.eql("testGlobal"); + }).then(done).catch(done); + }); + }); + + describe('#if cache is enabled',function() { + + var context; + beforeEach(function() { + context = LocalFileSystem({dir: resourcesDir, cache: false}); + return context.open(); + }); + + afterEach(function() { + return context.clean([]).then(function(){ + return context.close(); + }).then(function(){ + return fs.remove(resourcesDir); + }); + }); + + it('should load contexts into the cache',function() { var globalData = {key:"global"}; var flowData = {key:"flow"}; var nodeData = {key:"node"}; return Promise.all([ - fs.outputFile(path.join(resourcesDir,"contexts","global","global.json"), JSON.stringify(globalData,null,4), "utf8"), - fs.outputFile(path.join(resourcesDir,"contexts","flow","flow.json"), JSON.stringify(flowData,null,4), "utf8"), - fs.outputFile(path.join(resourcesDir,"contexts","flow","node.json"), JSON.stringify(nodeData,null,4), "utf8") + fs.outputFile(path.join(resourcesDir,defaultContextBase,"global","global.json"), JSON.stringify(globalData,null,4), "utf8"), + fs.outputFile(path.join(resourcesDir,defaultContextBase,"flow","flow.json"), JSON.stringify(flowData,null,4), "utf8"), + fs.outputFile(path.join(resourcesDir,defaultContextBase,"flow","node.json"), JSON.stringify(nodeData,null,4), "utf8") ]).then(function(){ context = LocalFileSystem({dir: resourcesDir, cache: true}); return context.open(); }).then(function(){ return Promise.all([ - fs.remove(path.join(resourcesDir,"contexts","global","global.json")), - fs.remove(path.join(resourcesDir,"contexts","flow","flow.json")), - fs.remove(path.join(resourcesDir,"contexts","flow","node.json")) + fs.remove(path.join(resourcesDir,defaultContextBase,"global","global.json")), + fs.remove(path.join(resourcesDir,defaultContextBase,"flow","flow.json")), + fs.remove(path.join(resourcesDir,defaultContextBase,"flow","node.json")) ]); }).then(function(){ context.get("global","key").should.be.equal("global"); @@ -557,19 +637,31 @@ describe('localfilesystem',function() { }); it('should store property to the cache',function() { - context = LocalFileSystem({dir: resourcesDir, cache: true}); + context = LocalFileSystem({dir: resourcesDir, cache: true, flushInterval: 1}); return context.open().then(function(){ return new Promise(function(resolve, reject){ context.set("global","foo","bar",function(err){ if(err){ reject(err); } else { - resolve(); + fs.readJson(path.join(resourcesDir,defaultContextBase,"global","global.json")).then(function(data) { + // File should not exist as flush hasn't happened + reject("File global/global.json should not exist"); + }).catch(function(err) { + setTimeout(function() { + fs.readJson(path.join(resourcesDir,defaultContextBase,"global","global.json")).then(function(data) { + data.should.eql({foo:'bar'}); + resolve(); + }).catch(function(err) { + reject(err); + }); + },1100) + }) } }); }); }).then(function(){ - return fs.remove(path.join(resourcesDir,"contexts","global","global.json")); + return fs.remove(path.join(resourcesDir,defaultContextBase,"global","global.json")); }).then(function(){ context.get("global","foo").should.be.equal("bar"); }) @@ -577,11 +669,11 @@ describe('localfilesystem',function() { it('should enumerate context keys in the cache',function() { var globalData = {foo:"bar"}; - fs.outputFile(path.join(resourcesDir,"contexts","global","global.json"), JSON.stringify(globalData,null,4), "utf8").then(function(){ - context = LocalFileSystem({dir: resourcesDir, cache: true}); + fs.outputFile(path.join(resourcesDir,defaultContextBase,"global","global.json"), JSON.stringify(globalData,null,4), "utf8").then(function(){ + context = LocalFileSystem({dir: resourcesDir, cache: true, flushInterval: 2}); return context.open() }).then(function(){ - return fs.remove(path.join(resourcesDir,"contexts","global","global.json")); + return fs.remove(path.join(resourcesDir,defaultContextBase,"global","global.json")); }).then(function(){ var keys = context.keys("global"); keys.should.have.length(1); @@ -596,7 +688,7 @@ describe('localfilesystem',function() { }); }); }).then(function(){ - return fs.remove(path.join(resourcesDir,"contexts","global","global.json")); + return fs.remove(path.join(resourcesDir,defaultContextBase,"global","global.json")); }).then(function(){ var keys = context.keys("global"); keys.should.have.length(2); @@ -605,7 +697,7 @@ describe('localfilesystem',function() { }); it('should delete context in the cache',function() { - context = LocalFileSystem({dir: resourcesDir, cache: true}); + context = LocalFileSystem({dir: resourcesDir, cache: true, flushInterval: 2}); return context.open().then(function(){ return new Promise(function(resolve, reject){ context.set("global","foo","bar",function(err){ @@ -628,10 +720,10 @@ describe('localfilesystem',function() { var flowAData = {key:"flowA"}; var flowBData = {key:"flowB"}; return Promise.all([ - fs.outputFile(path.join(resourcesDir,"contexts","flowA","flow.json"), JSON.stringify(flowAData,null,4), "utf8"), - fs.outputFile(path.join(resourcesDir,"contexts","flowB","flow.json"), JSON.stringify(flowBData,null,4), "utf8") + fs.outputFile(path.join(resourcesDir,defaultContextBase,"flowA","flow.json"), JSON.stringify(flowAData,null,4), "utf8"), + fs.outputFile(path.join(resourcesDir,defaultContextBase,"flowB","flow.json"), JSON.stringify(flowBData,null,4), "utf8") ]).then(function(){ - context = LocalFileSystem({dir: resourcesDir, cache: true}); + context = LocalFileSystem({dir: resourcesDir, cache: true, flushInterval: 2}); return context.open(); }).then(function(){ context.get("flowA","key").should.be.equal("flowA"); @@ -645,6 +737,19 @@ describe('localfilesystem',function() { }); describe('Configuration', function () { + var context; + beforeEach(function() { + context = LocalFileSystem({dir: resourcesDir, cache: false}); + return context.open(); + }); + + afterEach(function() { + return context.clean([]).then(function(){ + return context.close(); + }).then(function(){ + return fs.remove(resourcesDir); + }); + }); it('should change a base directory', function (done) { var differentBaseContext = LocalFileSystem({ base: "contexts2", @@ -688,7 +793,7 @@ describe('localfilesystem',function() { it('should use NODE_RED_HOME', function (done) { var oldNRH = process.env.NODE_RED_HOME; process.env.NODE_RED_HOME = resourcesDir; - fs.mkdirSync(resourcesDir); + fs.ensureDirSync(resourcesDir); fs.writeFileSync(path.join(resourcesDir,".config.json"),""); var nrHomeContext = LocalFileSystem({ base: "contexts2",