1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

adding streaming modes into split node

and add tests
This commit is contained in:
Dave Conway-Jones 2017-06-16 22:26:14 +01:00
parent e70766a535
commit 4f34980c9f
No known key found for this signature in database
GPG Key ID: 81B04231572A9A2D
4 changed files with 198 additions and 73 deletions

View File

@ -24,12 +24,10 @@
<input type="text" id="node-input-splt" style="width: 70%">
<input type="hidden" id="node-input-spltType">
</div>
<!--
<div class="form-row">
<input type="checkbox" id="node-input-stream" style="margin-left: 10px; vertical-align:top; width: auto;">
<label for="node-input-stream" style="width:auto;" data-i18n="split.stream"></label>
</div>
-->
<div class="form-row"><span data-i18n="[html]split.array"></span></div>
<div class="form-row">
<label for="node-input-arraySplt" style="padding-left: 10px;margin-right: -10px;" data-i18n="split.splitUsing"></label>

View File

@ -18,14 +18,22 @@ module.exports = function(RED) {
"use strict";
function sendArray(node,msg,array) {
for (var i = 0; i < array.length; i++) {
for (var i = 0; i < array.length-1; i++) {
msg.payload = array[i];
msg.parts.index = i;
msg.parts.count = array.length;
//if (i === a.length-1) { msg.complete = true; }
msg.parts.index = node.c++;
if (node.stream !== true) { msg.parts.count = array.length; }
node.send(RED.util.cloneMessage(msg));
}
if (node.stream !== true) {
msg.payload = array[i];
msg.parts.index = node.c++;
msg.parts.count = array.length;
node.send(RED.util.cloneMessage(msg));
node.c = 0;
}
else { node.remainder = array[i]; }
}
function SplitNode(n) {
RED.nodes.createNode(this,n);
var node = this;
@ -57,17 +65,16 @@ module.exports = function(RED) {
return;
}
node.c = 0;
node.buffer = new Buffer([]);
node.buffer = new Buffer.from([]);
this.on("input", function(msg) {
if (msg.hasOwnProperty("payload")) {
var a = msg.payload;
if (msg.hasOwnProperty("parts")) { msg.parts = { parts:msg.parts }; } // push existing parts to a stack
else { msg.parts = {}; }
msg.parts.id = msg._msgid; // use the existing _msgid by default.
if (typeof msg.payload === "string") { // Split String into array
msg.payload = (node.remainder || "") + msg.payload;
msg.parts.type = "string";
if (node.spltType === "len") {
// a = msg.payload.match()
msg.parts.ch = "";
var count = msg.payload.length/node.splt;
if (Math.floor(count) !== count) {
@ -75,16 +82,27 @@ module.exports = function(RED) {
//TODO stream support
count = Math.ceil(count);
}
msg.parts.count = count;
if (node.stream !== true) {
msg.parts.count = count;
node.c = 0;
}
var pos = 0;
var data = msg.payload;
for (var i=0; i<count; i++) {
for (var i=0; i<count-1; i++) {
msg.payload = data.substring(pos,pos+node.splt);
msg.parts.index = i;
msg.parts.index = node.c++;
pos += node.splt;
node.send(RED.util.cloneMessage(msg));
}
} else {
node.remainder = data.substring(pos);
if ((node.stream !== true) || (node.remainder.length === node.splt)) {
msg.payload = node.remainder;
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
node.remainder = "";
}
}
else {
var a = [];
if (node.spltType === "bin") {
if (!node.spltBufferString) {
@ -98,7 +116,8 @@ module.exports = function(RED) {
}
sendArray(node,msg,a);
}
} else if (Array.isArray(msg.payload)) { // then split array into messages
}
else if (Array.isArray(msg.payload)) { // then split array into messages
msg.parts.type = "array";
var count = msg.payload.length/node.arraySplt;
if (Math.floor(count) !== count) {
@ -119,7 +138,8 @@ module.exports = function(RED) {
pos += node.arraySplt;
node.send(RED.util.cloneMessage(msg));
}
} else if ((typeof msg.payload === "object") && !Buffer.isBuffer(msg.payload)) {
}
else if ((typeof msg.payload === "object") && !Buffer.isBuffer(msg.payload)) {
var j = 0;
var l = Object.keys(msg.payload).length;
var pay = msg.payload;
@ -134,83 +154,81 @@ module.exports = function(RED) {
j += 1;
}
}
} else if (Buffer.isBuffer(msg.payload)) {
}
else if (Buffer.isBuffer(msg.payload)) {
var len = node.buffer.length + msg.payload.length;
var buff = Buffer.concat([node.buffer, msg.payload], len);
msg.parts.type = "buffer";
if (node.spltType === "len") {
var count = msg.payload.length/node.splt;
var count = buff.length/node.splt;
if (Math.floor(count) !== count) {
// Partial last packet
//TODO stream support
count = Math.ceil(count);
}
msg.parts.count = count;
if (node.stream !== true) {
msg.parts.count = count;
node.c = 0;
}
var pos = 0;
var data = msg.payload;
msg.parts.len = node.splt;
for (var i=0; i<count; i++) {
msg.payload = data.slice(pos,pos+node.splt);
msg.parts.index = i;
for (var i=0; i<count-1; i++) {
msg.payload = buff.slice(pos,pos+node.splt);
msg.parts.index = node.c++;
pos += node.splt;
node.send(RED.util.cloneMessage(msg));
}
} else {
node.buffer = buff.slice(pos);
if ((node.stream !== true) || (node.buffer.length === node.splt)) {
msg.payload = node.buffer;
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
node.buffer = new Buffer.from([]);
}
}
else {
var count = 0;
if (node.spltType === "bin") {
msg.parts.ch = node.spltBuffer;
} else if (node.spltType === "str") {
msg.parts.ch = node.splt;
}
var pos = msg.payload.indexOf(node.splt);
var pos = buff.indexOf(node.splt);
var end;
while (pos > -1) {
count++;
end = pos+node.splt.length;
pos = msg.payload.indexOf(node.splt,end);
pos = buff.indexOf(node.splt,end);
}
//TODO: stream support
// if (end < msg.payload.length) {
count++;
// }
msg.parts.count = count;
if (node.stream !== true) {
msg.parts.count = count;
node.c = 0;
}
var i = 0, p = 0;
var data = msg.payload;
pos = data.indexOf(node.splt);
pos = buff.indexOf(node.splt);
while (pos > -1) {
msg.payload = data.slice(p,pos);
msg.parts.index = i;
msg.payload = buff.slice(p,pos);
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
i++;
p = pos+node.splt.length;
pos = data.indexOf(node.splt,p);
pos = buff.indexOf(node.splt,p);
}
// if (p < data.length) {
if ((node.stream !== true) && (p < buff.length)) {
// TODO: stream support;
msg.payload = data.slice(p,data.length);
msg.parts.index = i;
msg.payload = buff.slice(p,buff.length);
msg.parts.index = node.c++;
msg.parts.count = node.c++;
node.send(RED.util.cloneMessage(msg));
// }
}
else {
node.buffer = buff.slice(p,buff.length);
}
}
// var len = /*node.buffer.length + */ msg.payload.length;
// var buff = Buffer.concat([/*node.buffer,*/ msg.payload], len);
// var pos = buff.indexOf(node.splt);
// msg.parts.type = "buffer";
// msg.parts.ch = node.splt; // pass the split to other end for rejoin
// while (pos !== -1) {
// msg.payload = buff.slice(0,pos);
// msg.parts.index = node.c;
// node.c += 1;
// buff = buff.slice(pos + node.splt.length);
// if (buff.length === 0) {
// msg.parts.count = node.c;
// node.c = 0; //reset the count if no remainder.
// }
// node.send(RED.util.cloneMessage(msg));
// pos = buff.indexOf(node.splt);
// }
// // save the remainder to use as start of next time round
// node.buffer = buff;
}
//else { } // otherwise drop the message.
}

View File

@ -134,7 +134,7 @@ describe('range Node', function() {
var sinon = require('sinon');
sinon.stub(rangeNode1, 'log', function(log) {
if(log.indexOf("notnumber") > -1) {
if (log.indexOf("notnumber") > -1) {
done();
} else {
try {

View File

@ -58,6 +58,47 @@ describe('SPLIT node', function() {
});
});
it('should split an array into multiple messages of a specified size', function(done) {
var flow = [{id:"sn1", type:"split", wires:[["sn2"]], arraySplt:3, arraySpltType:"len"},
{id:"sn2", type:"helper"}];
helper.load(splitNode, flow, function() {
var sn1 = helper.getNode("sn1");
var sn2 = helper.getNode("sn2");
sn2.on("input", function(msg) {
msg.should.have.property("parts");
msg.parts.should.have.property("count",2);
msg.parts.should.have.property("type","array");
msg.parts.should.have.property("index");
msg.payload.should.be.an.Array;
if (msg.parts.index === 0) { msg.payload.length.should.equal(3); }
if (msg.parts.index === 1) { msg.payload.length.should.equal(1); done(); }
});
sn1.receive({payload:[1,2,3,4]});
});
});
it('should split an object into pieces', function(done) {
var flow = [{id:"sn1", type:"split", wires:[["sn2"]]},
{id:"sn2", type:"helper"}];
helper.load(splitNode, flow, function() {
var sn1 = helper.getNode("sn1");
var sn2 = helper.getNode("sn2");
var count = 0;
sn2.on("input", function(msg) {
msg.should.have.property("payload");
msg.should.have.property("parts");
msg.parts.should.have.property("type","object");
msg.parts.should.have.property("key");
msg.parts.should.have.property("count");
msg.parts.should.have.property("index");
if (msg.parts.index === 0) { msg.payload.should.equal(1); }
if (msg.parts.index === 1) { msg.payload.should.equal("2"); }
if (msg.parts.index === 2) { msg.payload.should.equal(true); done(); }
});
sn1.receive({payload:{a:1,b:"2",c:true}});
});
});
it('should split a string into new-lines', function(done) {
var flow = [{id:"sn1", type:"split", wires:[["sn2"]]},
{id:"sn2", type:"helper"}];
@ -69,12 +110,12 @@ describe('SPLIT node', function() {
msg.parts.should.have.property("count",4);
msg.parts.should.have.property("type","string");
msg.parts.should.have.property("index");
if (msg.parts.index === 0) { msg.payload.should.equal("D"); }
if (msg.parts.index === 1) { msg.payload.should.equal("a"); }
if (msg.parts.index === 2) { msg.payload.should.equal("v"); }
if (msg.parts.index === 3) { msg.payload.should.equal("e"); done(); }
if (msg.parts.index === 0) { msg.payload.should.equal("Da"); }
if (msg.parts.index === 1) { msg.payload.should.equal("ve"); }
if (msg.parts.index === 2) { msg.payload.should.equal(" "); }
if (msg.parts.index === 3) { msg.payload.should.equal("CJ"); done(); }
});
sn1.receive({payload:"D\na\nv\ne"});
sn1.receive({payload:"Da\nve\n \nCJ"});
});
});
@ -98,25 +139,93 @@ describe('SPLIT node', function() {
});
});
it('should split an object into pieces', function(done) {
var flow = [{id:"sn1", type:"split", wires:[["sn2"]]},
it('should split a string into lengths', function(done) {
var flow = [{id:"sn1", type:"split", wires:[["sn2"]], splt:"2", spltType:"len"},
{id:"sn2", type:"helper"}];
helper.load(splitNode, flow, function() {
var sn1 = helper.getNode("sn1");
var sn2 = helper.getNode("sn2");
var count = 0;
sn2.on("input", function(msg) {
msg.should.have.property("payload");
msg.should.have.property("parts");
msg.parts.should.have.property("type","object");
msg.parts.should.have.property("key");
msg.parts.should.have.property("count");
msg.parts.should.have.property("count",4);
msg.parts.should.have.property("ch","");
msg.parts.should.have.property("index");
if (msg.parts.index === 0) { msg.payload.should.equal(1); }
if (msg.parts.index === 1) { msg.payload.should.equal("2"); }
if (msg.parts.index === 2) { msg.payload.should.equal(true); done(); }
msg.parts.should.have.property("type","string");
if (msg.parts.index === 0) { msg.payload.should.equal("12"); }
if (msg.parts.index === 1) { msg.payload.should.equal("34"); }
if (msg.parts.index === 2) { msg.payload.should.equal("56"); }
if (msg.parts.index === 3) { msg.payload.should.equal("78"); done(); }
});
sn1.receive({payload:{a:1,b:"2",c:true}});
sn1.receive({payload:"12345678"});
});
});
it('should split a string on a specified char in stream mode', function(done) {
var flow = [{id:"sn1", type:"split", wires:[["sn2"]], splt:"\n", stream:true},
{id:"sn2", type:"helper"}];
helper.load(splitNode, flow, function() {
var sn1 = helper.getNode("sn1");
var sn2 = helper.getNode("sn2");
sn2.on("input", function(msg) {
msg.should.have.property("parts");
msg.parts.should.have.property("ch","\n");
msg.parts.should.have.property("index");
msg.parts.should.have.property("type","string");
if (msg.parts.index === 0) { msg.payload.should.equal("1"); }
if (msg.parts.index === 1) { msg.payload.should.equal("2"); }
if (msg.parts.index === 2) { msg.payload.should.equal("3"); }
if (msg.parts.index === 3) { msg.payload.should.equal("4"); }
if (msg.parts.index === 4) { msg.payload.should.equal("5"); }
if (msg.parts.index === 5) { msg.payload.should.equal("6"); done(); }
});
sn1.receive({payload:"1\n2\n3\n"});
sn1.receive({payload:"4\n5\n6\n"});
});
});
it('should split a buffer into lengths', function(done) {
var flow = [{id:"sn1", type:"split", wires:[["sn2"]], splt:"2", spltType:"len"},
{id:"sn2", type:"helper"}];
helper.load(splitNode, flow, function() {
var sn1 = helper.getNode("sn1");
var sn2 = helper.getNode("sn2");
sn2.on("input", function(msg) {
//console.log(msg);
msg.should.have.property("parts");
msg.payload.should.be.a.Buffer;
msg.parts.should.have.property("count",4);
msg.parts.should.have.property("index");
msg.parts.should.have.property("type","buffer");
if (msg.parts.index === 0) { msg.payload.toString().should.equal("12"); }
if (msg.parts.index === 1) { msg.payload.toString().should.equal("34"); }
if (msg.parts.index === 2) { msg.payload.toString().should.equal("56"); }
if (msg.parts.index === 3) { msg.payload.toString().should.equal("78"); done(); }
});
var b = new Buffer.from("12345678");
sn1.receive({payload:b});
});
});
it('should split a buffer on another buffer (streaming)', function(done) {
var flow = [{id:"sn1", type:"split", wires:[["sn2"]], splt:"[52]", spltType:"bin", stream:true},
{id:"sn2", type:"helper"}];
helper.load(splitNode, flow, function() {
var sn1 = helper.getNode("sn1");
var sn2 = helper.getNode("sn2");
sn2.on("input", function(msg) {
//console.log(msg);
msg.should.have.property("parts");
msg.payload.should.be.a.Buffer;
msg.parts.should.have.property("index");
msg.parts.should.have.property("type","buffer");
if (msg.parts.index === 0) { msg.payload.toString().should.equal("123"); }
if (msg.parts.index === 1) { msg.payload.toString().should.equal("123"); }
if (msg.parts.index === 2) { msg.payload.toString().should.equal("123"); done(); }
});
var b1 = new Buffer.from("123412");
var b2 = new Buffer.from("341234");
sn1.receive({payload:b1});
sn1.receive({payload:b2});
});
});