1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

clone msg for every single line output

(this may not be desired)
Add tests to ensure properties go trhough
This commit is contained in:
Dave Conway-Jones 2019-10-25 15:09:20 +01:00
parent c8b8beda6a
commit 7c4e782c60
No known key found for this signature in database
GPG Key ID: 302A6725C594817F
2 changed files with 80 additions and 33 deletions

View File

@ -140,8 +140,8 @@ module.exports = function(RED) {
try {
var stat = fs.statSync(filename);
node.wstreamIno = stat.ino;
} catch(err) {
}
catch(err) { }
});
node.wstream.on("error", function(err) {
node.error(RED._("file.errors.appendfail",{error:err.toString()}),msg);
@ -276,7 +276,6 @@ module.exports = function(RED) {
ch = "\n";
type = "string";
}
var hwm;
var getout = false;
var rs = fs.createReadStream(filename)
@ -290,30 +289,24 @@ module.exports = function(RED) {
spare += decode(chunk, node.encoding);
var bits = spare.split("\n");
for (var i=0; i < bits.length - 1; i++) {
var m = {
payload:bits[i],
topic:msg.topic,
filename:msg.filename,
parts:{index:count, ch:ch, type:type, id:msg._msgid}
}
var sendMessage = RED.util.cloneMessage(msg);
sendMessage.payload = bits[i];
sendMessage.parts = {index:count, ch:ch, type:type, id:msg._msgid};
count += 1;
nodeSend(m);
nodeSend(sendMessage);
}
spare = bits[i];
}
if (node.format === "stream") {
var m = {
payload:chunk,
topic:msg.topic,
filename:msg.filename,
parts:{index:count, ch:ch, type:type, id:msg._msgid}
}
var sendMessage = RED.util.cloneMessage(msg);
sendMessage.payload = chunk;
sendMessage.parts = {index:count, ch:ch, type:type, id:msg._msgid};
count += 1;
if (chunk.length < hwm) { // last chunk is smaller that high water mark = eof
getout = false;
m.parts.count = count;
sendMessage.parts.count = count;
}
nodeSend(m);
nodeSend(sendMessage);
}
}
else {
@ -332,28 +325,23 @@ module.exports = function(RED) {
nodeDone();
})
.on('end', function() {
var sendMessage = RED.util.cloneMessage(msg);
if (node.chunk === false) {
if (node.format === "utf8") {
msg.payload = decode(lines, node.encoding);
sendMessage.payload = decode(lines, node.encoding);
}
else { msg.payload = lines; }
nodeSend(msg);
else { sendMessage.payload = lines; }
nodeSend(sendMessage);
}
else if (node.format === "lines") {
var m = { payload: spare,
parts: {
index: count,
count: count+1,
ch: ch,
type: type,
id: msg._msgid
}
};
nodeSend(m);
sendMessage.payload = spare;
sendMessage.parts = { index:count, count:count+1, ch:ch, type:type, id:msg._msgid };
nodeSend(sendMessage);
}
else if (getout) { // last chunk same size as high water mark - have to send empty extra packet.
var m = { parts:{index:count, count:count, ch:ch, type:type, id:msg._msgid} };
nodeSend(m);
delete sendMessage.payload;
sendMessage.parts = { parts:{index:count, count:count, ch:ch, type:type, id:msg._msgid} };
nodeSend(sendMessage);
}
nodeDone();
});

View File

@ -1192,6 +1192,43 @@ describe('file Nodes', function() {
});
});
it('should read in a file and output split lines with parts and preserve other properties', function(done) {
var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"lines", wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(fileNode, flow, function() {
var n1 = helper.getNode("fileInNode1");
var n2 = helper.getNode("n2");
var c = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property('payload');
msg.payload.should.be.a.String();
msg.should.have.property('topic',"dujour");
msg.should.have.property('foo',"bar");
msg.should.have.property('parts');
msg.parts.should.have.property('index',c);
msg.parts.should.have.property('type','string');
msg.parts.should.have.property('ch','\n');
if (c === 0) {
msg.payload.should.equal("File message line 1");
}
if (c === 1) {
msg.payload.should.equal("File message line 2");
}
if (c === 2) {
msg.payload.should.equal("");
done();
}
c++;
}
catch(e) {
done(e);
}
});
n1.receive({payload:"",topic:"dujour",foo:"bar"});
});
});
it('should read in a file and output a buffer with parts', function(done) {
var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"stream", wires:[["n2"]]},
{id:"n2", type:"helper"}];
@ -1212,6 +1249,28 @@ describe('file Nodes', function() {
});
});
it('should read in a file and output a buffer with parts and preserve other properties', function(done) {
var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", filename:fileToTest, format:"stream", wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(fileNode, flow, function() {
var n1 = helper.getNode("fileInNode1");
var n2 = helper.getNode("n2");
n2.on("input", function(msg) {
msg.should.have.property('payload');
msg.should.have.property('topic',"dujour");
msg.should.have.property('foo',"bar");
Buffer.isBuffer(msg.payload).should.be.true();
msg.payload.should.have.length(40);
msg.should.have.property('parts');
msg.parts.should.have.property('count',1);
msg.parts.should.have.property('type','buffer');
msg.parts.should.have.property('ch','');
done();
});
n1.receive({payload:"",topic:"dujour",foo:"bar"});
});
});
it('should warn if no filename set', function(done) {
var flow = [{id:"fileInNode1", type:"file in", name: "fileInNode", "format":""}];
helper.load(fileNode, flow, function() {