diff --git a/packages/node_modules/@node-red/nodes/core/storage/50-file.js b/packages/node_modules/@node-red/nodes/core/storage/50-file.js index d55782785..99a539a43 100644 --- a/packages/node_modules/@node-red/nodes/core/storage/50-file.js +++ b/packages/node_modules/@node-red/nodes/core/storage/50-file.js @@ -28,9 +28,11 @@ module.exports = function(RED) { this.createDir = n.createDir || false; var node = this; node.wstream = null; - node.data = []; + node.msgQueue = []; + node.closing = false; + node.closeCallback = null; - this.on("input",function(msg) { + function processMsg(msg, done) { var filename = node.filename || msg.filename || ""; if ((!node.filename) && (!node.tout)) { node.tout = setTimeout(function() { @@ -41,6 +43,7 @@ module.exports = function(RED) { } if (filename === "") { node.warn(RED._("file.errors.nofilename")); + done(); } else if (node.overwriteFile === "delete") { fs.unlink(filename, function (err) { if (err) { @@ -51,6 +54,7 @@ module.exports = function(RED) { } node.send(msg); } + done(); }); } else if (msg.hasOwnProperty("payload") && (typeof msg.payload !== "undefined")) { var dir = path.dirname(filename); @@ -59,6 +63,7 @@ module.exports = function(RED) { fs.ensureDirSync(dir); } catch(err) { node.error(RED._("file.errors.createfail",{error:err.toString()}),msg); + done(); return; } } @@ -70,85 +75,142 @@ module.exports = function(RED) { if (typeof data === "boolean") { data = data.toString(); } if (typeof data === "number") { data = data.toString(); } if ((node.appendNewline) && (!Buffer.isBuffer(data))) { data += os.EOL; } - node.data.push({msg:msg,data:Buffer.from(data)}); - - while (node.data.length > 0) { - if (node.overwriteFile === "true") { - (function(packet) { - node.wstream = fs.createWriteStream(filename, { encoding:'binary', flags:'w', autoClose:true }); - node.wstream.on("error", function(err) { - node.error(RED._("file.errors.writefail",{error:err.toString()}),msg); - }); - node.wstream.on("open", function() { - node.wstream.end(packet.data, function() { - node.send(packet.msg); - }); - }) - })(node.data.shift()); - } - else { - // Append mode - var recreateStream = !node.wstream || !node.filename; - if (node.wstream && node.wstreamIno) { - // There is already a stream open and we have the inode - // of the file. Check the file hasn't been deleted - // or deleted and recreated. - try { - var stat = fs.statSync(filename); - // File exists - check the inode matches - if (stat.ino !== node.wstreamIno) { - // The file has been recreated. Close the current - // stream and recreate it - recreateStream = true; - node.wstream.end(); - delete node.wstream; - delete node.wstreamIno; - } - } catch(err) { - // File does not exist + if (node.overwriteFile === "true") { + var wstream = fs.createWriteStream(filename, { encoding:'binary', flags:'w', autoClose:true }); + node.wstream = wstream; + wstream.on("error", function(err) { + node.error(RED._("file.errors.writefail",{error:err.toString()}),msg); + done(); + }); + wstream.on("open", function() { + wstream.end(data, function() { + node.send(msg); + done(); + }); + }) + return; + } + else { + // Append mode + var recreateStream = !node.wstream || !node.filename; + if (node.wstream && node.wstreamIno) { + // There is already a stream open and we have the inode + // of the file. Check the file hasn't been deleted + // or deleted and recreated. + try { + var stat = fs.statSync(filename); + // File exists - check the inode matches + if (stat.ino !== node.wstreamIno) { + // The file has been recreated. Close the current + // stream and recreate it recreateStream = true; node.wstream.end(); delete node.wstream; delete node.wstreamIno; } - } - if (recreateStream) { - node.wstream = fs.createWriteStream(filename, { encoding:'binary', flags:'a', autoClose:true }); - node.wstream.on("open", function(fd) { - try { - var stat = fs.statSync(filename); - node.wstreamIno = stat.ino; - } catch(err) { - } - }); - node.wstream.on("error", function(err) { - node.error(RED._("file.errors.appendfail",{error:err.toString()}),msg); - }); - } - if (node.filename) { - // Static filename - write and reuse the stream next time - var packet = node.data.shift() - node.wstream.write(packet.data, function() { - node.send(packet.msg); - }); - - } else { - // Dynamic filename - write and close the stream - var packet = node.data.shift() - node.wstream.end(packet.data, function() { - node.send(packet.msg); - }); + } catch(err) { + // File does not exist + recreateStream = true; + node.wstream.end(); delete node.wstream; delete node.wstreamIno; } } + if (recreateStream) { + node.wstream = fs.createWriteStream(filename, { encoding:'binary', flags:'a', autoClose:true }); + node.wstream.on("open", function(fd) { + try { + var stat = fs.statSync(filename); + node.wstreamIno = stat.ino; + } catch(err) { + } + }); + node.wstream.on("error", function(err) { + node.error(RED._("file.errors.appendfail",{error:err.toString()}),msg); + done(); + }); + } + if (node.filename) { + // Static filename - write and reuse the stream next time + node.wstream.write(data, function() { + node.send(msg); + done(); + }); + } else { + // Dynamic filename - write and close the stream + node.wstream.end(data, function() { + node.send(msg); + delete node.wstream; + delete node.wstreamIno; + done(); + }); + } } } + else { + done(); + } + } + + function processQ(queue) { + var msg = queue[0]; + processMsg(msg, function() { + queue.shift(); + if (queue.length > 0) { + processQ(queue); + } + else if (node.closing) { + closeNode(); + } + }); + } + + this.on("input", function(msg) { + var msgQueue = node.msgQueue; + if (msgQueue.push(msg) > 1) { + // pending write exists + return; + } + try { + processQ(msgQueue); + } + catch (e) { + node.msgQueue = []; + if (node.closing) { + closeNode(); + } + throw e; + } }); - this.on('close', function() { + + function closeNode() { if (node.wstream) { node.wstream.end(); } if (node.tout) { clearTimeout(node.tout); } node.status({}); + var cb = node.closeCallback; + node.closeCallback = null; + node.closing = false; + if (cb) { + cb(); + } + } + + this.on('close', function(done) { + if (node.closing) { + // already closing + return; + } + node.closing = true; + if (done) { + node.closeCallback = done; + } + if (node.msgQueue.length > 0) { + // close after queue processed + return; + } + else { + closeNode(); + } }); } RED.nodes.registerType("file",FileNode); diff --git a/packages/node_modules/node-red/settings.js b/packages/node_modules/node-red/settings.js index 6d6c6948b..641c56498 100644 --- a/packages/node_modules/node-red/settings.js +++ b/packages/node_modules/node-red/settings.js @@ -148,7 +148,7 @@ module.exports = { // The following property can be used to cause insecure HTTP connections to // be redirected to HTTPS. - //requireHttps: true + //requireHttps: true, // The following property can be used to disable the editor. The admin API // is not affected by this option. To disable both the editor and the admin diff --git a/test/nodes/core/storage/50-file_spec.js b/test/nodes/core/storage/50-file_spec.js index 901592ecc..4ec9b4fd0 100644 --- a/test/nodes/core/storage/50-file_spec.js +++ b/test/nodes/core/storage/50-file_spec.js @@ -46,14 +46,14 @@ describe('file Nodes', function() { it('should be loaded', function(done) { var flow = [{id:"fileNode1", type:"file", name: "fileNode", "filename":fileToTest, "appendNewline":true, "overwriteFile":true}]; helper.load(fileNode, flow, function() { - try { + try { var fileNode1 = helper.getNode("fileNode1"); fileNode1.should.have.property('name', 'fileNode'); done(); - } - catch (e) { - done(e); - } + } + catch (e) { + done(e); + } }); }); @@ -64,16 +64,16 @@ describe('file Nodes', function() { var n1 = helper.getNode("fileNode1"); var n2 = helper.getNode("helperNode1"); n2.on("input", function(msg) { - try { - var f = fs.readFileSync(fileToTest); - f.should.have.length(4); - fs.unlinkSync(fileToTest); - msg.should.have.property("payload", "test"); - done(); - } - catch (e) { - done(e); - } + try { + var f = fs.readFileSync(fileToTest); + f.should.have.length(4); + fs.unlinkSync(fileToTest); + msg.should.have.property("payload", "test"); + done(); + } + catch (e) { + done(e); + } }); n1.receive({payload:"test"}); }); @@ -93,26 +93,26 @@ describe('file Nodes', function() { var data = ["test2", true, 999, [2]]; n2.on("input", function (msg) { - try { - msg.should.have.property("payload"); - data.should.containDeep([msg.payload]); - if (count === 3) { + try { + msg.should.have.property("payload"); + data.should.containDeep([msg.payload]); + if (count === 3) { var f = fs.readFileSync(fileToTest).toString(); if (os.type() !== "Windows_NT") { - f.should.have.length(19); - f.should.equal("test2\ntrue\n999\n[2]\n"); + f.should.have.length(19); + f.should.equal("test2\ntrue\n999\n[2]\n"); } else { - f.should.have.length(23); - f.should.equal("test2\r\ntrue\r\n999\r\n[2]\r\n"); + f.should.have.length(23); + f.should.equal("test2\r\ntrue\r\n999\r\n[2]\r\n"); } done(); - } - count++; - } - catch (e) { - done(e); - } + } + count++; + } + catch (e) { + done(e); + } }); n1.receive({payload:"test2"}); // string @@ -142,37 +142,37 @@ describe('file Nodes', function() { var count = 0; n2.on("input", function (msg) { - try { - msg.should.have.property("payload"); - data.should.containDeep([msg.payload]); - try { + try { + msg.should.have.property("payload"); + data.should.containDeep([msg.payload]); + try { if (count === 1) { - // Check they got appended as expected - var f = fs.readFileSync(fileToTest).toString(); - f.should.equal("onetwo"); + // Check they got appended as expected + var f = fs.readFileSync(fileToTest).toString(); + f.should.equal("onetwo"); - // Delete the file - fs.unlinkSync(fileToTest); - setTimeout(function() { + // Delete the file + fs.unlinkSync(fileToTest); + setTimeout(function() { // Send two more messages to the file n1.receive({payload:"three"}); n1.receive({payload:"four"}); - }, wait); + }, wait); } if (count === 3) { - var f = fs.readFileSync(fileToTest).toString(); - f.should.equal("threefour"); - fs.unlinkSync(fileToTest); - done(); + var f = fs.readFileSync(fileToTest).toString(); + f.should.equal("threefour"); + fs.unlinkSync(fileToTest); + done(); } - } catch(err) { + } catch(err) { done(err); - } - count++; - } - catch (e) { - done(e); - } + } + count++; + } + catch (e) { + done(e); + } }); // Send two messages to the file @@ -197,7 +197,7 @@ describe('file Nodes', function() { n2.on("input", function (msg) { try { msg.should.have.property("payload"); - data.should.containDeep([msg.payload]); + data.should.containDeep([msg.payload]); if (count == 1) { // Check they got appended as expected var f = fs.readFileSync(fileToTest).toString(); @@ -256,25 +256,25 @@ describe('file Nodes', function() { var n2 = helper.getNode("helperNode1"); n2.on("input", function (msg) { - try { - msg.should.have.property("payload", "fine"); - msg.should.have.property("filename", fileToTest); + try { + msg.should.have.property("payload", "fine"); + msg.should.have.property("filename", fileToTest); - var f = fs.readFileSync(fileToTest).toString(); - if (os.type() !== "Windows_NT") { + var f = fs.readFileSync(fileToTest).toString(); + if (os.type() !== "Windows_NT") { f.should.have.length(5); f.should.equal("fine\n"); - } - else { + } + else { f.should.have.length(6); f.should.equal("fine\r\n"); - } - done(); - } - catch (e) { - done(e); - } - }); + } + done(); + } + catch (e) { + done(e); + } + }); n1.receive({payload:"fine", filename:fileToTest}); }); @@ -540,6 +540,93 @@ describe('file Nodes', function() { }); }); + it('should write to multiple files', function(done) { + var flow = [{id:"fileNode1", type:"file", name: "fileNode", "appendNewline":true, "overwriteFile":true, "createDir":true, wires: [["helperNode1"]]}, + {id:"helperNode1", type:"helper"}]; + var tmp_path = path.join(resourcesDir, "tmp"); + var len = 1024*1024*10; + var file_count = 5; + helper.load(fileNode, flow, function() { + var n1 = helper.getNode("fileNode1"); + var n2 = helper.getNode("helperNode1"); + var count = 0; + n2.on("input", function(msg) { + try { + count++; + if (count == file_count) { + for(var i = 0; i < file_count; i++) { + var name = path.join(tmp_path, String(i)); + var f = fs.readFileSync(name); + f.should.have.length(len); + f[0].should.have.equal(i); + } + fs.removeSync(tmp_path); + done(); + } + } + catch (e) { + try { + fs.removeSync(tmp_path); + } + catch (e1) { + } + done(e); + } + }); + for(var i = 0; i < file_count; i++) { + var data = new Buffer(len); + data.fill(i); + var name = path.join(tmp_path, String(i)); + var msg = {payload:data, filename:name}; + n1.receive(msg); + } + }); + }); + + it('should write to multiple files if node is closed', function(done) { + var flow = [{id:"fileNode1", type:"file", name: "fileNode", "appendNewline":true, "overwriteFile":true, "createDir":true, wires: [["helperNode1"]]}, + {id:"helperNode1", type:"helper"}]; + var tmp_path = path.join(resourcesDir, "tmp"); + var len = 1024*1024*10; + var file_count = 5; + helper.load(fileNode, flow, function() { + var n1 = helper.getNode("fileNode1"); + var n2 = helper.getNode("helperNode1"); + var count = 0; + n2.on("input", function(msg) { + try { + count++; + if (count == file_count) { + for(var i = 0; i < file_count; i++) { + var name = path.join(tmp_path, String(i)); + var f = fs.readFileSync(name); + f.should.have.length(len); + f[0].should.have.equal(i); + } + fs.removeSync(tmp_path); + done(); + } + } + catch (e) { + try { + fs.removeSync(tmp_path); + } + catch (e1) { + } + done(e); + } + }); + for(var i = 0; i < file_count; i++) { + var data = new Buffer(len); + data.fill(i); + var name = path.join(tmp_path, String(i)); + var msg = {payload:data, filename:name}; + n1.receive(msg); + } + n1.close(); + }); + }); + }); @@ -573,7 +660,7 @@ describe('file Nodes', function() { it('should read in a file and output a buffer', function(done) { var flow = [{id:"fileInNode1", type:"file in", name:"fileInNode", "filename":fileToTest, "format":"", wires:[["n2"]]}, - {id:"n2", type:"helper"}]; + {id:"n2", type:"helper"}]; helper.load(fileNode, flow, function() { var n1 = helper.getNode("fileInNode1"); var n2 = helper.getNode("n2"); @@ -590,7 +677,7 @@ describe('file Nodes', function() { it('should read in a file and output a utf8 string', function(done) { var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", "filename":fileToTest, "format":"utf8", wires:[["n2"]]}, - {id:"n2", type:"helper"}]; + {id:"n2", type:"helper"}]; helper.load(fileNode, flow, function() { var n1 = helper.getNode("fileInNode1"); var n2 = helper.getNode("n2"); @@ -632,7 +719,7 @@ describe('file Nodes', function() { it('should read in a file and output split lines with parts', function(done) { var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"lines", wires:[["n2"]]}, - {id:"n2", type:"helper"}]; + {id:"n2", type:"helper"}]; helper.load(fileNode, flow, function() { var n1 = helper.getNode("fileInNode1"); var n2 = helper.getNode("n2"); @@ -670,7 +757,7 @@ describe('file Nodes', function() { var line = data.join("\n"); fs.writeFileSync(fileToTest, line); var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"lines", wires:[["n2"]]}, - {id:"n2", type:"helper"}]; + {id:"n2", type:"helper"}]; helper.load(fileNode, flow, function() { var n1 = helper.getNode("fileInNode1"); var n2 = helper.getNode("n2"); @@ -703,7 +790,7 @@ describe('file Nodes', function() { it('should read in a file and output a buffer with parts', function(done) { var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"stream", wires:[["n2"]]}, - {id:"n2", type:"helper"}]; + {id:"n2", type:"helper"}]; helper.load(fileNode, flow, function() { var n1 = helper.getNode("fileInNode1"); var n2 = helper.getNode("n2");