diff --git a/nodes/core/storage/50-file.js b/nodes/core/storage/50-file.js index 76cedb136..abe591c9d 100644 --- a/nodes/core/storage/50-file.js +++ b/nodes/core/storage/50-file.js @@ -29,8 +29,9 @@ module.exports = function(RED) { var node = this; node.wstream = null; node.data = []; + node.msgQueue = []; - this.on("input",function(msg) { + function processMsg(msg, done) { var filename = node.filename || msg.filename || ""; if ((!node.filename) && (!node.tout)) { node.tout = setTimeout(function() { @@ -41,6 +42,7 @@ module.exports = function(RED) { } if (filename === "") { node.warn(RED._("file.errors.nofilename")); + done(); } else if (node.overwriteFile === "delete") { fs.unlink(filename, function (err) { if (err) { @@ -51,6 +53,7 @@ module.exports = function(RED) { } node.send(msg); } + done(); }); } else if (msg.hasOwnProperty("payload") && (typeof msg.payload !== "undefined")) { var dir = path.dirname(filename); @@ -59,6 +62,7 @@ module.exports = function(RED) { fs.ensureDirSync(dir); } catch(err) { node.error(RED._("file.errors.createfail",{error:err.toString()}),msg); + done(); return; } } @@ -82,6 +86,7 @@ module.exports = function(RED) { node.wstream.on("open", function() { node.wstream.end(packet.data, function() { node.send(packet.msg); + done(); }); }) })(node.data.shift()); @@ -130,6 +135,7 @@ module.exports = function(RED) { var packet = node.data.shift() node.wstream.write(packet.data, function() { node.send(packet.msg); + done(); }); } else { @@ -137,14 +143,34 @@ module.exports = function(RED) { var packet = node.data.shift() node.wstream.end(packet.data, function() { node.send(packet.msg); + delete node.wstream; + delete node.wstreamIno; + done(); }); - delete node.wstream; - delete node.wstreamIno; } } } } + } + + this.on("input", function(msg) { + var msgQueue = node.msgQueue; + function processQ() { + var msg = msgQueue[0]; + processMsg(msg, function() { + msgQueue.shift(); + if (msgQueue.length > 0) { + processQ(); + } + }); + } + if (msgQueue.push(msg) > 1) { + // pending write exists + return; + } + processQ(); }); + this.on('close', function() { if (node.wstream) { node.wstream.end(); } if (node.tout) { clearTimeout(node.tout); } diff --git a/test/nodes/core/storage/50-file_spec.js b/test/nodes/core/storage/50-file_spec.js index e61c3fb68..cba22cd54 100644 --- a/test/nodes/core/storage/50-file_spec.js +++ b/test/nodes/core/storage/50-file_spec.js @@ -540,6 +540,49 @@ describe('file Nodes', function() { }); }); + it('should write to multiple files', function(done) { + var flow = [{id:"fileNode1", type:"file", name: "fileNode", "appendNewline":true, "overwriteFile":true, "createDir":true, wires: [["helperNode1"]]}, + {id:"helperNode1", type:"helper"}]; + var tmp_path = path.join(resourcesDir, "tmp"); + var len = 1024*1024*10; + var file_count = 5; + helper.load(fileNode, flow, function() { + var n1 = helper.getNode("fileNode1"); + var n2 = helper.getNode("helperNode1"); + var count = 0; + n2.on("input", function(msg) { + try { + count++; + if (count == file_count) { + for(var i = 0; i < file_count; i++) { + var name = path.join(tmp_path, String(i)); + var f = fs.readFileSync(name); + f.should.have.length(len); + f[0].should.have.equal(i); + } + fs.removeSync(tmp_path); + done(); + } + } + catch (e) { + try { + fs.removeSync(tmp_path); + } + catch (e1) { + } + done(e); + } + }); + for(var i = 0; i < file_count; i++) { + var data = new Buffer(len); + data.fill(i); + var name = path.join(tmp_path, String(i)); + var msg = {payload:data, filename:name}; + n1.receive(msg); + } + }); + }); + });