updated split/join node (split still needs work before release)

This commit is contained in:
Dave Conway-Jones 2017-06-05 17:04:17 +01:00
parent f527841c29
commit 7c42b04eff
No known key found for this signature in database
GPG Key ID: 81B04231572A9A2D
3 changed files with 283 additions and 133 deletions

View File

@ -1,4 +1,3 @@
<!DOCTYPE html>
<!--
Copyright JS Foundation and other contributors, http://js.foundation
@ -84,7 +83,7 @@
</div>
<div class="form-row">
<label>to create </label>
<select id="node-input-build" style="width:200px;">
<select id="node-input-build" style="width:70%;">
<option id="node-input-build-string" value="string">a String</option>
<option value="array">an Array</option>
<option value="object">a key/value Object</option>
@ -92,36 +91,40 @@
</select>
</div>
<div class="form-row node-row-key">
<label style="vertical-align: top; margin-top: 7px;">using</label>
<label style="vertical-align:top; margin-top:7px;">using</label>
<div style="display:inline-block">
<input type="text" id="node-input-key" style="width:300px;">
<div style="margin-top: 7px;">as the property key</div>
<input type="text" id="node-input-key" style="width:252px;"> as the key
</div>
</div>
<div class="form-row node-row-joiner">
<label for="node-input-joiner">joined using</label>
<input type="text" id="node-input-joiner" style="width: 40px">
<input type="text" id="node-input-joiner" style="width:40px">
</div>
<div class="form-row node-row-trigger">
<label style="width: auto;">Send the message:</label>
<div class="form-row node-row-trigger" id="trigger-row">
<label style="width:auto;">Send the message:</label>
<ul>
<li style="height: 40px;">
<label style="width: 280px;" for="node-input-count">After a fixed number of messages:</label> <input id="node-input-count" placeholder="count" type="text" style="width: 75px;">
<li>
<label style="width:280px;" for="node-input-count">After a number of message parts</label> <input id="node-input-count" placeholder="count" type="text" style="width:75px;">
</li>
<li style="height: 40px;">
<label style="width: 280px;" for="node-input-timeout">After a timeout following the first message:</label> <input id="node-input-timeout" placeholder="seconds" type="text" style="width: 75px;">
<li style="list-style-type:none;">
<input type="checkbox" id="node-input-accumulate" style="display:inline-block; width:20px; list-style-type:none; margin-left:20px; vertical-align:top;"> and every subsequent message.
</li>
<li style="height: 40px;">
<label style="width: auto; padding-top: 6px;">After a message with the <code>msg.complete</code> property set</label>
<li>
<label style="width:280px;" for="node-input-timeout">After a timeout following the first message</label> <input id="node-input-timeout" placeholder="seconds" type="text" style="width:75px;">
</li>
<li>
<label style="width:auto; padding-top:6px;">After a message with the <code>msg.complete</code> property set</label>
</li>
</ul>
</div>
</div>
<div class="form-row">
<label for="node-input-name">Name</label>
<input type="text" id="node-input-name" placeholder="Name">
</div>
<div class="form-tips form-tips-auto hide">This mode assumes this node is either
paired with a <i>split</i> node or the received messages will have a properly
configured <code>msg.parts</code> property.</div>
</script>
<script type="text/x-red" data-help-name="join">
@ -162,10 +165,12 @@
name: {value:""},
mode: {value:"auto"},
build: { value:"string"},
property: { value: "payload", validate: RED.validators.typedInput("propertyType")},
property: { value:"payload", validate:RED.validators.typedInput("propertyType")},
propertyType: { value:"msg"},
key: {value:"topic"},
joiner: { value:"\\n"},
accumulate: { value:"false" },
//topic: { value:""},
timeout: {value:""},
count: {value:""}
},
@ -187,7 +192,7 @@
$("#node-input-build").change(function(e) {
var val = $(this).val();
$(".node-row-key").toggle(val==='object');
$(".node-row-key").toggle((val==='object')||(val==='accus'));
$(".node-row-joiner").toggle(val==='string');
$(".node-row-trigger").toggle(val!=='auto');
if (val === 'string') {
@ -195,15 +200,18 @@
} else {
$("#node-input-property").typedInput('types',['msg', {value:"full",label:"complete message",hasValue:false}]);
}
})
// if (val === "accum") { $("#trigger-row").hide(); }
// else { $("#trigger-row").show(); }
});
$("#node-input-property").typedInput({
typeField: $("#node-input-propertyType"),
types:['msg', {value:"full",label:"complete message",hasValue:false}]
})
types:['msg', {value:"full", label:"complete message", hasValue:false}]
});
$("#node-input-key").typedInput({
types:['msg', {value:"merge",label:"",hasValue: false}]
})
types:['msg', {value:"merge", label:"", hasValue:false}]
});
$("#node-input-build").change();
$("#node-input-mode").change();

View File

@ -20,7 +20,14 @@ module.exports = function(RED) {
function SplitNode(n) {
RED.nodes.createNode(this,n);
this.splt = (n.splt || "\\n").replace(/\\n/,"\n").replace(/\\r/,"\r").replace(/\\t/,"\t").replace(/\\e/,"\e").replace(/\\f/,"\f").replace(/\\0/,"\0");
try {
var s = JSON.parse(n.splt.trim());
if (Array.isArray(s)) { this.splt = new Buffer.from(s); }
}
catch (e) {}
var node = this;
node.c = 0;
node.buffer = new Buffer([]);
this.on("input", function(msg) {
if (msg.hasOwnProperty("payload")) {
var a = msg.payload;
@ -38,6 +45,7 @@ module.exports = function(RED) {
msg.payload = a[i];
msg.parts.index = i;
msg.parts.count = a.length;
//if (i === a.length-1) { msg.complete = true; }
node.send(RED.util.cloneMessage(msg));
}
}
@ -57,7 +65,28 @@ module.exports = function(RED) {
}
}
}
// TODO not handling Buffers at present...
// Handle Buffer objects.... with overlaps to handle partial stream like
else if (Buffer.isBuffer(msg.payload)) {
var len = node.buffer.length + msg.payload.length;
var buff = Buffer.concat([node.buffer, msg.payload], len);
var pos = buff.indexOf(node.splt);
msg.parts.type = "buffer";
msg.parts.ch = node.splt; // pass the split to other end for rejoin (maybe)
while (pos !== -1) {
msg.payload = buff.slice(0,pos);
msg.parts.index = node.c;
node.c += 1;
buff = buff.slice(pos + node.splt.length);
if (buff.length === 0) {
msg.parts.count = node.c;
node.c = 0; //reset the count if no remainder.
}
node.send(RED.util.cloneMessage(msg));
pos = buff.indexOf(node.splt);
}
// save the remainder to use as start of next time round
node.buffer = buff;
}
//else { } // otherwise drop the message.
}
});
@ -75,18 +104,18 @@ module.exports = function(RED) {
}
this.key = n.key||"topic";
this.timer = (this.mode === "auto") ? 0 : Number(n.timeout || 0)*1000;
this.timerr = n.timerr || "send";
this.count = Number(n.count || 0);
this.joiner = (n.joiner||"").replace(/\\n/g,"\n").replace(/\\r/g,"\r").replace(/\\t/g,"\t").replace(/\\e/g,"\e").replace(/\\f/g,"\f").replace(/\\0/g,"\0");
this.build = n.build || "array";
this.accumulate = n.accumulate || "false";
//this.topic = n.topic;
var node = this;
var inflight = {};
var completeSend = function(partId) {
var group = inflight[partId];
clearTimeout(group.timeout);
delete inflight[partId];
if ((node.accumulate !== true) || group.msg.hasOwnProperty("complete")) { delete inflight[partId]; }
if (group.type === 'string') {
RED.util.setMessageProperty(group.msg,node.property,group.payload.join(group.joinChar));
} else {
@ -97,123 +126,143 @@ module.exports = function(RED) {
} else {
delete group.msg.parts;
}
delete group.msg.complete;
node.send(group.msg);
}
this.on("input", function(msg) {
try {
var property;
if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) {
node.warn("Message missing msg.parts property - cannot join in 'auto' mode")
return;
}
if (node.propertyType == "full") {
property = msg;
} else {
try {
property = RED.util.getMessageProperty(msg,node.property);
} catch(err) {
node.warn("Message property "+node.property+" not found");
try {
var property;
if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) {
node.warn("Message missing msg.parts property - cannot join in 'auto' mode")
return;
}
}
var partId;
var payloadType;
var propertyKey;
var targetCount;
var joinChar;
var propertyIndex;
if (node.mode === "auto") {
// Use msg.parts to identify all of the group information
partId = msg.parts.id;
payloadType = msg.parts.type;
targetCount = msg.parts.count;
joinChar = msg.parts.ch;
propertyKey = msg.parts.key;
propertyIndex = msg.parts.index;
} else {
// Use the node configuration to identify all of the group information
partId = "_";
payloadType = node.build;
targetCount = node.count;
joinChar = node.joiner;
if (targetCount === 0 && msg.hasOwnProperty('parts')) {
targetCount = msg.parts.count || 0;
if (node.propertyType == "full") {
property = msg;
}
if (node.build === 'object') {
propertyKey = RED.util.getMessageProperty(msg,node.key);
}
}
if (payloadType === 'object' && (propertyKey === null || propertyKey === undefined || propertyKey === "")) {
if (node.mode === "auto") {
node.warn("Message missing 'msg.parts.key' property - cannot add to object");
} else {
node.warn("Message missing key property 'msg."+node.key+"' '- cannot add to object")
}
return;
}
if (!inflight.hasOwnProperty(partId)) {
if (payloadType === 'object' || payloadType === 'merged') {
inflight[partId] = {
currentCount:0,
payload:{},
targetCount:targetCount,
type:"object",
msg:msg
};
} else {
inflight[partId] = {
currentCount:0,
payload:[],
targetCount:targetCount,
type:payloadType,
joinChar: joinChar,
msg:msg
};
if (payloadType === 'string') {
inflight[partId].joinChar = joinChar;
else {
try {
property = RED.util.getMessageProperty(msg,node.property);
} catch(err) {
node.warn("Message property "+node.property+" not found");
return;
}
}
if (node.timer > 0) {
inflight[partId].timeout = setTimeout(function() {
completeSend(partId)
}, node.timer)
}
}
var group = inflight[partId];
if (payloadType === 'object') {
group.payload[propertyKey] = property;
group.currentCount = Object.keys(group.payload).length;
} else if (payloadType === 'merged') {
if (Array.isArray(property) || typeof property !== 'object') {
node.warn("Cannot merge non-object types");
} else {
for (propertyKey in property) {
if (property.hasOwnProperty(propertyKey)) {
group.payload[propertyKey] = property[propertyKey];
var partId;
var payloadType;
var propertyKey;
var targetCount;
var joinChar;
var propertyIndex;
if (node.mode === "auto") {
// Use msg.parts to identify all of the group information
partId = msg.parts.id;
payloadType = msg.parts.type;
targetCount = msg.parts.count;
joinChar = msg.parts.ch;
propertyKey = msg.parts.key;
propertyIndex = msg.parts.index;
}
else {
// Use the node configuration to identify all of the group information
partId = "_";
payloadType = node.build;
targetCount = node.count;
joinChar = node.joiner;
if (targetCount === 0 && msg.hasOwnProperty('parts')) {
targetCount = msg.parts.count || 0;
}
if (node.build === 'object') {
propertyKey = RED.util.getMessageProperty(msg,node.key);
}
}
if ((payloadType === 'object') && (propertyKey === null || propertyKey === undefined || propertyKey === "")) {
if (node.mode === "auto") {
node.warn("Message missing 'msg.parts.key' property - cannot add to object");
} else {
node.warn("Message missing key property 'msg."+node.key+"' - cannot add to object")
}
return;
}
if (!inflight.hasOwnProperty(partId)) {
if (payloadType === 'object' || payloadType === 'merged') {
inflight[partId] = {
currentCount:0,
payload:{},
targetCount:targetCount,
type:"object",
msg:msg
};
}
else if (node.accumulate === true) {
if (msg.hasOwnProperty("reset")) { delete inflight[partId]; }
inflight[partId] = inflight[partId] || {
currentCount:0,
payload:{},
targetCount:targetCount,
type:"object",
msg:msg
}
}
else {
inflight[partId] = {
currentCount:0,
payload:[],
targetCount:targetCount,
type:payloadType,
joinChar: joinChar,
msg:msg
};
if (payloadType === 'string') {
inflight[partId].joinChar = joinChar;
}
}
if (node.timer > 0) {
inflight[partId].timeout = setTimeout(function() {
completeSend(partId)
}, node.timer)
}
}
var group = inflight[partId];
if (payloadType === 'object') {
group.payload[propertyKey] = property;
group.currentCount = Object.keys(group.payload).length;
//msg.topic = node.topic || msg.topic;
}
else if (payloadType === 'merged') {
if (Array.isArray(property) || typeof property !== 'object') {
if (!msg.hasOwnProperty("complete")) {
node.warn("Cannot merge non-object types");
}
}
else {
for (propertyKey in property) {
if (property.hasOwnProperty(propertyKey)) {
group.payload[propertyKey] = property[propertyKey];
}
}
group.currentCount = Object.keys(group.payload).length;
//group.currentCount++;
}
}
else {
if (!isNaN(propertyIndex)) {
group.payload[propertyIndex] = property;
} else {
group.payload.push(property);
}
group.currentCount++;
}
} else {
if (!isNaN(propertyIndex)) {
group.payload[propertyIndex] = property;
} else {
group.payload.push(property);
// TODO: currently reuse the last received - add option to pick first received
group.msg = msg;
if (group.currentCount >= group.targetCount || msg.hasOwnProperty('complete')) {
completeSend(partId);
}
group.currentCount++;
} catch(err) {
console.log(err.stack);
}
// TODO: currently reuse the last received - add option to pick first received
group.msg = msg;
if (group.currentCount === group.targetCount || msg.hasOwnProperty('complete')) {
delete msg.complete;
completeSend(partId);
}
} catch(err) {
console.log(err.stack);
}
});
this.on("close", function() {

View File

@ -139,7 +139,6 @@ describe('JOIN node', function() {
joinNode1.should.have.property('name', 'joinNode');
joinNode1.should.have.property('count', 0);
joinNode1.should.have.property('timer', 0);
joinNode1.should.have.property('timerr', 'send');
joinNode1.should.have.property('build', 'array');
done();
});
@ -169,7 +168,7 @@ describe('JOIN node', function() {
});
it('should join things into an object after a count', function(done) {
var flow = [{id:"n1", type:"join", wires:[["n2"]], count:5, build:"object",mode:"custom"},
var flow = [{id:"n1", type:"join", wires:[["n2"]], count:5, build:"object", mode:"custom"},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
@ -198,7 +197,7 @@ describe('JOIN node', function() {
});
it('should merge objects', function(done) {
var flow = [{id:"n1", type:"join", wires:[["n2"]], count:6, build:"merged",mode:"custom"},
var flow = [{id:"n1", type:"join", wires:[["n2"]], count:5, build:"merged", mode:"custom"},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
@ -206,7 +205,7 @@ describe('JOIN node', function() {
n2.on("input", function(msg) {
try {
msg.should.have.property("payload");
msg.payload.should.have.property("a",6);
msg.payload.should.have.property("a",1);
msg.payload.should.have.property("b",2);
msg.payload.should.have.property("c",3);
msg.payload.should.have.property("d",4);
@ -215,12 +214,106 @@ describe('JOIN node', function() {
}
catch(e) { done(e)}
});
n1.receive({payload:{a:9}, topic:"f"});
n1.receive({payload:{a:1}, topic:"a"});
n1.receive({payload:{b:9}, topic:"b"});
n1.receive({payload:{b:2}, topic:"b"});
n1.receive({payload:{c:3}, topic:"c"});
n1.receive({payload:{d:4}, topic:"d"});
n1.receive({payload:{e:5}, topic:"e"});
n1.receive({payload:{a:6}, topic:"f"});
});
});
it('should accumulate a merged object', function(done) {
var flow = [{id:"n1", type:"join", wires:[["n2"]], build:"merged",mode:"custom",accumulate:true},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var c = 0;
n2.on("input", function(msg) {
if (c === 5) {
try {
msg.should.have.property("payload");
msg.payload.should.have.property("a",3);
msg.payload.should.have.property("b",2);
msg.payload.should.have.property("c",1);
done();
}
catch(e) { done(e) }
}
c += 1;
});
n1.receive({payload:{a:1}, topic:"a"});
n1.receive({payload:{b:2}, topic:"b"});
n1.receive({payload:{c:3}, topic:"c"});
n1.receive({payload:{a:3}, topic:"d"});
n1.receive({payload:{b:2}, topic:"e"});
n1.receive({payload:{c:1}, topic:"f"});
});
});
it('should be able to reset an accumulation', function(done) {
var flow = [{id:"n1", type:"join", wires:[["n2"]], build:"merged",accumulate:true,mode:"custom"},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var c = 0;
n2.on("input", function(msg) {
if (c === 3) {
try {
msg.should.have.property("payload");
msg.payload.should.have.property("a",1);
msg.payload.should.have.property("b",2);
msg.payload.should.have.property("c",3);
msg.payload.should.have.property("d",4);
}
catch(e) { done(e) }
}
if (c === 5) {
try {
msg.should.have.property("payload");
msg.payload.should.have.property("b",2);
msg.payload.should.have.property("c",1);
done();
}
catch(e) { done(e) }
}
c += 1;
});
n1.receive({payload:{a:1}, topic:"a"});
n1.receive({payload:{b:2}, topic:"b"});
n1.receive({payload:{c:3}, topic:"c"});
n1.receive({payload:{d:4}, topic:"d", complete:true});
n1.receive({payload:{b:2}, topic:"e"});
n1.receive({payload:{c:1}, topic:"f"});
});
});
it('should accumulate a key/value object', function(done) {
var flow = [{id:"n1", type:"join", wires:[["n2"]], build:"object", accumulate:true, mode:"custom", topic:"bar", key:"foo", count:4},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var c = 0;
n2.on("input", function(msg) {
try {
//msg.should.have.property("topic","bar");
msg.should.have.property("payload");
msg.payload.should.have.property("a",1);
msg.payload.should.have.property("b",2);
msg.payload.should.have.property("c",3);
msg.payload.should.have.property("d",4);
done();
}
catch(e) { done(e) }
});
n1.receive({payload:1, foo:"a"});
n1.receive({payload:2, foo:"b"});
n1.receive({payload:3, foo:"c"});
n1.receive({payload:4, foo:"d"});
});
});