Add option for file-in node to include all properties (default off)

and add test
This commit is contained in:
Dave Conway-Jones 2021-06-25 14:39:18 +01:00
parent d6f6efc189
commit 6aac44db14
No known key found for this signature in database
GPG Key ID: 88BA2B8A411BE9FF
4 changed files with 95 additions and 26 deletions

View File

@ -48,9 +48,14 @@
<option value="stream" data-i18n="file.output.stream"></option> <option value="stream" data-i18n="file.output.stream"></option>
</select> </select>
</div> </div>
<div class="form-row" id="file-allprops">
<label>&nbsp;</label>
<input type="checkbox" id="node-input-allProps" style="display:inline-block; width:auto; vertical-align:top;">
<label for="node-input-allProps" style="width: 70%;"><span data-i18n="file.label.allProps"></span></label>
</div>
<div class="form-row" id="encoding-spec"> <div class="form-row" id="encoding-spec">
<label for="node-input-encoding"><i class="fa fa-flag"></i> <span data-i18n="file.label.encoding"></span></label> <label for="node-input-encoding"><i class="fa fa-flag"></i> <span data-i18n="file.label.encoding"></span></label>
<select type="text" id="node-input-encoding" style="width: 250px;"> <select type="text" id="node-input-encoding" style="width:250px;">
</select> </select>
</div> </div>
<div class="form-row"> <div class="form-row">
@ -264,7 +269,8 @@
format: {value:"utf8"}, format: {value:"utf8"},
chunk: {value:false}, chunk: {value:false},
sendError: {value: false}, sendError: {value: false},
encoding: {value: "none"} encoding: {value: "none"},
allProps: {value: false}
}, },
color:"BurlyWood", color:"BurlyWood",
inputs:1, inputs:1,
@ -317,6 +323,12 @@
else { else {
$("#encoding-spec").hide(); $("#encoding-spec").hide();
} }
if ((format === "lines") || (format === "stream")) {
$("#file-allprops").show();
}
else {
$("#file-allprops").hide();
}
}); });
} }
}); });

View File

@ -261,6 +261,7 @@ module.exports = function(RED) {
this.format = n.format; this.format = n.format;
this.chunk = false; this.chunk = false;
this.encoding = n.encoding || "none"; this.encoding = n.encoding || "none";
this.allProps = n.allProps || false;
if (n.sendError === undefined) { if (n.sendError === undefined) {
this.sendError = true; this.sendError = true;
} else { } else {
@ -299,6 +300,7 @@ module.exports = function(RED) {
var rs = fs.createReadStream(fullFilename) var rs = fs.createReadStream(fullFilename)
.on('readable', function () { .on('readable', function () {
var chunk; var chunk;
var m;
var hwm = rs._readableState.highWaterMark; var hwm = rs._readableState.highWaterMark;
while (null !== (chunk = rs.read())) { while (null !== (chunk = rs.read())) {
if (node.chunk === true) { if (node.chunk === true) {
@ -307,24 +309,32 @@ module.exports = function(RED) {
spare += decode(chunk, node.encoding); spare += decode(chunk, node.encoding);
var bits = spare.split("\n"); var bits = spare.split("\n");
for (var i=0; i < bits.length - 1; i++) { for (var i=0; i < bits.length - 1; i++) {
var m = { m = {};
payload:bits[i], if (node.allProps == true) {
topic:msg.topic, m = RED.util.cloneMessage(msg);
filename:msg.filename,
parts:{index:count, ch:ch, type:type, id:msg._msgid}
} }
else {
m.topic = msg.topic;
m.filename = msg.filename;
}
m.payload = bits[i];
m.parts= {index:count, ch:ch, type:type, id:msg._msgid}
count += 1; count += 1;
nodeSend(m); nodeSend(m);
} }
spare = bits[i]; spare = bits[i];
} }
if (node.format === "stream") { if (node.format === "stream") {
var m = { m = {};
payload:chunk, if (node.allProps == true) {
topic:msg.topic, m = RED.util.cloneMessage(msg);
filename:msg.filename,
parts:{index:count, ch:ch, type:type, id:msg._msgid}
} }
else {
m.topic = msg.topic;
m.filename = msg.filename;
}
m.payload = chunk;
m.parts = {index:count, ch:ch, type:type, id:msg._msgid}
count += 1; count += 1;
if (chunk.length < hwm) { // last chunk is smaller that high water mark = eof if (chunk.length < hwm) { // last chunk is smaller that high water mark = eof
getout = false; getout = false;
@ -357,17 +367,22 @@ module.exports = function(RED) {
nodeSend(msg); nodeSend(msg);
} }
else if (node.format === "lines") { else if (node.format === "lines") {
var m = { var m = {};
payload: spare, if (node.allProps) {
topic:msg.topic, m = RED.util.cloneMessage(msg);
parts: { }
index: count, else {
count: count+1, m.topic = msg.topic;
ch: ch, m.filename = msg.filename;
type: type, }
id: msg._msgid m.payload = spare;
} m.parts = {
}; index: count,
count: count + 1,
ch: ch,
type: type,
id: msg._msgid
}
nodeSend(m); nodeSend(m);
} }
else if (getout) { // last chunk same size as high water mark - have to send empty extra packet. else if (getout) { // last chunk same size as high water mark - have to send empty extra packet.

View File

@ -861,7 +861,8 @@
"encoding": "Encoding", "encoding": "Encoding",
"deletelabel": "delete __file__", "deletelabel": "delete __file__",
"utf8String": "UTF8 string", "utf8String": "UTF8 string",
"binaryBuffer": "binary buffer" "binaryBuffer": "binary buffer",
"allProps": "include all existing properties in each msg"
}, },
"action": { "action": {
"append": "append to file", "append": "append to file",

View File

@ -14,7 +14,6 @@
* limitations under the License. * limitations under the License.
**/ **/
var should = require("should");
var path = require('path'); var path = require('path');
var fs = require('fs-extra'); var fs = require('fs-extra');
var os = require('os'); var os = require('os');
@ -1184,6 +1183,9 @@ describe('file Nodes', function() {
n2.on("input", function(msg) { n2.on("input", function(msg) {
try { try {
msg.should.have.property('payload'); msg.should.have.property('payload');
msg.should.have.property('topic');
msg.should.not.have.property('foo');
msg.should.not.have.property('bar');
msg.payload.should.be.a.String(); msg.payload.should.be.a.String();
msg.should.have.property('parts'); msg.should.have.property('parts');
msg.parts.should.have.property('index',c); msg.parts.should.have.property('index',c);
@ -1205,7 +1207,7 @@ describe('file Nodes', function() {
done(e); done(e);
} }
}); });
n1.receive({payload:""}); n1.receive({payload:"",topic:"A",foo:"bar",bar:"foo"});
}); });
}); });
@ -1245,6 +1247,45 @@ describe('file Nodes', function() {
}); });
}); });
it('should read in a file and output split lines with parts and extra props', function(done) {
var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"lines", allProps:true, wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(fileNode, flow, function() {
var n1 = helper.getNode("fileInNode1");
var n2 = helper.getNode("n2");
var c = 0;
n2.on("input", function(msg) {
// console.log(msg)
try {
msg.should.have.property('payload');
msg.payload.should.be.a.String();
msg.should.have.property('topic');
msg.should.have.property('foo');
msg.should.have.property('bar');
msg.should.have.property('parts');
msg.parts.should.have.property('index',c);
msg.parts.should.have.property('type','string');
msg.parts.should.have.property('ch','\n');
if (c === 0) {
msg.payload.should.equal("File message line 1");
}
if (c === 1) {
msg.payload.should.equal("File message line 2");
}
if (c === 2) {
msg.payload.should.equal("");
done();
}
c++;
}
catch(e) {
done(e);
}
});
n1.receive({payload:"",topic:"B",foo:"bar",bar:"foo"});
});
});
it('should read in a file and output a buffer with parts', function(done) { it('should read in a file and output a buffer with parts', function(done) {
var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"stream", wires:[["n2"]]}, var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"stream", wires:[["n2"]]},
{id:"n2", type:"helper"}]; {id:"n2", type:"helper"}];