From 144104245818679711f0a1509cc4dccd450e39a7 Mon Sep 17 00:00:00 2001 From: Hiroyasu Nishiyama Date: Wed, 3 Oct 2018 21:29:28 +0900 Subject: [PATCH] update close & input handling of File node --- nodes/core/storage/50-file.js | 166 ++++++++++++++++++---------------- 1 file changed, 86 insertions(+), 80 deletions(-) diff --git a/nodes/core/storage/50-file.js b/nodes/core/storage/50-file.js index 43195901f..936e2f84f 100644 --- a/nodes/core/storage/50-file.js +++ b/nodes/core/storage/50-file.js @@ -28,10 +28,9 @@ module.exports = function(RED) { this.createDir = n.createDir || false; var node = this; node.wstream = null; - node.data = []; node.msgQueue = []; node.closing = false; - node.closeCallbacks = []; + node.closeCallback = null; function processMsg(msg, done) { var filename = node.filename || msg.filename || ""; @@ -76,83 +75,81 @@ module.exports = function(RED) { if (typeof data === "boolean") { data = data.toString(); } if (typeof data === "number") { data = data.toString(); } if ((node.appendNewline) && (!Buffer.isBuffer(data))) { data += os.EOL; } - node.data.push({msg:msg,data:Buffer.from(data)}); - - while (node.data.length > 0) { - if (node.overwriteFile === "true") { - (function(packet) { - node.wstream = fs.createWriteStream(filename, { encoding:'binary', flags:'w', autoClose:true }); - node.wstream.on("error", function(err) { - node.error(RED._("file.errors.writefail",{error:err.toString()}),msg); - }); - node.wstream.on("open", function() { - node.wstream.end(packet.data, function() { - node.send(packet.msg); - done(); - }); - }) - })(node.data.shift()); - } - else { - // Append mode - var recreateStream = !node.wstream || !node.filename; - if (node.wstream && node.wstreamIno) { - // There is already a stream open and we have the inode - // of the file. Check the file hasn't been deleted - // or deleted and recreated. - try { - var stat = fs.statSync(filename); - // File exists - check the inode matches - if (stat.ino !== node.wstreamIno) { - // The file has been recreated. Close the current - // stream and recreate it - recreateStream = true; - node.wstream.end(); - delete node.wstream; - delete node.wstreamIno; - } - } catch(err) { - // File does not exist + if (node.overwriteFile === "true") { + var wstream = fs.createWriteStream(filename, { encoding:'binary', flags:'w', autoClose:true }); + node.wstream = wstream; + wstream.on("error", function(err) { + node.error(RED._("file.errors.writefail",{error:err.toString()}),msg); + done(); + }); + wstream.on("open", function() { + wstream.end(data, function() { + node.send(msg); + done(); + }); + }) + return; + } + else { + // Append mode + var recreateStream = !node.wstream || !node.filename; + if (node.wstream && node.wstreamIno) { + // There is already a stream open and we have the inode + // of the file. Check the file hasn't been deleted + // or deleted and recreated. + try { + var stat = fs.statSync(filename); + // File exists - check the inode matches + if (stat.ino !== node.wstreamIno) { + // The file has been recreated. Close the current + // stream and recreate it recreateStream = true; node.wstream.end(); delete node.wstream; delete node.wstreamIno; } - } - if (recreateStream) { - node.wstream = fs.createWriteStream(filename, { encoding:'binary', flags:'a', autoClose:true }); - node.wstream.on("open", function(fd) { - try { - var stat = fs.statSync(filename); - node.wstreamIno = stat.ino; - } catch(err) { - } - }); - node.wstream.on("error", function(err) { - node.error(RED._("file.errors.appendfail",{error:err.toString()}),msg); - }); - } - if (node.filename) { - // Static filename - write and reuse the stream next time - var packet = node.data.shift() - node.wstream.write(packet.data, function() { - node.send(packet.msg); - done(); - }); - - } else { - // Dynamic filename - write and close the stream - var packet = node.data.shift() - node.wstream.end(packet.data, function() { - node.send(packet.msg); - delete node.wstream; - delete node.wstreamIno; - done(); - }); + } catch(err) { + // File does not exist + recreateStream = true; + node.wstream.end(); + delete node.wstream; + delete node.wstreamIno; } } + if (recreateStream) { + node.wstream = fs.createWriteStream(filename, { encoding:'binary', flags:'a', autoClose:true }); + node.wstream.on("open", function(fd) { + try { + var stat = fs.statSync(filename); + node.wstreamIno = stat.ino; + } catch(err) { + } + }); + node.wstream.on("error", function(err) { + node.error(RED._("file.errors.appendfail",{error:err.toString()}),msg); + done(); + }); + } + if (node.filename) { + // Static filename - write and reuse the stream next time + node.wstream.write(data, function() { + node.send(msg); + done(); + }); + } else { + // Dynamic filename - write and close the stream + node.wstream.end(data, function() { + node.send(msg); + delete node.wstream; + delete node.wstreamIno; + done(); + }); + } } } + else { + done(); + } } function processQ(queue) { @@ -162,6 +159,9 @@ module.exports = function(RED) { if (queue.length > 0) { processQ(queue); } + else if (node.closing) { + closeNode(); + } }); } @@ -171,9 +171,15 @@ module.exports = function(RED) { // pending write exists return; } - processQ(msgQueue); - if (node.closing) { - closeNode(); + try { + processQ(msgQueue); + } + catch (e) { + node.msgQueue = []; + if (node.closing) { + closeNode(); + } + throw e; } }); @@ -181,23 +187,23 @@ module.exports = function(RED) { if (node.wstream) { node.wstream.end(); } if (node.tout) { clearTimeout(node.tout); } node.status({}); - var callbacks = node.closeCallbacks; - node.closeCallbacks = []; + var cb = node.closeCallback; + node.closeCallback = null; node.closing = false; - for (cb in callbacks) { + if (cb) { cb(); } } - this.on('close', function(cb) { - if (cb) { - node.closeCallbacks.push(done); - } + this.on('close', function(done) { if (node.closing) { // already closing return; } node.closing = true; + if (done) { + node.closeCallback = done; + } if (node.msgQueue.length > 0) { // close after queue processed return;