From 4ce0e3976046338d10b00657ca9f10ec1db517d8 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Wed, 14 Aug 2019 22:28:10 +0100 Subject: [PATCH] Add nodeDone to File nodes --- .../@node-red/nodes/core/storage/50-file.js | 51 +++++++++++-------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/storage/50-file.js b/packages/node_modules/@node-red/nodes/core/storage/50-file.js index 547ef3132..c27c3ed5f 100644 --- a/packages/node_modules/@node-red/nodes/core/storage/50-file.js +++ b/packages/node_modules/@node-red/nodes/core/storage/50-file.js @@ -34,8 +34,9 @@ module.exports = function(RED) { } return data.toString(); } - + function FileNode(n) { + // Write/delete a file RED.nodes.createNode(this,n); this.filename = n.filename; this.appendNewline = n.appendNewline; @@ -48,7 +49,7 @@ module.exports = function(RED) { node.closing = false; node.closeCallback = null; - function processMsg(msg, done) { + function processMsg(msg,nodeSend, done) { var filename = node.filename || msg.filename || ""; if ((!node.filename) && (!node.tout)) { node.tout = setTimeout(function() { @@ -68,7 +69,7 @@ module.exports = function(RED) { if (RED.settings.verbose) { node.log(RED._("file.status.deletedfile",{file:filename})); } - node.send(msg); + nodeSend(msg); } done(); }); @@ -101,7 +102,7 @@ module.exports = function(RED) { }); wstream.on("open", function() { wstream.end(buf, function() { - node.send(msg); + nodeSend(msg); done(); }); }) @@ -150,13 +151,13 @@ module.exports = function(RED) { if (node.filename) { // Static filename - write and reuse the stream next time node.wstream.write(buf, function() { - node.send(msg); + nodeSend(msg); done(); }); } else { // Dynamic filename - write and close the stream node.wstream.end(buf, function() { - node.send(msg); + nodeSend(msg); delete node.wstream; delete node.wstreamIno; done(); @@ -169,12 +170,13 @@ module.exports = function(RED) { } } - function processQ(queue) { - var msg = queue[0]; - processMsg(msg, function() { + function processQueue(queue) { + var event = queue[0]; + processMsg(event.msg, event.send, function() { + event.done(); queue.shift(); if (queue.length > 0) { - processQ(queue); + processQueue(queue); } else if (node.closing) { closeNode(); @@ -182,14 +184,19 @@ module.exports = function(RED) { }); } - this.on("input", function(msg) { + this.on("input", function(msg,nodeSend,nodeDone) { var msgQueue = node.msgQueue; - if (msgQueue.push(msg) > 1) { + msgQueue.push({ + msg: msg, + send: nodeSend, + done: nodeDone + }) + if (msgQueue.length > 1) { // pending write exists return; } try { - processQ(msgQueue); + processQueue(msgQueue); } catch (e) { node.msgQueue = []; @@ -234,6 +241,7 @@ module.exports = function(RED) { function FileInNode(n) { + // Read a file RED.nodes.createNode(this,n); this.filename = n.filename; this.format = n.format; @@ -248,13 +256,14 @@ module.exports = function(RED) { if (this.format === "stream") { this.chunk = true; } var node = this; - this.on("input",function(msg) { + this.on("input",function(msg, nodeSend, nodeDone) { var filename = (node.filename || msg.filename || "").replace(/\t|\r|\n/g,''); if (!node.filename) { node.status({fill:"grey",shape:"dot",text:filename}); } if (filename === "") { node.warn(RED._("file.errors.nofilename")); + nodeDone(); } else { msg.filename = filename; @@ -288,7 +297,7 @@ module.exports = function(RED) { parts:{index:count, ch:ch, type:type, id:msg._msgid} } count += 1; - node.send(m); + nodeSend(m); } spare = bits[i]; } @@ -304,7 +313,7 @@ module.exports = function(RED) { getout = false; m.parts.count = count; } - node.send(m); + nodeSend(m); } } else { @@ -318,8 +327,9 @@ module.exports = function(RED) { var sendMessage = RED.util.cloneMessage(msg); delete sendMessage.payload; sendMessage.error = err; - node.send(sendMessage); + nodeSend(sendMessage); } + nodeDone(); }) .on('end', function() { if (node.chunk === false) { @@ -327,7 +337,7 @@ module.exports = function(RED) { msg.payload = decode(lines, node.encoding); } else { msg.payload = lines; } - node.send(msg); + nodeSend(msg); } else if (node.format === "lines") { var m = { payload: spare, @@ -339,12 +349,13 @@ module.exports = function(RED) { id: msg._msgid } }; - node.send(m); + nodeSend(m); } else if (getout) { // last chunk same size as high water mark - have to send empty extra packet. var m = { parts:{index:count, count:count, ch:ch, type:type, id:msg._msgid} }; - node.send(m); + nodeSend(m); } + nodeDone(); }); } });