Update Join node implementation

This commit is contained in:
Nick O'Leary 2016-06-09 11:33:40 +01:00
parent e97d9fb0b2
commit fd66569950
2 changed files with 111 additions and 120 deletions

View File

@ -27,15 +27,13 @@
</script>
<script type="text/x-red" data-help-name="split">
<p>A function that splits the <code>msg.payload</code> in various ways to create multiple messages out.</p>
<p><ul>
<li>splits an <b>array</b> into individual messages, one element per message.</li>
<li>splits a <b>string</b> on a character - default is <code>""</code>, split on every character. A typical
use would be <code>\n</code> to split on line breaks. Split is actually a regex value so use <code>\.</code> to
split on a full stop .</li>
<li>splits an <b>object</b> into individual messages, promoting each property value to the <code>msg.payload</code>,
and saving each property key name as <code>msg.parts.key</code>.
</ul></p>
<p>A function that splits <code>msg.payload</code> into multiple messages.</p>
<p>The behaviour is determined by the type of <code>msg.payload</code>:</p>
<ul>
<li><b>string</b> - a message is sent for each part of the string after it is split using the specified character, by default a newline (<code>\n</code>).
<li><b>array</b> - a message is sent for each element of the array</li>
<li><b>object</b> - a message is sent for each key/value pair of the object. <code>msg.parts.key</code> is set to the key of the property.</li>
</ul>
</script>
<script type="text/javascript">

View File

@ -19,8 +19,7 @@ module.exports = function(RED) {
function SplitNode(n) {
RED.nodes.createNode(this,n);
this.splt = n.splt || "";
this.split = new RegExp(this.splt);
this.splt = (n.splt || "\\n").replace(/\\n/,"\n").replace(/\\r/,"\r").replace(/\\t/,"\t").replace(/\\e/,"\e").replace(/\\f/,"\f").replace(/\\0/,"\0");
var node = this;
this.on("input", function(msg) {
if (msg.hasOwnProperty("payload")) {
@ -29,7 +28,7 @@ module.exports = function(RED) {
else { msg.parts = {}; }
msg.parts.id = msg._msgid; // use the existing _msgid by default.
if (typeof msg.payload === "string") { // Split String into array
a = msg.payload.split(node.split);
a = msg.payload.split(node.splt);
msg.parts.ch = node.splt; // pass the split char to other end for rejoin
msg.parts.type = "string";
}
@ -68,59 +67,43 @@ module.exports = function(RED) {
function JoinNode(n) {
RED.nodes.createNode(this,n);
this.mode = n.mode||"auto";
this.property = n.property||"payload";
this.propertyType = n.propertyType||"msg";
this.key = n.key||"topic";
this.timer = Number(n.timeout || 0);
this.timer = (this.mode === "auto") ? 0 : Number(n.timeout || 0);
this.timerr = n.timerr || "send";
this.count = Number(n.count || 0);
this.joiner = n.joiner;
this.joiner = (n.joiner||"").replace(/\\n/g,"\n").replace(/\\r/g,"\r").replace(/\\t/g,"\t").replace(/\\e/g,"\e").replace(/\\f/g,"\f").replace(/\\0/g,"\0");
this.build = n.build || "array";
var node = this;
var inflight = {};
var misc = (this.build === "array") ? [] : {};
var tout;
// if array came from a string then reassemble it and send it
var sendIt = function(m) {
if (inflight[m.parts.id].ch !== undefined) { // if it was a string - rejoin it using the split char
var jc = (node.joiner || inflight[m.parts.id].ch).replace(/\\n/,"\n").replace(/\\r/,"\r").replace(/\\t/,"\t").replace(/\\e/,"\e").replace(/\\f/,"\c").replace(/\\0/,"\0");
m.payload = inflight[m.parts.id].a.join(jc);
} else { // leave it as an array
m.payload = inflight[m.parts.id].a;
}
m._msgid = m.parts.id;
clearTimeout(inflight[m.parts.id].timeout); // unset any timer
delete inflight[m.parts.id]; // remove from the keep track object
if (m.parts.hasOwnProperty("parts")) {
m.parts = m.parts.parts; // pop existing parts
}
else { delete m.parts; } // remove the parts flags
node.send(m);
}
var completeSend = function(partId) {
var group = inflight[partId];
clearTimeout(group.timeout);
delete inflight[partId];
// check all elements of the array are strings (or claim to be).
var onlyString = function(a) { // check if the array is all strings
for (var i = 0; i < a.length; i++) {
if (typeof a[i] !== "string") { return false; }
if (group.type === 'string') {
group.msg.payload = group.payload.join(group.joinChar);
} else {
group.msg.payload = group.payload;
}
return true;
}
// send array of misc message that arrived. (convert to string if all were strings and need joining)
var sendMisc = function(m) {
if (tout) { clearTimeout(tout); tout = null; }
m.payload = misc;
if (node.joiner && onlyString(misc)) { // if the array is all strings and there is a join char set
m.payload = misc.join(node.joiner.replace(/\\n/,"\n").replace(/\\r/,"\r").replace(/\\t/,"\t").replace(/\\e/,"\e").replace(/\\f/,"\c").replace(/\\0/,"\0"));
if (group.msg.hasOwnProperty('parts') && group.msg.parts.hasOwnProperty('parts')) {
group.msg.parts = group.msg.parts;
} else {
delete group.msg.parts;
}
if (node.build === "array") { misc = []; }
else if (node.build === "object") { misc = {}; }
node.send(m);
node.send(group.msg);
}
this.on("input", function(msg) {
try {
var property;
if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) {
// TODO: log warning - no msg.parts in auto mode, ignoring
return;
}
if (node.propertyType == "full") {
property = msg;
} else {
@ -131,85 +114,95 @@ module.exports = function(RED) {
return;
}
}
if (msg.hasOwnProperty("parts")) {
// only act if it has parts
var count = node.count || msg.parts.count || 1;
if (msg.parts.hasOwnProperty("index")) {
// it's a numbered part (from a split node)
if (!inflight[msg.parts.id]) {
// New message - create new empty array of correct size
if (msg.parts.type === "object") {
inflight[msg.parts.id] = {i:0, a:{}, c:msg.parts.count, ch:msg.parts.ch, t:msg.parts.type};
} else {
// it's an array or string
inflight[msg.parts.id] = {i:0, a:new Array(msg.parts.count), ch:msg.parts.ch, t:msg.parts.type};
}
if (node.timer !== 0) { // If there is a timer to set start it now
inflight[msg.parts.id].timeout = setTimeout(function() {
if (node.timerr === "send") { sendIt(msg); }
else if (node.timerr === "error") { node.error("Incomplete",msg); }
delete inflight[msg.parts.id];
}, node.timer);
}
}
if (msg.parts.type === "object") {
// Add to the tracking array
inflight[msg.parts.id].a[msg.parts.key] = property;
inflight[msg.parts.id].i = Object.keys(inflight[msg.parts.id].a).length;
} else {
// it's an array or string
// Add to the tracking array
inflight[msg.parts.id].a[msg.parts.index] = property;
// Increment the count
inflight[msg.parts.id].i += 1;
}
if (inflight[msg.parts.id].i >= count) {
// All arrived - send
sendIt(msg);
}
} // otherwise ignore it
if (msg.hasOwnProperty("complete")) { // if set then send right away anyway...
delete(msg.complete);
sendIt(msg);
}
var partId;
var payloadType;
var propertyKey;
var targetCount;
var joinChar;
var propertyIndex;
if (node.mode === "auto") {
// Use msg.parts to identify all of the group information
partId = msg.parts.id;
payloadType = msg.parts.type;
targetCount = msg.parts.count;
joinChar = msg.parts.ch;
propertyKey = msg.parts.key;
propertyIndex = msg.parts.index;
} else {
// The case for any messages arriving without parts - ie random messages you want to join.
var l;
if (node.build === "array" || node.build === "string") {
// simple case of build the array
// Add the payload to an array
misc.push(property);
l = misc.length;
} else {
// OK so let's build an object
if (msg.hasOwnProperty(node.key) && msg[node.key] !== "") {
misc[msg[node.key]] = property;
}
l = Object.keys(misc).length;
// Use the node configuration to identify all of the group information
partId = "_";
payloadType = node.build;
targetCount = node.count;
joinChar = node.joiner;
if (targetCount === 0 && msg.hasOwnProperty('parts')) {
targetCount = msg.parts.count || 0;
}
if (l >= node.count) {
// if it's long enough send it
sendMisc(msg);
} else if (msg.hasOwnProperty("complete")) {
// if set then send right away anyway...
delete(msg.complete);
sendMisc(msg);
} else if ((node.timer !== 0) && !tout) {
// if not start the timer if there is one.
tout = setTimeout(function() {
if (node.timerr === "send") { sendMisc(msg); }
else if (node.timerr === "error") { node.error("Timeout",msg); }
if (node.build === "array") { misc = []; }
else if (node.build === "object") { misc = {}; }
}, node.timer);
if (node.build === 'object') {
propertyKey = RED.util.getMessageProperty(msg,node.key);
}
}
if (payloadType === 'object' && (propertyKey === null || propertyKey === undefined || propertyKey === "")) {
//TODO: log error - no key property found for object
return;
}
if (!inflight.hasOwnProperty(partId)) {
if (payloadType === 'object') {
inflight[partId] = {
currentCount:0,
payload:{},
targetCount:targetCount,
type:"object",
msg:msg
};
} else {
inflight[partId] = {
currentCount:0,
payload:[],
targetCount:targetCount,
type:payloadType,
joinChar: joinChar,
msg:msg
};
if (payloadType === 'string') {
inflight[partId].joinChar = joinChar;
}
}
if (node.timer > 0) {
inflight[partId].timeout = setTimeout(function() {
completeSend(partId)
}, node.timer)
}
}
var group = inflight[partId];
if (payloadType === 'object') {
group.payload[propertyKey] = property;
group.currentCount = Object.keys(group.payload).length;
} else {
if (!isNaN(propertyIndex)) {
group.payload[propertyIndex] = property;
} else {
group.payload.push(property);
}
group.currentCount++;
}
// TODO: currently reuse the last received - add option to pick first received
group.msg = msg;
if (group.currentCount === group.targetCount || msg.hasOwnProperty('complete')) {
delete msg.complete;
completeSend(partId);
}
} catch(err) {
console.log(err.stack);
}
});
this.on("close", function() {
if (tout) { clearTimeout(tout); }
for (var i in inflight) {
if (inflight[i].timeout) { clearTimeout(inflight[i].timeout); }
if (inflight.hasOwnProperty(i)) {
clearTimeout(inflight[i].timeout);
}
}
});
}