/** * Copyright JS Foundation and other contributors, http://js.foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. **/ module.exports = function(RED) { "use strict"; function sendArray(node,msg,array,send) { for (var i = 0; i < array.length-1; i++) { msg.payload = array[i]; msg.parts.index = node.c++; if (node.stream !== true) { msg.parts.count = array.length; } send(RED.util.cloneMessage(msg)); } if (node.stream !== true) { msg.payload = array[i]; msg.parts.index = node.c++; msg.parts.count = array.length; send(RED.util.cloneMessage(msg)); node.c = 0; } else { node.remainder = array[i]; } } function SplitNode(n) { RED.nodes.createNode(this,n); var node = this; node.stream = n.stream; node.spltType = n.spltType || "str"; node.addname = n.addname || ""; try { if (node.spltType === "str") { this.splt = (n.splt || "\\n").replace(/\\n/g,"\n").replace(/\\r/g,"\r").replace(/\\t/g,"\t").replace(/\\e/g,"\e").replace(/\\f/g,"\f").replace(/\\0/g,"\0"); } else if (node.spltType === "bin") { var spltArray = JSON.parse(n.splt); if (Array.isArray(spltArray)) { this.splt = Buffer.from(spltArray); } else { throw new Error("not an array"); } this.spltBuffer = spltArray; } else if (node.spltType === "len") { this.splt = parseInt(n.splt); if (isNaN(this.splt) || this.splt < 1) { throw new Error("invalid split length: "+n.splt); } } this.arraySplt = (n.arraySplt === undefined)?1:parseInt(n.arraySplt); if (isNaN(this.arraySplt) || this.arraySplt < 1) { throw new Error("invalid array split length: "+n.arraySplt); } } catch(err) { this.error("Invalid split property: "+err.toString()); return; } node.c = 0; node.buffer = Buffer.from([]); node.pendingDones = []; this.on("input", function(msg, send, done) { if (msg.hasOwnProperty("payload")) { if (msg.hasOwnProperty("parts")) { msg.parts = { parts:msg.parts }; } // push existing parts to a stack else { msg.parts = {}; } msg.parts.id = RED.util.generateId(); // generate a random id delete msg._msgid; if (typeof msg.payload === "string") { // Split String into array msg.payload = (node.remainder || "") + msg.payload; msg.parts.type = "string"; if (node.spltType === "len") { msg.parts.ch = ""; msg.parts.len = node.splt; var count = msg.payload.length/node.splt; if (Math.floor(count) !== count) { count = Math.ceil(count); } if (node.stream !== true) { msg.parts.count = count; node.c = 0; } var pos = 0; var data = msg.payload; for (var i=0; i 1) { node.pendingDones.forEach(d => d()); node.pendingDones = []; } node.remainder = data.substring(pos); if ((node.stream !== true) || (node.remainder.length === node.splt)) { msg.payload = node.remainder; msg.parts.index = node.c++; send(RED.util.cloneMessage(msg)); node.pendingDones.forEach(d => d()); node.pendingDones = []; done(); node.remainder = ""; } else { node.pendingDones.push(done); } } else { var a = []; if (node.spltType === "bin") { if (!node.spltBufferString) { node.spltBufferString = node.splt.toString(); } a = msg.payload.split(node.spltBufferString); msg.parts.ch = node.spltBuffer; // pass the split char to other end for rejoin } else if (node.spltType === "str") { a = msg.payload.split(node.splt); msg.parts.ch = node.splt; // pass the split char to other end for rejoin } sendArray(node,msg,a,send); done(); } } else if (Array.isArray(msg.payload)) { // then split array into messages msg.parts.type = "array"; var count = msg.payload.length/node.arraySplt; if (Math.floor(count) !== count) { count = Math.ceil(count); } msg.parts.count = count; var pos = 0; var data = msg.payload; msg.parts.len = node.arraySplt; for (var i=0; i 1) { node.pendingDones.forEach(d => d()); node.pendingDones = []; } node.buffer = buff.slice(pos); if ((node.stream !== true) || (node.buffer.length === node.splt)) { msg.payload = node.buffer; msg.parts.index = node.c++; send(RED.util.cloneMessage(msg)); node.pendingDones.forEach(d => d()); node.pendingDones = []; done(); node.buffer = Buffer.from([]); } else { node.pendingDones.push(done); } } else { var count = 0; if (node.spltType === "bin") { msg.parts.ch = node.spltBuffer; } else if (node.spltType === "str") { msg.parts.ch = node.splt; } var pos = buff.indexOf(node.splt); var end; while (pos > -1) { count++; end = pos+node.splt.length; pos = buff.indexOf(node.splt,end); } count++; if (node.stream !== true) { msg.parts.count = count; node.c = 0; } var i = 0, p = 0; pos = buff.indexOf(node.splt); while (pos > -1) { msg.payload = buff.slice(p,pos); msg.parts.index = node.c++; send(RED.util.cloneMessage(msg)); i++; p = pos+node.splt.length; pos = buff.indexOf(node.splt,p); } if (count > 1) { node.pendingDones.forEach(d => d()); node.pendingDones = []; } if ((node.stream !== true) && (p < buff.length)) { msg.payload = buff.slice(p,buff.length); msg.parts.index = node.c++; msg.parts.count = node.c++; send(RED.util.cloneMessage(msg)); node.pendingDones.forEach(d => d()); node.pendingDones = []; } else { node.buffer = buff.slice(p,buff.length); if (node.buffer.length > 0) { node.pendingDones.push(done); } } if (node.buffer.length == 0) { done(); } } } else { // otherwise drop the message. done(); } } }); } RED.nodes.registerType("split",SplitNode); var _maxKeptMsgsCount; function maxKeptMsgsCount(node) { if (_maxKeptMsgsCount === undefined) { var name = "nodeMessageBufferMaxLength"; if (RED.settings.hasOwnProperty(name)) { _maxKeptMsgsCount = RED.settings[name]; } else { _maxKeptMsgsCount = 0; } } return _maxKeptMsgsCount; } function applyReduce(exp, accum, msg, index, count, done) { exp.assign("I", index); exp.assign("N", count); exp.assign("A", accum); RED.util.evaluateJSONataExpression(exp, msg, done); } function exp_or_undefined(exp) { if((exp === "") || (exp === null)) { return undefined; } return exp } function reduceMessageGroup(node,msgInfos,exp,fixup,count,accumulator,done) { var msgInfo = msgInfos.shift(); exp.assign("I", msgInfo.msg.parts.index); exp.assign("N", count); exp.assign("A", accumulator); RED.util.evaluateJSONataExpression(exp, msgInfo.msg, (err,result) => { if (err) { return done(err); } if (msgInfos.length === 0) { if (fixup) { fixup.assign("N", count); fixup.assign("A", result); RED.util.evaluateJSONataExpression(fixup, {}, (err, result) => { if (err) { return done(err); } msgInfo.msg.payload = result; msgInfo.send(msgInfo.msg); done(); }); } else { msgInfo.msg.payload = result; msgInfo.send(msgInfo.msg); done(); } } else { reduceMessageGroup(node,msgInfos,exp,fixup,count,result,done); } }); } function reduceAndSendGroup(node, group, done) { var is_right = node.reduce_right; var flag = is_right ? -1 : 1; var msgInfos = group.msgs; const preservedMsgInfos = [...msgInfos]; try { RED.util.evaluateNodeProperty(node.exp_init, node.exp_init_type, node, {}, (err,accum) => { var reduceExpression = node.reduceExpression; var fixupExpression = node.fixupExpression; var count = group.count; msgInfos.sort(function(x,y) { var ix = x.msg.parts.index; var iy = y.msg.parts.index; if (ix < iy) {return -flag;} if (ix > iy) {return flag;} return 0; }); reduceMessageGroup(node, msgInfos,reduceExpression,fixupExpression,count,accum,(err,result) => { if (err) { preservedMsgInfos.pop(); // omit last message to emit error message preservedMsgInfos.forEach(mInfo => mInfo.done()); done(err); return; } else { preservedMsgInfos.forEach(mInfo => mInfo.done()); done(); } }) }); } catch(err) { done(new Error(RED._("join.errors.invalid-expr",{error:err.message}))); } } function reduceMessage(node, msgInfo, done) { let msg = msgInfo.msg; if (msg.hasOwnProperty('parts')) { var parts = msg.parts; var pending = node.pending; var pending_count = node.pending_count; var gid = msg.parts.id; var count; if (!pending.hasOwnProperty(gid)) { if(parts.hasOwnProperty('count')) { count = msg.parts.count; } pending[gid] = { count: count, msgs: [] }; } var group = pending[gid]; var msgs = group.msgs; if (parts.hasOwnProperty('count') && (group.count === undefined)) { group.count = parts.count; } msgs.push(msgInfo); pending_count++; var completeProcess = function(err) { if (err) { return done(err); } node.pending_count = pending_count; var max_msgs = maxKeptMsgsCount(node); if ((max_msgs > 0) && (pending_count > max_msgs)) { Object.values(node.pending).forEach(group => { group.msgs.forEach(mInfo => { if (mInfo.msg._msgid !== msgInfo.msg._msgid) { mInfo.done(); } }); }); node.pending = {}; node.pending_count = 0; done(RED._("join.too-many")); return; } return done(); } if (msgs.length === group.count) { delete pending[gid]; pending_count -= msgs.length; reduceAndSendGroup(node, group, completeProcess) } else { completeProcess(); } } else { msgInfo.send(msg); msgInfo.done(); done(); } } function JoinNode(n) { RED.nodes.createNode(this,n); this.mode = n.mode||"auto"; this.property = n.property||"payload"; this.propertyType = n.propertyType||"msg"; if (this.propertyType === 'full') { this.property = "payload"; } this.key = n.key||"topic"; this.timer = (this.mode === "auto") ? 0 : Number(n.timeout || 0)*1000; this.count = Number(n.count || 0); this.joiner = n.joiner||""; this.joinerType = n.joinerType||"str"; this.reduce = (this.mode === "reduce"); if (this.reduce) { this.exp_init = n.reduceInit; this.exp_init_type = n.reduceInitType; var exp_reduce = n.reduceExp; var exp_fixup = exp_or_undefined(n.reduceFixup); this.reduce_right = n.reduceRight; try { this.reduceExpression = RED.util.prepareJSONataExpression(exp_reduce, this); this.fixupExpression = (exp_fixup !== undefined) ? RED.util.prepareJSONataExpression(exp_fixup, this) : undefined; } catch(e) { this.error(RED._("join.errors.invalid-expr",{error:e.message})); return; } } if (this.joinerType === "str") { this.joiner = this.joiner.replace(/\\n/g,"\n").replace(/\\r/g,"\r").replace(/\\t/g,"\t").replace(/\\e/g,"\e").replace(/\\f/g,"\f").replace(/\\0/g,"\0"); } else if (this.joinerType === "bin") { var joinArray = JSON.parse(n.joiner || "[]"); if (Array.isArray(joinArray)) { this.joiner = Buffer.from(joinArray); } else { throw new Error("not an array"); } } this.build = n.build || "array"; this.accumulate = n.accumulate || "false"; this.output = n.output || "stream"; this.pending = {}; this.pending_count = 0; //this.topic = n.topic; var node = this; var inflight = {}; var completeSend = function(partId) { var group = inflight[partId]; if (group.timeout) { clearTimeout(group.timeout); } if (node.mode === 'auto' || node.accumulate !== true || group.msg.hasOwnProperty("complete")) { delete inflight[partId]; } if (group.type === 'array' && group.arrayLen > 1) { var newArray = []; group.payload.forEach(function(n) { newArray = newArray.concat(n); }) group.payload = newArray; } else if (group.type === 'buffer') { var buffers = []; var bufferLen = 0; if (group.joinChar !== undefined) { var joinBuffer = Buffer.from(group.joinChar); for (var i=0; i 0) { buffers.push(joinBuffer); bufferLen += joinBuffer.length; } if (!Buffer.isBuffer(group.payload[i])) { group.payload[i] = Buffer.from(group.payload[i]); } buffers.push(group.payload[i]); bufferLen += group.payload[i].length; } } else { bufferLen = group.bufferLen; buffers = group.payload; } group.payload = Buffer.concat(buffers,bufferLen); } if (group.type === 'string') { var groupJoinChar = group.joinChar; if (typeof group.joinChar !== 'string') { groupJoinChar = group.joinChar.toString(); } RED.util.setMessageProperty(group.msg,node.property,group.payload.join(groupJoinChar)); } else { if (node.propertyType === 'full') { group.msg = RED.util.cloneMessage(group.msg); } RED.util.setMessageProperty(group.msg,node.property,group.payload); } if (group.msg.hasOwnProperty('parts') && group.msg.parts.hasOwnProperty('parts')) { group.msg.parts = group.msg.parts.parts; } else { delete group.msg.parts; } delete group.msg.complete; group.send(RED.util.cloneMessage(group.msg)); group.dones.forEach(f => f()); group.dones = []; } var pendingMessages = []; var activeMessage = null; // In reduce mode, we must process messages fully in order otherwise // groups may overlap and cause unexpected results. The use of JSONata // means some async processing *might* occur if flow/global context is // accessed. var processReduceMessageQueue = function(msgInfo) { if (msgInfo) { // A new message has arrived - add it to the message queue pendingMessages.push(msgInfo); if (activeMessage !== null) { // The node is currently processing a message, so do nothing // more with this message return; } } if (pendingMessages.length === 0) { // There are no more messages to process, clear the active flag // and return activeMessage = null; return; } // There are more messages to process. Get the next message and // start processing it. Recurse back in to check for any more var nextMsgInfo = pendingMessages.shift(); activeMessage = true; reduceMessage(node, nextMsgInfo, err => { if (err) { nextMsgInfo.done(err);//.error(err,nextMsg); } activeMessage = null; processReduceMessageQueue(); }) } this.on("input", function(msg, send, done) { try { var property; var partId = "_"; if (node.propertyType == "full") { property = msg; } else { try { property = RED.util.getMessageProperty(msg,node.property); } catch(err) { node.warn("Message property "+node.property+" not found"); done(); return; } } if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) { // if a blank reset messag erest it all. if (msg.hasOwnProperty("reset")) { if (inflight && inflight.hasOwnProperty("partId") && inflight[partId].timeout) { clearTimeout(inflight[partId].timeout); } inflight = {}; } else { node.warn("Message missing msg.parts property - cannot join in 'auto' mode") } done(); return; } var payloadType; var propertyKey; var targetCount; var joinChar; var arrayLen; var propertyIndex; if (node.mode === "auto") { // Use msg.parts to identify all of the group information partId = msg.parts.id; payloadType = msg.parts.type; targetCount = msg.parts.count; joinChar = msg.parts.ch; propertyKey = msg.parts.key; arrayLen = msg.parts.len; propertyIndex = msg.parts.index; } else if (node.mode === 'reduce') { return processReduceMessageQueue({msg, send, done}); } else { // Use the node configuration to identify all of the group information payloadType = node.build; targetCount = node.count; joinChar = node.joiner; if (n.count === "" && msg.hasOwnProperty('parts')) { targetCount = msg.parts.count || 0; if (msg.parts.hasOwnProperty('id')) { partId = msg.parts.id; } } if (node.build === 'object') { propertyKey = RED.util.getMessageProperty(msg,node.key); } } if (msg.hasOwnProperty("restartTimeout")) { if (inflight[partId]) { if (inflight[partId].timeout) { clearTimeout(inflight[partId].timeout); } if (node.timer > 0) { inflight[partId].timeout = setTimeout(function() { completeSend(partId) }, node.timer) } } } if (msg.hasOwnProperty("reset")) { if (inflight[partId]) { if (inflight[partId].timeout) { clearTimeout(inflight[partId].timeout); } inflight[partId].dones.forEach(f => f()); delete inflight[partId] } done(); return; } if ((payloadType === 'object') && (propertyKey === null || propertyKey === undefined || propertyKey === "")) { if (node.mode === "auto") { node.warn("Message missing 'msg.parts.key' property - cannot add to object"); } else { if (msg.hasOwnProperty('complete')) { if (inflight[partId]) { inflight[partId].msg.complete = msg.complete; inflight[partId].send = send; completeSend(partId); } } else { node.warn("Message missing key property 'msg."+node.key+"' - cannot add to object") } } done(); return; } if (!inflight.hasOwnProperty(partId)) { if (payloadType === 'object' || payloadType === 'merged') { inflight[partId] = { currentCount:0, payload:{}, targetCount:targetCount, type:"object", msg:RED.util.cloneMessage(msg), send: send, dones: [] }; } else { inflight[partId] = { currentCount:0, payload:[], targetCount:targetCount, type:payloadType, msg:RED.util.cloneMessage(msg), send: send, dones: [] }; if (payloadType === 'string') { inflight[partId].joinChar = joinChar; } else if (payloadType === 'array') { inflight[partId].arrayLen = arrayLen; } else if (payloadType === 'buffer') { inflight[partId].bufferLen = 0; inflight[partId].joinChar = joinChar; } } if (node.timer > 0) { inflight[partId].timeout = setTimeout(function() { completeSend(partId) }, node.timer) } } inflight[partId].dones.push(done); var group = inflight[partId]; if (payloadType === 'buffer') { if (property !== undefined) { if (Buffer.isBuffer(property) || (typeof property === "string") || Array.isArray(property)) { inflight[partId].bufferLen += property.length; } else { done(RED._("join.errors.invalid-type",{error:(typeof property)})); return; } } } if (payloadType === 'object') { group.payload[propertyKey] = property; group.currentCount = Object.keys(group.payload).length; } else if (payloadType === 'merged') { if (Array.isArray(property) || typeof property !== 'object') { if (!msg.hasOwnProperty("complete")) { node.warn("Cannot merge non-object types"); } } else { for (propertyKey in property) { if (property.hasOwnProperty(propertyKey) && propertyKey !== '_msgid') { group.payload[propertyKey] = property[propertyKey]; } } group.currentCount = Object.keys(group.payload).length; //group.currentCount++; } } else { if (!isNaN(propertyIndex)) { if (group.payload[propertyIndex] == undefined) { group.currentCount++; } group.payload[propertyIndex] = property; } else { if (property !== undefined) { group.payload.push(property); group.currentCount++; } } } group.msg = Object.assign(group.msg, msg); group.send = send; var tcnt = group.targetCount; if (msg.hasOwnProperty("parts")) { tcnt = group.targetCount || msg.parts.count; group.targetCount = tcnt; } if ((tcnt > 0 && group.currentCount >= tcnt) || msg.hasOwnProperty('complete')) { completeSend(partId); } } catch(err) { done(err); console.log(err.stack); } }); this.on("close", function() { for (var i in inflight) { if (inflight.hasOwnProperty(i)) { clearTimeout(inflight[i].timeout); inflight[i].dones.forEach(d => d()); } } }); } RED.nodes.registerType("join",JoinNode); }