Add buffer support to split node

This commit is contained in:
Nick O'Leary 2017-06-13 21:01:04 +01:00
parent d9dc171c28
commit c26852da77
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9
3 changed files with 332 additions and 89 deletions

View File

@ -785,5 +785,41 @@
"createfail": "failed to create file: __error__"
},
"tip": "Tip: The filename should be an absolute path, otherwise it will be relative to the working directory of the Node-RED process."
},
"split": {
"intro":"Split <code>msg.payload</code> based on type:",
"object":"<b>Object</b>",
"objectSend":"Send a message for each key/value pair",
"strBuff":"<b>String</b> / <b>Buffer</b>",
"array":"<b>Array</b>",
"splitUsing": "Split using",
"splitLength": "Fixed length of",
"stream":"Handle as a stream of messages"
},
"join":{
"mode":{
"mode":"Mode",
"auto":"automatic",
"custom":"manual"
},
"combine":"Combine each",
"create":"to create",
"type":{
"string":"a String",
"array":"an Array",
"object":"a key/value Object",
"merged":"a merged Object"
},
"using":"using",
"key":"as the key",
"joinedUsing":"joined using",
"send":"Send the message:",
"afterCount":"After a number of message parts",
"count":"count",
"subsequent":"and every subsequent message.",
"afterTimeout":"After a timeout following the first message",
"seconds":"seconds",
"complete":"After a message with the <code>msg.complete</code> property set",
"tip":"This mode assumes this node is either paired with a <i>split</i> node or the received messages will have a properly configured <code>msg.parts</code> property."
}
}

View File

@ -15,13 +15,31 @@
-->
<script type="text/x-red" data-template-name="split">
<div class="form-row"><span data-i18n="[html]split.intro"></span></div>
<div class="form-row"><span data-i18n="[html]split.object"></span></div>
<div class="form-row" style="padding-left: 10px"><span data-i18n="[html]split.objectSend"></span></div>
<div class="form-row"><span data-i18n="[html]split.strBuff"></span></div>
<div class="form-row">
<label for="node-input-splt"><i class="fa fa-scissors"></i> Split</label>
<input type="text" id="node-input-splt" placeholder="character to split strings on : e.g. \n">
<label for="node-input-splt" style="padding-left: 10px;margin-right: -10px;" data-i18n="split.splitUsing"></label>
<input type="text" id="node-input-splt" style="width: 70%">
<input type="hidden" id="node-input-spltType">
</div>
<!--
<div class="form-row">
<label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
<input type="text" id="node-input-name" placeholder="Name">
<input type="checkbox" id="node-input-stream" style="margin-left: 10px; vertical-align:top; width: auto;">
<label for="node-input-stream" style="width:auto;" data-i18n="split.stream"></label>
</div>
-->
<div class="form-row"><span data-i18n="[html]split.array"></span></div>
<div class="form-row">
<label for="node-input-arraySplt" style="padding-left: 10px;margin-right: -10px;" data-i18n="split.splitUsing"></label>
<input type="text" id="node-input-arraySplt" style="width: 70%">
<input type="hidden" id="node-input-arraySpltType">
</div>
<hr/>
<div class="form-row">
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label>
<input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name">
</div>
</script>
@ -53,7 +71,11 @@
color:"#E2D96E",
defaults: {
name: {value:""},
splt: {value:"\\n"}
splt: {value:"\\n"},
spltType: {value:"str"},
arraySplt: {value:1},
arraySpltType: {value:"len"},
stream: {value: false}
},
inputs:1,
outputs:1,
@ -63,98 +85,127 @@
},
labelStyle: function() {
return this.name?"node_label_italic":"";
},
oneditprepare: function() {
$("#node-input-splt").typedInput({
default: 'str',
typeField: $("#node-input-spltType"),
types:[
'str',
'bin',
{value:"len", label:RED._("node-red:split.splitLength"),validate:/^\d+$/}
]
});
if (this.arraySplt === undefined) {
$("#node-input-arraySplt").val(1);
}
$("#node-input-arraySplt").typedInput({
default: 'len',
typeField: $("#node-input-arraySpltType"),
types:[
{value:"len", label:RED._("node-red:split.splitLength"),validate:/^\d+$/}
]
});
}
});
</script>
<script type="text/x-red" data-template-name="join">
<div class="form-row">
<label>Mode</label>
<label data-i18n="join.mode.mode"></label>
<select id="node-input-mode" style="width:200px;">
<option value="auto">automatic</option>
<option value="custom">manual</option>
<option value="auto" data-i18n="join.mode.auto"></option>
<option value="custom" data-i18n="join.mode.custom"></option>
</select>
</div>
<div class="node-row-custom">
<div class="form-row node-row-property">
<label>Combine each </label>
<label data-i18n="join.combine"> </label>
<input type="text" id="node-input-property" style="width:70%;">
<input type="hidden" id="node-input-propertyType">
</div>
<div class="form-row">
<label>to create </label>
<label data-i18n="join.create"></label>
<select id="node-input-build" style="width:70%;">
<option id="node-input-build-string" value="string">a String</option>
<option value="array">an Array</option>
<option value="object">a key/value Object</option>
<option value="merged">a merged Object</option>
<option id="node-input-build-string" value="string" data-i18n="join.type.string"></option>
<option value="array" data-i18n="join.type.array"></option>
<option value="object" data-i18n="join.type.object"></option>
<option value="merged" data-i18n="join.type.merged"></option>
</select>
</div>
<div class="form-row node-row-key">
<label style="vertical-align:top; margin-top:7px;">using</label>
<label style="vertical-align:top; margin-top:7px;" data-i18n="join.using"></label>
<div style="display:inline-block">
<input type="text" id="node-input-key" style="width:252px;"> as the key
<input type="text" id="node-input-key" style="width:252px;"> <span data-i18n="join.key"></span>
</div>
</div>
<div class="form-row node-row-joiner">
<label for="node-input-joiner">joined using</label>
<label for="node-input-joiner" data-i18n="join.joinedUsing"></label>
<input type="text" id="node-input-joiner" style="width:40px">
</div>
<div class="form-row node-row-trigger" id="trigger-row">
<label style="width:auto;">Send the message:</label>
<label style="width:auto;" data-i18n="join.send"></label>
<ul>
<li>
<label style="width:280px;" for="node-input-count">After a number of message parts</label> <input id="node-input-count" placeholder="count" type="text" style="width:75px;">
<label style="width:280px;" for="node-input-count" data-i18n="join.afterCount"></label> <input id="node-input-count" data-i18n="[placeholder]join.count" type="text" style="width:75px;">
</li>
<li style="list-style-type:none;">
<input type="checkbox" id="node-input-accumulate" style="display:inline-block; width:20px; list-style-type:none; margin-left:20px; vertical-align:top;"> and every subsequent message.
<input type="checkbox" id="node-input-accumulate" style="display:inline-block; width:20px; list-style-type:none; margin-left:20px; vertical-align:top;"> <span data-i18n="join.subsequent"></span>
</li>
<li>
<label style="width:280px;" for="node-input-timeout">After a timeout following the first message</label> <input id="node-input-timeout" placeholder="seconds" type="text" style="width:75px;">
<label style="width:280px;" for="node-input-timeout" data-i18n="join.afterTimeout"></label> <input id="node-input-timeout" data-i18n="[placeholder]join.seconds" type="text" style="width:75px;">
</li>
<li>
<label style="width:auto; padding-top:6px;">After a message with the <code>msg.complete</code> property set</label>
<label style="width:auto; padding-top:6px;" data-i18n="[html]join.complete"></label>
</li>
</ul>
</div>
</div>
<div class="form-row">
<label for="node-input-name">Name</label>
<input type="text" id="node-input-name" placeholder="Name">
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label>
<input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name">
</div>
<div class="form-tips form-tips-auto hide">This mode assumes this node is either
paired with a <i>split</i> node or the received messages will have a properly
configured <code>msg.parts</code> property.</div>
<div class="form-tips form-tips-auto hide" data-i18n="[html]join.tip"></div>
</script>
<script type="text/x-red" data-help-name="join">
<p>A function that joins a sequence of messages into a single message.</p>
<p>Joins sequences of messages into a single message.</p>
<p>When paired with the <b>split</b> node, it will automatically join the messages
to reverse the split that was performed.</p>
<p>The node can join either a specific property of each received message or,
if the output type is not string, the entire message.</p>
<p>The type of the resulting message property can be:</p>
<h3>Inputs</h3>
<dl class="message-properties">
<dt class="optional">parts<span class="property-type">object</span></dt>
<dd>To automatically join a sequence of messages, they should all have
this property set. The <b>split</b> node generates this property but it
can be manually created. It has the following properties:
<ul>
<li><code>id</code> - an identifier for the group of messages</li>
<li><code>index</code> - the position within the group</li>
<li><code>count</code> - the total number of messages in the group</li>
<li><code>type</code> - the type of message - string/array/object</li>
<li><code>ch</code> - for a string, the character used to split</li>
<li><code>key</code> - for an object, the key of the property this message was created from</li>
<li><code>len</code> - for an array, the length of each array section</li>
</ul>
</dd>
<dt class="optional">complete</dt>
<dd>If set, the node will send its output message in its current state.</dd>
</dl>
<h3>Details</h3>
<p>When configured to join in manual mode, the node is able to join sequences
of messages in a variety of ways.</p>
<ul>
<li>a <b>string</b> - created by joining the property of each message with the specified join character.</li>
<li>a <b>string</b> - created by joining the selected property of each message with the specified join character.</li>
<li>an <b>array</b>.</li>
<li>a <b>key/value object</b> - created by using a property of each message to determine the key under which
the required value is stored.</li>
<li>a <b>merged object</b> - created by merging the property of each message under a single object.</li>
</ul>
The other properties of the output message are taken from the last message received before the result is sent.</p>
<p>A <i>count</i> can be set for how many messages should be received before generating the output message</p>
<p>The other properties of the output message are taken from the last message received before the result is sent.</p>
<p>A <i>count</i> can be set for how many messages should be received before generating the output message.</p>
<p>A <i>timeout</i> can be set to trigger sending the new message using whatever has been received so far.</p>
<p>If a message is received with the <b>msg.complete</b> property set, the output message is sent.</p>
<p>The automatic behaviour relies on the received messages to have <b>msg.parts</b> set. The split node generates
this property, but can be manually created. It has the following properties:</p>
<ul>
<li><b>id</b> - an identifier for the group of messages</li>
<li><b>index</b> - the position within the group</li>
<li><b>count</b> - the total number of messages in the group</li>
<li><b>type</b> - the type of message - string/array/object</li>
<li><b>ch</b> - for a string, the character used to split</li>
<li><b>key</b> - for an object, the key of the property this message was created from</li>
</ul>
</script>
<script type="text/javascript">
@ -192,7 +243,7 @@
$("#node-input-build").change(function(e) {
var val = $(this).val();
$(".node-row-key").toggle((val==='object')||(val==='accus'));
$(".node-row-key").toggle(val==='object');
$(".node-row-joiner").toggle(val==='string');
$(".node-row-trigger").toggle(val!=='auto');
if (val === 'string') {
@ -200,8 +251,6 @@
} else {
$("#node-input-property").typedInput('types',['msg', {value:"full",label:"complete message",hasValue:false}]);
}
// if (val === "accum") { $("#trigger-row").hide(); }
// else { $("#trigger-row").show(); }
});
$("#node-input-property").typedInput({

View File

@ -17,15 +17,45 @@
module.exports = function(RED) {
"use strict";
function sendArray(node,msg,array) {
for (var i = 0; i < array.length; i++) {
msg.payload = array[i];
msg.parts.index = i;
msg.parts.count = array.length;
//if (i === a.length-1) { msg.complete = true; }
node.send(RED.util.cloneMessage(msg));
}
}
function SplitNode(n) {
RED.nodes.createNode(this,n);
this.splt = (n.splt || "\\n").replace(/\\n/,"\n").replace(/\\r/,"\r").replace(/\\t/,"\t").replace(/\\e/,"\e").replace(/\\f/,"\f").replace(/\\0/,"\0");
try {
var s = JSON.parse(n.splt.trim());
if (Array.isArray(s)) { this.splt = new Buffer.from(s); }
}
catch (e) {}
var node = this;
node.stream = n.stream;
node.spltType = n.spltType || "str";
try {
if (node.spltType === "str") {
this.splt = (n.splt || "\\n").replace(/\\n/,"\n").replace(/\\r/,"\r").replace(/\\t/,"\t").replace(/\\e/,"\e").replace(/\\f/,"\f").replace(/\\0/,"\0");
} else if (node.spltType === "bin") {
var spltArray = JSON.parse(n.splt);
if (Array.isArray(spltArray)) {
this.splt = Buffer.from(spltArray);
} else {
throw new Error("not an array");
}
this.spltBuffer = spltArray;
} else if (node.spltType === "len") {
this.splt = parseInt(n.splt);
if (isNaN(this.splt) || this.splt < 1) {
throw new Error("invalid split length: "+n.splt);
}
}
this.arraySplt = (n.arraySplt === undefined)?1:parseInt(n.arraySplt);
if (isNaN(this.arraySplt) || this.arraySplt < 1) {
throw new Error("invalid array split length: "+n.arraySplt);
}
} catch(err) {
this.error("Invalid split property: "+err.toString());
return;
}
node.c = 0;
node.buffer = new Buffer([]);
this.on("input", function(msg) {
@ -35,21 +65,61 @@ module.exports = function(RED) {
else { msg.parts = {}; }
msg.parts.id = msg._msgid; // use the existing _msgid by default.
if (typeof msg.payload === "string") { // Split String into array
a = msg.payload.split(node.splt);
msg.parts.ch = node.splt; // pass the split char to other end for rejoin
msg.parts.type = "string";
}
if (Array.isArray(a)) { // then split array into messages
msg.parts.type = msg.parts.type || "array"; // if it wasn't a string in the first place
for (var i = 0; i < a.length; i++) {
msg.payload = a[i];
if (node.spltType === "len") {
// a = msg.payload.match()
msg.parts.ch = "";
var count = msg.payload.length/node.splt;
if (Math.floor(count) !== count) {
// Partial last packet
//TODO stream support
count = Math.ceil(count);
}
msg.parts.count = count;
var pos = 0;
var data = msg.payload;
for (var i=0;i<count;i++) {
msg.payload = data.substring(pos,pos+node.splt);
msg.parts.index = i;
pos += node.splt;
node.send(RED.util.cloneMessage(msg));
}
} else {
var a = [];
if (node.spltType === "bin") {
if (!node.spltBufferString) {
node.spltBufferString = node.splt.toString();
}
a = msg.payload.split(node.spltBufferString);
msg.parts.ch = node.spltBuffer; // pass the split char to other end for rejoin
} else if (node.spltType === "str") {
a = msg.payload.split(node.splt);
msg.parts.ch = node.splt; // pass the split char to other end for rejoin
}
sendArray(node,msg,a);
}
} else if (Array.isArray(msg.payload)) { // then split array into messages
msg.parts.type = "array";
var count = msg.payload.length/node.arraySplt;
if (Math.floor(count) !== count) {
// Partial last packet
//TODO stream support
count = Math.ceil(count);
}
msg.parts.count = count;
var pos = 0;
var data = msg.payload;
msg.parts.len = node.arraySplt;
for (var i=0;i<count;i++) {
msg.payload = data.slice(pos,pos+node.arraySplt);
if (node.arraySplt === 1) {
msg.payload = msg.payload[0];
}
msg.parts.index = i;
msg.parts.count = a.length;
//if (i === a.length-1) { msg.complete = true; }
pos += node.arraySplt;
node.send(RED.util.cloneMessage(msg));
}
}
else if ((typeof msg.payload === "object") && !Buffer.isBuffer(msg.payload)) {
} else if ((typeof msg.payload === "object") && !Buffer.isBuffer(msg.payload)) {
var j = 0;
var l = Object.keys(msg.payload).length;
var pay = msg.payload;
@ -64,28 +134,83 @@ module.exports = function(RED) {
j += 1;
}
}
}
// Handle Buffer objects.... with overlaps to handle partial stream like
else if (Buffer.isBuffer(msg.payload)) {
var len = node.buffer.length + msg.payload.length;
var buff = Buffer.concat([node.buffer, msg.payload], len);
var pos = buff.indexOf(node.splt);
} else if (Buffer.isBuffer(msg.payload)) {
msg.parts.type = "buffer";
msg.parts.ch = node.splt; // pass the split to other end for rejoin (maybe)
while (pos !== -1) {
msg.payload = buff.slice(0,pos);
msg.parts.index = node.c;
node.c += 1;
buff = buff.slice(pos + node.splt.length);
if (buff.length === 0) {
msg.parts.count = node.c;
node.c = 0; //reset the count if no remainder.
if (node.spltType === "len") {
var count = msg.payload.length/node.splt;
if (Math.floor(count) !== count) {
// Partial last packet
//TODO stream support
count = Math.ceil(count);
}
node.send(RED.util.cloneMessage(msg));
pos = buff.indexOf(node.splt);
msg.parts.count = count;
var pos = 0;
var data = msg.payload;
msg.parts.len = node.splt;
for (var i=0;i<count;i++) {
msg.payload = data.slice(pos,pos+node.splt);
msg.parts.index = i;
pos += node.splt;
node.send(RED.util.cloneMessage(msg));
}
} else {
var count = 0;
if (node.spltType === "bin") {
msg.parts.ch = node.spltBuffer;
} else if (node.spltType === "str") {
msg.parts.ch = node.splt;
}
var pos = msg.payload.indexOf(node.splt);
var end;
while(pos > -1) {
count++;
end = pos+node.splt.length;
pos = msg.payload.indexOf(node.splt,end);
}
//TODO: stream support
// if (end < msg.payload.length) {
count++;
// }
msg.parts.count = count;
var i = 0, p = 0;
var data = msg.payload;
pos = data.indexOf(node.splt);
while(pos > -1) {
msg.payload = data.slice(p,pos);
msg.parts.index = i;
node.send(RED.util.cloneMessage(msg));
i++;
p = pos+node.splt.length;
pos = data.indexOf(node.splt,p);
}
// if (p < data.length) {
// TODO: stream support;
msg.payload = data.slice(p,data.length);
msg.parts.index = i;
node.send(RED.util.cloneMessage(msg));
// }
}
// save the remainder to use as start of next time round
node.buffer = buff;
// var len = /*node.buffer.length + */ msg.payload.length;
// var buff = Buffer.concat([/*node.buffer,*/ msg.payload], len);
// var pos = buff.indexOf(node.splt);
// msg.parts.type = "buffer";
// msg.parts.ch = node.splt; // pass the split to other end for rejoin
// while (pos !== -1) {
// msg.payload = buff.slice(0,pos);
// msg.parts.index = node.c;
// node.c += 1;
// buff = buff.slice(pos + node.splt.length);
// if (buff.length === 0) {
// msg.parts.count = node.c;
// node.c = 0; //reset the count if no remainder.
// }
// node.send(RED.util.cloneMessage(msg));
// pos = buff.indexOf(node.splt);
// }
// // save the remainder to use as start of next time round
// node.buffer = buff;
}
//else { } // otherwise drop the message.
}
@ -116,6 +241,32 @@ module.exports = function(RED) {
var group = inflight[partId];
clearTimeout(group.timeout);
if ((node.accumulate !== true) || group.msg.hasOwnProperty("complete")) { delete inflight[partId]; }
if (group.type === 'array' && group.arrayLen > 1) {
var newArray = [];
group.payload.forEach(function(n) {
newArray = newArray.concat(n);
})
group.payload = newArray;
} else if (group.type === 'buffer') {
var buffers = [];
var bufferLen = 0;
if (group.joinChar !== undefined) {
var joinBuffer = Buffer.from(group.joinChar);
for (var i=0;i<group.payload.length;i++) {
if (i > 0) {
buffers.push(joinBuffer);
bufferLen += joinBuffer.length;
}
buffers.push(group.payload[i]);
bufferLen += group.payload[i].length;
}
} else {
bufferLen = group.bufferLen;
buffers = group.payload;
}
group.payload = Buffer.concat(buffers,bufferLen);
}
if (group.type === 'string') {
RED.util.setMessageProperty(group.msg,node.property,group.payload.join(group.joinChar));
} else {
@ -154,6 +305,7 @@ module.exports = function(RED) {
var propertyKey;
var targetCount;
var joinChar;
var arrayLen;
var propertyIndex;
if (node.mode === "auto") {
// Use msg.parts to identify all of the group information
@ -162,6 +314,7 @@ module.exports = function(RED) {
targetCount = msg.parts.count;
joinChar = msg.parts.ch;
propertyKey = msg.parts.key;
arrayLen = msg.parts.len;
propertyIndex = msg.parts.index;
}
else {
@ -216,7 +369,12 @@ module.exports = function(RED) {
};
if (payloadType === 'string') {
inflight[partId].joinChar = joinChar;
} else if (payloadType === 'array') {
inflight[partId].arrayLen = arrayLen;
} else if (payloadType === 'buffer') {
inflight[partId].bufferLen = 0;
}
}
if (node.timer > 0) {
inflight[partId].timeout = setTimeout(function() {
@ -226,18 +384,19 @@ module.exports = function(RED) {
}
var group = inflight[partId];
if (payloadType === 'buffer') {
inflight[partId].bufferLen += property.length;
}
if (payloadType === 'object') {
group.payload[propertyKey] = property;
group.currentCount = Object.keys(group.payload).length;
//msg.topic = node.topic || msg.topic;
}
else if (payloadType === 'merged') {
} else if (payloadType === 'merged') {
if (Array.isArray(property) || typeof property !== 'object') {
if (!msg.hasOwnProperty("complete")) {
node.warn("Cannot merge non-object types");
}
}
else {
} else {
for (propertyKey in property) {
if (property.hasOwnProperty(propertyKey)) {
group.payload[propertyKey] = property[propertyKey];
@ -246,8 +405,7 @@ module.exports = function(RED) {
group.currentCount = Object.keys(group.payload).length;
//group.currentCount++;
}
}
else {
} else {
if (!isNaN(propertyIndex)) {
group.payload[propertyIndex] = property;
} else {