2016-06-04 00:40:40 +01:00
|
|
|
/**
|
2017-01-11 15:24:33 +00:00
|
|
|
* Copyright JS Foundation and other contributors, http://js.foundation
|
2016-06-04 00:40:40 +01:00
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
**/
|
|
|
|
|
|
|
|
module.exports = function(RED) {
|
|
|
|
"use strict";
|
|
|
|
|
2017-06-13 21:01:04 +01:00
|
|
|
/**
 * Send every element of `array` as its own message, stamping msg.parts as it goes.
 *
 * Non-streaming (node.stream !== true): all elements are sent, each carrying
 * parts.count = array.length, and node.c is reset to 0 afterwards.
 * Streaming (node.stream === true): all but the last element are sent (without
 * parts.count); the last element is stored in node.remainder instead of being sent.
 *
 * @param {object} node  Split node: reads node.stream, reads/writes node.c,
 *                       may write node.remainder, calls node.send().
 * @param {object} msg   Message used as the template; msg.parts must already be an
 *                       object. msg.payload and msg.parts are mutated, a clone is sent.
 * @param {Array}  array Pieces to emit, in order.
 */
function sendArray(node,msg,array) {
    // An empty list has nothing to emit; without this guard a message with an
    // undefined payload and parts.count of 0 would be sent.
    if (array.length === 0) { return; }

    var streaming = (node.stream === true);
    var lastIndex = array.length - 1;

    for (var i = 0; i < lastIndex; i++) {
        msg.payload = array[i];
        msg.parts.index = node.c++;
        if (!streaming) { msg.parts.count = array.length; }
        node.send(RED.util.cloneMessage(msg));
    }

    if (streaming) {
        // Hold the final piece back; it is kept on the node rather than sent.
        node.remainder = array[lastIndex];
        return;
    }

    msg.payload = array[lastIndex];
    msg.parts.index = node.c++;
    msg.parts.count = array.length;
    node.send(RED.util.cloneMessage(msg));
    node.c = 0;
}
|
2017-06-16 22:26:14 +01:00
|
|
|
|
2016-06-04 00:40:40 +01:00
|
|
|
/**
 * Split node constructor.
 *
 * Configuration read from `n`:
 *   stream    - when exactly true, counters/remainders carry over between messages
 *               and parts.count is not set.
 *   spltType  - "str" (default), "bin" or "len"; how `n.splt` is interpreted.
 *   splt      - "str": delimiter string with \n \r \t \e \f \0 escapes expanded;
 *               "bin": JSON array of byte values; "len": positive chunk length.
 *   arraySplt - chunk length used for array payloads (default 1).
 *   addname   - optional message property that receives the key when splitting objects.
 *
 * An invalid configuration is reported through this.error() and no input handler
 * is installed.
 *
 * @param {object} n Node configuration.
 */
function SplitNode(n) {
    RED.nodes.createNode(this,n);
    var node = this;
    node.stream = n.stream;
    node.spltType = n.spltType || "str";
    node.addname = n.addname || "";
    try {
        if (node.spltType === "str") {
            // Global flag so that every escape in the delimiter is expanded, not just the first.
            // "\e" is not a JavaScript escape sequence, so ESC is written as \x1b.
            this.splt = (n.splt || "\\n")
                .replace(/\\n/g,"\n")
                .replace(/\\r/g,"\r")
                .replace(/\\t/g,"\t")
                .replace(/\\e/g,"\x1b")
                .replace(/\\f/g,"\f")
                .replace(/\\0/g,"\0");
        } else if (node.spltType === "bin") {
            var spltArray = JSON.parse(n.splt);
            if (Array.isArray(spltArray)) {
                this.splt = Buffer.from(spltArray);
            } else {
                throw new Error("not an array");
            }
            this.spltBuffer = spltArray;
        } else if (node.spltType === "len") {
            this.splt = parseInt(n.splt, 10);
            if (isNaN(this.splt) || this.splt < 1) {
                throw new Error("invalid split length: "+n.splt);
            }
        }
        this.arraySplt = (n.arraySplt === undefined) ? 1 : parseInt(n.arraySplt, 10);
        if (isNaN(this.arraySplt) || this.arraySplt < 1) {
            throw new Error("invalid array split length: "+n.arraySplt);
        }
    } catch(err) {
        this.error("Invalid split property: "+err.toString());
        return;
    }
    node.c = 0;                     // running parts.index counter
    node.buffer = Buffer.from([]);  // unsent tail of buffer payloads (streaming)

    this.on("input", function(msg) {
        if (!msg.hasOwnProperty("payload")) { return; }

        if (msg.hasOwnProperty("parts")) { msg.parts = { parts:msg.parts }; } // push existing parts to a stack
        else { msg.parts = {}; }
        msg.parts.id = msg._msgid; // use the existing _msgid by default.

        var payload = msg.payload;
        var count, pos, i, data;

        if (typeof payload === "string") { // Split String into array
            msg.payload = (node.remainder || "") + payload;
            msg.parts.type = "string";
            if (node.spltType === "len") {
                msg.parts.ch = "";
                msg.parts.len = node.splt;
                count = Math.ceil(msg.payload.length/node.splt);
                if (node.stream !== true) {
                    msg.parts.count = count;
                    node.c = 0;
                }
                pos = 0;
                data = msg.payload;
                for (i = 0; i < count-1; i++) {
                    msg.payload = data.substring(pos,pos+node.splt);
                    msg.parts.index = node.c++;
                    pos += node.splt;
                    node.send(RED.util.cloneMessage(msg));
                }
                // The final chunk is sent when not streaming, or when it is a full chunk.
                node.remainder = data.substring(pos);
                if ((node.stream !== true) || (node.remainder.length === node.splt)) {
                    msg.payload = node.remainder;
                    msg.parts.index = node.c++;
                    node.send(RED.util.cloneMessage(msg));
                    node.remainder = "";
                }
            }
            else {
                var pieces = [];
                if (node.spltType === "bin") {
                    if (!node.spltBufferString) {
                        node.spltBufferString = node.splt.toString();
                    }
                    pieces = msg.payload.split(node.spltBufferString);
                    msg.parts.ch = node.spltBuffer; // pass the split char to other end for rejoin
                } else if (node.spltType === "str") {
                    pieces = msg.payload.split(node.splt);
                    msg.parts.ch = node.splt; // pass the split char to other end for rejoin
                }
                sendArray(node,msg,pieces);
            }
        }
        else if (Array.isArray(payload)) { // then split array into messages
            msg.parts.type = "array";
            count = Math.ceil(payload.length/node.arraySplt);
            msg.parts.count = count;
            msg.parts.len = node.arraySplt;
            pos = 0;
            for (i = 0; i < count; i++) {
                msg.payload = payload.slice(pos,pos+node.arraySplt);
                if (node.arraySplt === 1) {
                    msg.payload = msg.payload[0];
                }
                msg.parts.index = i;
                pos += node.arraySplt;
                node.send(RED.util.cloneMessage(msg));
            }
        }
        // typeof null is "object": exclude it explicitly so a null payload is
        // dropped instead of throwing in Object.keys().
        else if ((payload !== null) && (typeof payload === "object") && !Buffer.isBuffer(payload)) {
            var keys = Object.keys(payload);
            msg.parts.type = "object";
            for (i = 0; i < keys.length; i++) {
                var key = keys[i];
                msg.payload = payload[key];
                if (node.addname !== "") {
                    msg[node.addname] = key;
                }
                msg.parts.key = key;
                msg.parts.index = i;
                msg.parts.count = keys.length;
                node.send(RED.util.cloneMessage(msg));
            }
        }
        else if (Buffer.isBuffer(payload)) {
            // Prepend whatever was left unsent from earlier buffers.
            var buff = Buffer.concat([node.buffer, payload], node.buffer.length + payload.length);
            msg.parts.type = "buffer";
            if (node.spltType === "len") {
                count = Math.ceil(buff.length/node.splt);
                if (node.stream !== true) {
                    msg.parts.count = count;
                    node.c = 0;
                }
                pos = 0;
                msg.parts.len = node.splt;
                for (i = 0; i < count-1; i++) {
                    msg.payload = buff.slice(pos,pos+node.splt);
                    msg.parts.index = node.c++;
                    pos += node.splt;
                    node.send(RED.util.cloneMessage(msg));
                }
                node.buffer = buff.slice(pos);
                if ((node.stream !== true) || (node.buffer.length === node.splt)) {
                    msg.payload = node.buffer;
                    msg.parts.index = node.c++;
                    node.send(RED.util.cloneMessage(msg));
                    node.buffer = Buffer.from([]);
                }
            }
            else {
                if (node.spltType === "bin") {
                    msg.parts.ch = node.spltBuffer;
                } else if (node.spltType === "str") {
                    msg.parts.ch = node.splt;
                }

                // Cut the buffer at every delimiter; `tail` is what follows the last one.
                var delim = node.splt;
                var chunks = [];
                var start = 0;
                pos = buff.indexOf(delim);
                while (pos > -1) {
                    chunks.push(buff.slice(start,pos));
                    start = pos + delim.length;
                    pos = buff.indexOf(delim,start);
                }
                var tail = buff.slice(start);

                if (node.stream !== true) {
                    // Count only the messages that are actually sent: an empty tail
                    // (buffer ends with the delimiter) is not emitted.
                    msg.parts.count = chunks.length + (tail.length > 0 ? 1 : 0);
                    node.c = 0;
                }
                for (i = 0; i < chunks.length; i++) {
                    msg.payload = chunks[i];
                    msg.parts.index = node.c++;
                    node.send(RED.util.cloneMessage(msg));
                }
                if (node.stream !== true) {
                    if (tail.length > 0) {
                        msg.payload = tail;
                        msg.parts.index = node.c++;
                        node.send(RED.util.cloneMessage(msg));
                    }
                    // Leave the counter clean: the string path (sendArray) does not
                    // reset it before use.
                    node.c = 0;
                }
                else {
                    node.buffer = tail;
                }
            }
        }
        //else { } // otherwise drop the message.
    });
}
|
|
|
|
// Register SplitNode as the implementation of the "split" node type.
RED.nodes.registerType("split",SplitNode);
|
|
|
|
|
|
|
|
|
2018-01-17 19:08:23 +09:00
|
|
|
// Cached value of the "joinMaxKeptMsgsCount" setting; undefined until first lookup.
var _max_kept_msgs_count = undefined;

/**
 * Return the configured maximum number of messages a join node may hold.
 *
 * The value is read from RED.settings.joinMaxKeptMsgsCount on first use and
 * cached in _max_kept_msgs_count; 0 is used when the setting is absent.
 *
 * @param {object} node Unused; kept for the existing call signature.
 * @returns {*} The setting value, or 0 when it is not defined.
 */
function max_kept_msgs_count(node) {
    if (_max_kept_msgs_count === undefined) {
        var name = "joinMaxKeptMsgsCount";
        _max_kept_msgs_count = RED.settings.hasOwnProperty(name) ? RED.settings[name] : 0;
    }
    return _max_kept_msgs_count;
}
|
|
|
|
|
|
|
|
/**
 * Queue `msg` under `topic` in the `pending` map.
 *
 * When node.merge_on_change is truthy the queue is capped at
 * node.topic_counts[topic]: if the push makes it longer, the oldest message is
 * dropped and node.pending_count is decremented.
 *
 * @param {object} node    Join node (reads merge_on_change, topic_counts; may write pending_count).
 * @param {object} pending Map of topic -> array of queued messages; mutated.
 * @param {string} topic   Topic key.
 * @param {object} msg     Message to queue.
 */
function add_to_topic(node, pending, topic, msg) {
    // Use the prototype method: a topic called "hasOwnProperty" stored on
    // `pending` would otherwise shadow it and make later calls throw.
    if (!Object.prototype.hasOwnProperty.call(pending, topic)) {
        pending[topic] = [];
    }
    var queue = pending[topic];
    queue.push(msg);
    if (node.merge_on_change && (queue.length > node.topic_counts[topic])) {
        queue.shift();
        node.pending_count--;
    }
}
|
|
|
|
|
|
|
|
/**
 * Count how many times each topic occurs in `topics`.
 *
 * @param {Iterable<string>} topics Topic names, possibly with repeats.
 * @returns {object} Map of topic -> number of occurrences.
 */
function compute_topic_counts(topics) {
    var counts = {};
    for (var topic of topics) {
        // Call the prototype method directly so a topic named "hasOwnProperty"
        // (which becomes an own property of `counts`) cannot break the lookup.
        var seen = Object.prototype.hasOwnProperty.call(counts, topic) ? counts[topic] : 0;
        counts[topic] = seen + 1;
    }
    return counts;
}
|
|
|
|
|
|
|
|
/**
 * If every topic in node.topics has enough queued messages, build and send one
 * merged message.
 *
 * For each entry of node.topics (in order) the oldest queued message is taken.
 * Its payload is appended to the merged message's `payload` array and also
 * stored under a property named after the topic (collected into an array when
 * the topic occurs more than once). When node.merge_on_change is truthy the
 * taken message is put back at the end of its queue; otherwise
 * node.pending_count is reduced by the number of topics.
 *
 * @param {object} node              Join node (reads topics, topic_counts, merge_on_change; calls send).
 * @param {object} pending           Map of topic -> array of queued messages; mutated.
 * @param {*}      [merge_on_change] Ignored: node.merge_on_change is always used.
 */
function try_merge(node, pending, merge_on_change) {
    var topics = node.topics;
    var counts = node.topic_counts;
    var hasOwn = Object.prototype.hasOwnProperty;
    var topic;

    // Bail out unless every topic has at least the required number of messages.
    for (topic of topics) {
        if (!hasOwn.call(pending, topic) || (pending[topic].length < counts[topic])) {
            return;
        }
    }

    merge_on_change = node.merge_on_change;
    var vals = [];
    var new_msg = {payload: vals};
    for (topic of topics) {
        var queue = pending[topic];
        var msg = queue.shift();
        if (merge_on_change) {
            queue.push(msg);
        }
        var pval = msg.payload;
        var val = new_msg[topic];
        vals.push(pval);
        if (val instanceof Array) {
            new_msg[topic].push(pval);
        }
        else if (val === undefined) {
            new_msg[topic] = pval;
        }
        else {
            new_msg[topic] = [val, pval];
        }
    }
    node.send(new_msg);
    if (!merge_on_change) {
        node.pending_count -= topics.length;
    }
}
|
|
|
|
|
|
|
|
/**
 * Handle one message in "merge" mode.
 *
 * Messages whose topic is not listed in node.topics are ignored. Otherwise the
 * message is queued under its topic and a merge is attempted. If the number of
 * held messages exceeds the configured maximum, all pending state is discarded,
 * an error is raised against the message and no merge is attempted.
 *
 * @param {object} node Join node (reads topics; reads/writes topic_counts, pending, pending_count).
 * @param {object} msg  Incoming message; msg.topic selects the queue.
 */
function merge_msg(node, msg) {
    var topics = node.topics;
    var topic = msg.topic;
    if (topics.indexOf(topic) < 0) {
        return;
    }

    // Lazily compute how many messages each topic needs (null or undefined both mean "not yet").
    if (node.topic_counts == null) {
        node.topic_counts = compute_topic_counts(topics);
    }

    add_to_topic(node, node.pending, topic, msg);
    node.pending_count++;

    var max_msgs = max_kept_msgs_count(node);
    if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
        node.pending = {};
        node.pending_count = 0;
        node.error(RED._("join.too-many"), msg);
        // Stop here: merging from the queues that were just discarded would emit
        // dropped messages and push pending_count below zero.
        return;
    }
    try_merge(node, node.pending);
}
|
|
|
|
|
|
|
|
/**
 * Evaluate the reduce expression for one message.
 *
 * Binds I (part index), N (part count) and A (accumulator) on the prepared
 * expression, then evaluates it against `msg`.
 *
 * @param {object} exp   Prepared JSONata expression (must provide assign()).
 * @param {*}      accum Current accumulator value.
 * @param {object} msg   Message the expression is evaluated against.
 * @param {*}      index Value bound to I.
 * @param {*}      count Value bound to N.
 * @returns {*} Result of RED.util.evaluateJSONataExpression(exp, msg).
 */
function apply_r(exp, accum, msg, index, count) {
    exp.assign("I", index);
    exp.assign("N", count);
    exp.assign("A", accum);
    return RED.util.evaluateJSONataExpression(exp, msg);
}
|
|
|
|
|
|
|
|
/**
 * Evaluate the fix-up expression once a group has been reduced.
 *
 * Binds N (part count) and A (accumulator) on the prepared expression, then
 * evaluates it against an empty message object.
 *
 * @param {object} exp   Prepared JSONata expression (must provide assign()).
 * @param {*}      accum Accumulated value to post-process.
 * @param {*}      count Value bound to N.
 * @returns {*} Result of RED.util.evaluateJSONataExpression(exp, {}).
 */
function apply_f(exp, accum, count) {
    exp.assign("N", count);
    exp.assign("A", accum);
    return RED.util.evaluateJSONataExpression(exp, {});
}
|
|
|
|
|
|
|
|
/**
 * Normalise an optional expression: the empty string and null both mean
 * "not provided" and are returned as undefined; anything else is returned as is.
 *
 * @param {*} exp Expression text from the node configuration.
 * @returns {*} `exp`, or undefined when it is "" or null.
 */
function exp_or_undefined(exp) {
    var missing = (exp === "") || (exp === null);
    return missing ? undefined : exp;
}
|
|
|
|
|
|
|
|
/**
 * Reduce a complete group of messages to a single value and send it.
 *
 * The group's messages are sorted in place by parts.index (ascending, or
 * descending when node.reduce_right is truthy). Starting from node.reduce_init,
 * node.reduce_exp is applied to each message in that order; if
 * node.reduce_fixup is defined it is applied to the final accumulator. The
 * result is sent as the payload of a new message.
 *
 * @param {object} node  Join node (reads reduce_right, reduce_init, reduce_exp, reduce_fixup; calls send).
 * @param {object} group Group record: { count, msgs }.
 */
function reduce_and_send_group(node, group) {
    var direction = node.reduce_right ? -1 : 1;
    var msgs = group.msgs;
    var count = group.count;
    var reduce_exp = node.reduce_exp;
    var reduce_fixup = node.reduce_fixup;
    var accum = node.reduce_init;

    msgs.sort(function(x,y) {
        var ix = x.parts.index;
        var iy = y.parts.index;
        if (ix < iy) { return -direction; }
        if (ix > iy) { return direction; }
        return 0;
    });

    for (var msg of msgs) {
        accum = apply_r(reduce_exp, accum, msg, msg.parts.index, count);
    }
    if (reduce_fixup !== undefined) {
        accum = apply_f(reduce_fixup, accum, count);
    }
    node.send({payload: accum});
}
|
|
|
|
|
|
|
|
/**
 * Handle one message in "reduce" mode.
 *
 * Messages without msg.parts are passed straight through. Otherwise the message
 * is added to the group identified by parts.id; once the group holds
 * parts.count messages it is removed from node.pending and reduced. After that,
 * if the number of held messages exceeds the configured maximum, all pending
 * state is discarded and an error is raised against the message.
 *
 * @param {object} node Join node (reads/writes pending, pending_count; calls send/error).
 * @param {object} msg  Incoming message.
 */
function reduce_msg(node, msg) {
    if (!msg.hasOwnProperty('parts')) {
        node.send(msg);
        return;
    }

    var hasOwn = Object.prototype.hasOwnProperty;
    var parts = msg.parts;
    var pending = node.pending;
    var pending_count = node.pending_count;
    var gid = parts.id;
    var knowsCount = parts.hasOwnProperty('count');

    if (!hasOwn.call(pending, gid)) {
        pending[gid] = {
            count: knowsCount ? parts.count : undefined,
            msgs: []
        };
    }
    var group = pending[gid];
    var msgs = group.msgs;

    // The count may only arrive on a later part; take it from this message.
    if (knowsCount && (group.count === undefined)) {
        group.count = parts.count;
    }

    msgs.push(msg);
    pending_count++;

    if (msgs.length === group.count) {
        delete pending[gid];
        try {
            pending_count -= msgs.length;
            reduce_and_send_group(node, group);
        } catch(e) {
            node.error(RED._("join.errors.invalid-expr",{error:e.message}));
        }
    }
    node.pending_count = pending_count;

    var max_msgs = max_kept_msgs_count(node);
    if ((max_msgs > 0) && (pending_count > max_msgs)) {
        node.pending = {};
        node.pending_count = 0;
        node.error(RED._("join.too-many"), msg);
    }
}
|
|
|
|
|
|
|
|
/**
 * Evaluate a typed value from the node configuration.
 *
 * Supported types:
 *   "flow" / "global" - look `exp` up in the node's flow / global context
 *   "str"             - `exp` unchanged
 *   "num"             - Number(exp)
 *   "bool"            - true for 'true', false for 'false'
 *   "bin" / "json"    - JSON.parse(exp)
 *   "date"            - Date.now()
 *   "jsonata"         - `exp` prepared and evaluated against an empty message
 *
 * @param {object} node     Node providing context() and used to prepare JSONata expressions.
 * @param {string} exp      Raw value text.
 * @param {string} exp_type One of the types listed above.
 * @returns {*} The evaluated value.
 * @throws {Error} "unexpected initial value type" for an unknown type, or for a
 *                 "bool" whose text is neither 'true' nor 'false'. JSON.parse
 *                 errors propagate for malformed "bin"/"json" text.
 */
function eval_exp(node, exp, exp_type) {
    switch (exp_type) {
        case "flow":
            return node.context().flow.get(exp);
        case "global":
            return node.context().global.get(exp);
        case "str":
            return exp;
        case "num":
            return Number(exp);
        case "bool":
            if (exp === 'true') { return true; }
            if (exp === 'false') { return false; }
            break; // anything else falls through to the error below
        case "bin":
        case "json":
            return JSON.parse(exp);
        case "date":
            return Date.now();
        case "jsonata":
            var jexp = RED.util.prepareJSONataExpression(exp, node);
            return RED.util.evaluateJSONataExpression(jexp, {});
    }
    throw new Error("unexpected initial value type");
}
|
|
|
|
|
2016-06-04 00:40:40 +01:00
|
|
|
/**
 * Join node constructor.
 *
 * Modes (n.mode, default "auto"):
 *   "auto"   - group messages by msg.parts (id, type, count, ch, key, len, index).
 *   "merge"  - delegate every message to merge_msg().
 *   "reduce" - delegate every message to reduce_msg().
 *   other    - group according to the node configuration (build, count, joiner, key, timeout).
 *
 * A group is sent when it reaches its target count, when a message carrying a
 * `complete` property arrives, or when the configured timeout fires.
 *
 * @param {object} n Node configuration.
 * @throws {Error} when joinerType is "bin" and n.joiner is not a JSON array.
 */
function JoinNode(n) {
    RED.nodes.createNode(this,n);
    this.mode = n.mode||"auto";
    this.property = n.property||"payload";
    this.propertyType = n.propertyType||"msg";
    if (this.propertyType === 'full') {
        this.property = "payload";
    }
    this.key = n.key||"topic";
    this.timer = (this.mode === "auto") ? 0 : Number(n.timeout || 0)*1000;
    this.count = Number(n.count || 0);
    this.joiner = n.joiner||"";
    this.joinerType = n.joinerType||"str";

    this.reduce = (this.mode === "reduce");
    if (this.reduce) {
        var exp_init = n.reduceInit;
        var exp_init_type = n.reduceInitType;
        var exp_reduce = n.reduceExp;
        var exp_fixup = exp_or_undefined(n.reduceFixup);
        this.reduce_right = n.reduceRight;
        try {
            this.reduce_init = eval_exp(this, exp_init, exp_init_type);
            this.reduce_exp = RED.util.prepareJSONataExpression(exp_reduce, this);
            this.reduce_fixup = (exp_fixup !== undefined) ? RED.util.prepareJSONataExpression(exp_fixup, this) : undefined;
        } catch(e) {
            this.error(RED._("join.errors.invalid-expr",{error:e.message}));
        }
    }

    if (this.joinerType === "str") {
        // "\e" is not a JavaScript escape sequence, so ESC is written as \x1b.
        this.joiner = this.joiner
            .replace(/\\n/g,"\n")
            .replace(/\\r/g,"\r")
            .replace(/\\t/g,"\t")
            .replace(/\\e/g,"\x1b")
            .replace(/\\f/g,"\f")
            .replace(/\\0/g,"\0");
    } else if (this.joinerType === "bin") {
        var joinArray = JSON.parse(n.joiner);
        if (Array.isArray(joinArray)) {
            this.joiner = Buffer.from(joinArray);
        } else {
            throw new Error("not an array");
        }
    }

    this.build = n.build || "array";
    this.accumulate = n.accumulate || "false";

    this.topics = (n.topics || []).map(function(x) { return x.topic; });
    this.merge_on_change = n.mergeOnChange || false;
    this.topic_counts = undefined;
    this.output = n.output || "stream";
    this.pending = {};
    this.pending_count = 0;

    //this.topic = n.topic;
    var node = this;
    var inflight = {}; // partId -> group being assembled

    /**
     * Build the joined value for a group and send it on group.msg.
     * The group is removed from `inflight` unless the node is accumulating and
     * the triggering message has no `complete` property.
     */
    var completeSend = function(partId) {
        var group = inflight[partId];
        // A "complete" message can arrive when nothing is pending for this id.
        if (!group) { return; }
        clearTimeout(group.timeout);
        if ((node.accumulate !== true) || group.msg.hasOwnProperty("complete")) { delete inflight[partId]; }

        // Build the output separately so that a group which stays in flight
        // (accumulate) keeps its list of parts intact.
        var output = group.payload;
        if (group.type === 'array' && group.arrayLen > 1) {
            // Parts are sub-arrays; flatten them back into one array.
            output = [];
            group.payload.forEach(function(chunk) {
                output = output.concat(chunk);
            });
        } else if (group.type === 'buffer') {
            var buffers = [];
            var bufferLen = 0;
            if (group.joinChar !== undefined) {
                var joinBuffer = Buffer.from(group.joinChar);
                for (var i=0; i<group.payload.length; i++) {
                    if (i > 0) {
                        buffers.push(joinBuffer);
                        bufferLen += joinBuffer.length;
                    }
                    buffers.push(group.payload[i]);
                    bufferLen += group.payload[i].length;
                }
            } else {
                bufferLen = group.bufferLen;
                buffers = group.payload;
            }
            output = Buffer.concat(buffers,bufferLen);
        }

        if (group.type === 'string') {
            var groupJoinChar = group.joinChar;
            if (groupJoinChar === undefined || groupJoinChar === null) {
                // No join character supplied: concatenate the parts directly.
                groupJoinChar = "";
            } else if (typeof groupJoinChar !== 'string') {
                groupJoinChar = groupJoinChar.toString();
            }
            RED.util.setMessageProperty(group.msg,node.property,output.join(groupJoinChar));
        } else {
            if (node.propertyType === 'full') {
                group.msg = RED.util.cloneMessage(group.msg);
            }
            RED.util.setMessageProperty(group.msg,node.property,output);
        }

        // Pop one level of parts if an upstream split pushed one, otherwise drop it.
        if (group.msg.hasOwnProperty('parts') && group.msg.parts.hasOwnProperty('parts')) {
            group.msg.parts = group.msg.parts.parts;
        } else {
            delete group.msg.parts;
        }
        delete group.msg.complete;
        node.send(group.msg);
    };

    this.on("input", function(msg) {
        try {
            var property;
            if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) {
                node.warn("Message missing msg.parts property - cannot join in 'auto' mode");
                return;
            }
            if (node.mode === 'merge' && !msg.hasOwnProperty("topic")) {
                node.warn("Message missing msg.topic property - cannot join in 'merge' mode");
                return;
            }

            if (node.propertyType == "full") {
                property = msg;
            }
            else {
                try {
                    property = RED.util.getMessageProperty(msg,node.property);
                } catch(err) {
                    node.warn("Message property "+node.property+" not found");
                    return;
                }
            }

            var partId;
            var payloadType;
            var propertyKey;
            var targetCount;
            var joinChar;
            var arrayLen;
            var propertyIndex;
            if (node.mode === "auto") {
                // Use msg.parts to identify all of the group information
                partId = msg.parts.id;
                payloadType = msg.parts.type;
                targetCount = msg.parts.count;
                joinChar = msg.parts.ch;
                propertyKey = msg.parts.key;
                arrayLen = msg.parts.len;
                propertyIndex = msg.parts.index;
            }
            else if (node.mode === 'merge') {
                merge_msg(node, msg);
                return;
            }
            else if (node.mode === 'reduce') {
                reduce_msg(node, msg);
                return;
            }
            else {
                // Use the node configuration to identify all of the group information
                partId = "_";
                payloadType = node.build;
                targetCount = node.count;
                joinChar = node.joiner;
                if (targetCount === 0 && msg.hasOwnProperty('parts')) {
                    targetCount = msg.parts.count || 0;
                }
                if (node.build === 'object') {
                    propertyKey = RED.util.getMessageProperty(msg,node.key);
                }
            }

            if ((payloadType === 'object') && (propertyKey === null || propertyKey === undefined || propertyKey === "")) {
                if (node.mode === "auto") {
                    node.warn("Message missing 'msg.parts.key' property - cannot add to object");
                }
                else {
                    if (msg.hasOwnProperty('complete')) {
                        completeSend(partId);
                    }
                    else {
                        node.warn("Message missing key property 'msg."+node.key+"' - cannot add to object");
                    }
                }
                return;
            }

            if (!inflight.hasOwnProperty(partId)) {
                if (payloadType === 'object' || payloadType === 'merged') {
                    inflight[partId] = {
                        currentCount:0,
                        payload:{},
                        targetCount:targetCount,
                        type:"object",
                        msg:msg
                    };
                }
                else {
                    // NOTE(review): the original had a `msg.reset` check here for
                    // accumulating nodes, but it only ran when no group existed and so
                    // had no effect — TODO confirm the intended reset behaviour.
                    inflight[partId] = {
                        currentCount:0,
                        payload:[],
                        targetCount:targetCount,
                        type:payloadType,
                        msg:msg
                    };
                    // Type-specific fields are set for every new list group, whether
                    // or not the node accumulates, because completeSend relies on them.
                    if (payloadType === 'string') {
                        inflight[partId].joinChar = joinChar;
                    } else if (payloadType === 'array') {
                        inflight[partId].arrayLen = arrayLen;
                    } else if (payloadType === 'buffer') {
                        inflight[partId].bufferLen = 0;
                        inflight[partId].joinChar = joinChar;
                    }
                }
                if (node.timer > 0) {
                    inflight[partId].timeout = setTimeout(function() {
                        completeSend(partId);
                    }, node.timer);
                }
            }

            var group = inflight[partId];
            if (payloadType === 'buffer') {
                group.bufferLen += property.length;
            }
            if (payloadType === 'object') {
                group.payload[propertyKey] = property;
                group.currentCount = Object.keys(group.payload).length;
                //msg.topic = node.topic || msg.topic;
            } else if (payloadType === 'merged') {
                if (Array.isArray(property) || typeof property !== 'object') {
                    if (!msg.hasOwnProperty("complete")) {
                        node.warn("Cannot merge non-object types");
                    }
                } else {
                    for (propertyKey in property) {
                        if (property.hasOwnProperty(propertyKey) && propertyKey !== '_msgid') {
                            group.payload[propertyKey] = property[propertyKey];
                        }
                    }
                    group.currentCount = Object.keys(group.payload).length;
                    //group.currentCount++;
                }
            } else {
                // Place by index when one is supplied, otherwise append in arrival order.
                if (!isNaN(propertyIndex)) {
                    group.payload[propertyIndex] = property;
                } else {
                    group.payload.push(property);
                }
                group.currentCount++;
            }
            // TODO: currently reuse the last received - add option to pick first received
            group.msg = msg;
            var tcnt = group.targetCount;
            if (msg.hasOwnProperty("parts")) { tcnt = group.targetCount || msg.parts.count; }
            if ((tcnt > 0 && group.currentCount >= tcnt) || msg.hasOwnProperty('complete')) {
                completeSend(partId);
            }
        } catch(err) {
            // Report through the node so the failure is tied to this node and message.
            node.error(err, msg);
        }
    });

    this.on("close", function() {
        for (var i in inflight) {
            if (inflight.hasOwnProperty(i)) {
                clearTimeout(inflight[i].timeout);
            }
        }
    });
}
|
|
|
|
// Register JoinNode as the implementation of the "join" node type.
RED.nodes.registerType("join",JoinNode);
|
|
|
|
}
|