node-red/nodes/core/logic/17-split.js

218 lines
9.8 KiB
JavaScript
Raw Normal View History

2016-06-04 01:40:40 +02:00
/**
* Copyright 2016 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
module.exports = function(RED) {
"use strict";
function SplitNode(n) {
RED.nodes.createNode(this,n);
this.splt = n.splt || "";
this.split = new RegExp(this.splt);
var node = this;
this.on("input", function(msg) {
if (msg.hasOwnProperty("payload")) {
var a = msg.payload;
if (msg.hasOwnProperty("parts")) { msg.parts = { parts:msg.parts }; } // push existing parts to a stack
else { msg.parts = {}; }
msg.parts.id = msg._msgid; // use the existing _msgid by default.
if (typeof msg.payload === "string") { // Split String into array
a = msg.payload.split(node.split);
msg.parts.ch = node.splt; // pass the split char to other end for rejoin
msg.parts.type = "string";
}
if (Array.isArray(a)) { // then split array into messages
msg.parts.type = msg.parts.type || "array"; // if it wasn't a string in the first place
for (var i = 0; i < a.length; i++) {
msg.payload = a[i];
msg.parts.index = i;
msg.parts.count = a.length;
node.send(msg);
}
}
else if ((typeof msg.payload === "object") && !Buffer.isBuffer(msg.payload)) {
var j = 0;
var l = Object.keys(msg.payload).length;
var pay = msg.payload;
msg.parts.type = "object";
for (var p in pay) {
if (pay.hasOwnProperty(p)) {
msg.payload = pay[p];
msg.parts.key = p;
msg.parts.index = j;
msg.parts.count = l;
node.send(msg);
j += 1;
}
}
}
// TODO not handling Buffers at present...
//else { } // otherwise drop the message.
}
});
}
RED.nodes.registerType("split",SplitNode);
function JoinNode(n) {
RED.nodes.createNode(this,n);
this.property = n.property||"payload";
this.propertyType = n.propertyType||"msg";
this.key = n.key||"topic";
2016-06-04 01:40:40 +02:00
this.timer = Number(n.timeout || 0);
this.timerr = n.timerr || "send";
this.count = Number(n.count || 0);
this.joiner = n.joiner;
this.build = n.build || "array";
var node = this;
var inflight = {};
var misc = (this.build === "array") ? [] : {};
var tout;
// if array came from a string then reassemble it and send it
var sendIt = function(m) {
if (inflight[m.parts.id].ch !== undefined) { // if it was a string - rejoin it using the split char
var jc = (node.joiner || inflight[m.parts.id].ch).replace(/\\n/,"\n").replace(/\\r/,"\r").replace(/\\t/,"\t").replace(/\\e/,"\e").replace(/\\f/,"\c").replace(/\\0/,"\0");
m.payload = inflight[m.parts.id].a.join(jc);
} else { // leave it as an array
m.payload = inflight[m.parts.id].a;
}
m._msgid = m.parts.id;
clearTimeout(inflight[m.parts.id].timeout); // unset any timer
delete inflight[m.parts.id]; // remove from the keep track object
if (m.parts.hasOwnProperty("parts")) {
m.parts = m.parts.parts; // pop existing parts
}
else { delete m.parts; } // remove the parts flags
node.send(m);
}
// check all elements of the array are strings (or claim to be).
var onlyString = function(a) { // check if the array is all strings
for (var i = 0; i < a.length; i++) {
if (typeof a[i] !== "string") { return false; }
}
return true;
}
// send array of misc message that arrived. (convert to string if all were strings and need joining)
var sendMisc = function(m) {
if (tout) { clearTimeout(tout); tout = null; }
m.payload = misc;
if (node.joiner && onlyString(misc)) { // if the array is all strings and there is a join char set
m.payload = misc.join(node.joiner.replace(/\\n/,"\n").replace(/\\r/,"\r").replace(/\\t/,"\t").replace(/\\e/,"\e").replace(/\\f/,"\c").replace(/\\0/,"\0"));
}
if (node.build === "array") { misc = []; }
else if (node.build === "object") { misc = {}; }
2016-06-04 01:40:40 +02:00
node.send(m);
}
this.on("input", function(msg) {
var property;
if (node.propertyType == "full") {
property = msg;
} else {
try {
property = RED.util.getMessageProperty(msg,node.property);
} catch(err) {
node.warn("Message property "+node.property+" not found");
return;
}
}
if (msg.hasOwnProperty("parts")) {
// only act if it has parts
var count = node.count || msg.parts.count || 1;
if (msg.parts.hasOwnProperty("index")) {
// it's a numbered part (from a split node)
if (!inflight[msg.parts.id]) {
// New message - create new empty array of correct size
2016-06-04 01:40:40 +02:00
if (msg.parts.type === "object") {
inflight[msg.parts.id] = {i:0, a:{}, c:msg.parts.count, ch:msg.parts.ch, t:msg.parts.type};
} else {
// it's an array or string
inflight[msg.parts.id] = {i:0, a:new Array(msg.parts.count), ch:msg.parts.ch, t:msg.parts.type};
2016-06-04 01:40:40 +02:00
}
if (node.timer !== 0) { // If there is a timer to set start it now
inflight[msg.parts.id].timeout = setTimeout(function() {
if (node.timerr === "send") { sendIt(msg); }
else if (node.timerr === "error") { node.error("Incomplete",msg); }
delete inflight[msg.parts.id];
}, node.timer);
}
}
if (msg.parts.type === "object") {
// Add to the tracking array
inflight[msg.parts.id].a[msg.parts.key] = property;
inflight[msg.parts.id].i = Object.keys(inflight[msg.parts.id].a).length;
} else {
// it's an array or string
// Add to the tracking array
inflight[msg.parts.id].a[msg.parts.index] = property;
// Increment the count
inflight[msg.parts.id].i += 1;
}
if (inflight[msg.parts.id].i >= count) {
// All arrived - send
2016-06-04 01:40:40 +02:00
sendIt(msg);
}
} // otherwise ignore it
if (msg.hasOwnProperty("complete")) { // if set then send right away anyway...
delete(msg.complete);
sendIt(msg);
2016-06-04 01:40:40 +02:00
}
} else {
2016-06-04 01:40:40 +02:00
// The case for any messages arriving without parts - ie random messages you want to join.
var l;
if (node.build === "array" || node.build === "string") {
// simple case of build the array
// Add the payload to an array
misc.push(property);
l = misc.length;
} else {
// OK so let's build an object
if (msg.hasOwnProperty(node.key) && msg[node.key] !== "") {
misc[msg[node.key]] = property;
2016-06-04 01:40:40 +02:00
}
l = Object.keys(misc).length;
}
if (l >= node.count) {
// if it's long enough send it
sendMisc(msg);
} else if (msg.hasOwnProperty("complete")) {
// if set then send right away anyway...
delete(msg.complete);
sendMisc(msg);
} else if ((node.timer !== 0) && !tout) {
// if not start the timer if there is one.
tout = setTimeout(function() {
if (node.timerr === "send") { sendMisc(msg); }
else if (node.timerr === "error") { node.error("Timeout",msg); }
if (node.build === "array") { misc = []; }
else if (node.build === "object") { misc = {}; }
}, node.timer);
2016-06-04 01:40:40 +02:00
}
}
});
this.on("close", function() {
if (tout) { clearTimeout(tout); }
for (var i in inflight) {
if (inflight[i].timeout) { clearTimeout(inflight[i].timeout); }
}
});
}
RED.nodes.registerType("join",JoinNode);
}