From e994b9d5e123f09e8d0b4446b040c5c338abe9aa Mon Sep 17 00:00:00 2001 From: Kunihiko Toumura Date: Thu, 2 Sep 2021 14:33:14 +0900 Subject: [PATCH] correlate joined/split messages --- .../@node-red/nodes/core/sequence/17-split.js | 114 ++++++++++++++---- 1 file changed, 88 insertions(+), 26 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/sequence/17-split.js b/packages/node_modules/@node-red/nodes/core/sequence/17-split.js index bfc84a8d2..12e08f8ae 100644 --- a/packages/node_modules/@node-red/nodes/core/sequence/17-split.js +++ b/packages/node_modules/@node-red/nodes/core/sequence/17-split.js @@ -18,20 +18,26 @@ module.exports = function(RED) { "use strict"; function sendArray(node,msg,array,send) { + const corrIds = []; for (var i = 0; i < array.length-1; i++) { msg.payload = array[i]; msg.parts.index = node.c++; if (node.stream !== true) { msg.parts.count = array.length; } + msg._msgid = RED.util.generateId(); + corrIds.push(msg._msgid); send(RED.util.cloneMessage(msg)); } if (node.stream !== true) { msg.payload = array[i]; msg.parts.index = node.c++; msg.parts.count = array.length; + msg._msgid = RED.util.generateId(); + corrIds.push(msg._msgid); send(RED.util.cloneMessage(msg)); node.c = 0; } else { node.remainder = array[i]; } + return corrIds; } function SplitNode(n) { @@ -69,15 +75,28 @@ module.exports = function(RED) { node.buffer = Buffer.from([]); node.pendingDones = []; this.on("input", function(msg, send, done) { + function swapIdThenDone(msgInfo, sentMsgIds) { + const currentMsgid = msgInfo.msg._msgid; + msgInfo.msg._msgid = msgInfo.msgid; + msgInfo.done(); + let c = []; + if (msgInfo.corrMsgids) { Array.prototype.push.apply(c, msgInfo.corrMsgids); } + if (sentMsgIds) { Array.prototype.push.apply(c, sentMsgIds); } + if (c.length > 0) { node.metric("correlate",msgInfo.msg,{split: c}); } + msgInfo.msg._msgid = currentMsgid; + } + if (msg.hasOwnProperty("payload")) { if (msg.hasOwnProperty("parts")) { msg.parts = { parts:msg.parts }; } // push existing parts to a stack else { msg.parts = {}; } msg.parts.id = RED.util.generateId(); // generate a random id + const origMsgid = msg._msgid; delete msg._msgid; if (typeof msg.payload === "string") { // Split String into array msg.payload = (node.remainder || "") + msg.payload; msg.parts.type = "string"; if (node.spltType === "len") { + const corrIds = []; msg.parts.ch = ""; msg.parts.len = node.splt; var count = msg.payload.length/node.splt; @@ -94,23 +113,28 @@ module.exports = function(RED) { msg.payload = data.substring(pos,pos+node.splt); msg.parts.index = node.c++; pos += node.splt; + msg._msgid = RED.util.generateId(); + corrIds.push(msg._msgid); send(RED.util.cloneMessage(msg)); } if (count > 1) { - node.pendingDones.forEach(d => d()); + node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, [corrIds[0]])); node.pendingDones = []; } node.remainder = data.substring(pos); if ((node.stream !== true) || (node.remainder.length === node.splt)) { msg.payload = node.remainder; msg.parts.index = node.c++; + msg._msgid = RED.util.generateId(); + corrIds.push(msg._msgid); send(RED.util.cloneMessage(msg)); - node.pendingDones.forEach(d => d()); + node.pendingDones.push({done,msg,msgid:origMsgid}); + node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, corrIds)); node.pendingDones = []; - done(); node.remainder = ""; } else { - node.pendingDones.push(done); + node.pendingDones.push({done,msg,msgid:origMsgid, + corrMsgids: corrIds}); } } else { @@ -125,8 +149,22 @@ module.exports = function(RED) { a = msg.payload.split(node.splt); msg.parts.ch = node.splt; // pass the split char to other end for rejoin } - sendArray(node,msg,a,send); - done(); + const corrIds = sendArray(node,msg,a,send); + if (node.stream) { + if (a.length > 1) { + node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, [corrIds[0]])); + node.pendingDones = []; + if (a[a.length-1] == '') { + swapIdThenDone({done,msg,msgid:origMsgid}, corrIds); + } else { + node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgId:corrIds}); + } + } else { + node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgId:corrIds}); + } + } else { + swapIdThenDone({done,msg,msgid:origMsgid}, corrIds); + } } } else if (Array.isArray(msg.payload)) { // then split array into messages @@ -139,6 +177,7 @@ module.exports = function(RED) { var pos = 0; var data = msg.payload; msg.parts.len = node.arraySplt; + const corrIds = [] for (var i=0; i 1) { - node.pendingDones.forEach(d => d()); + node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, [corrIds[0]])); node.pendingDones = []; } node.buffer = buff.slice(pos); if ((node.stream !== true) || (node.buffer.length === node.splt)) { msg.payload = node.buffer; msg.parts.index = node.c++; + msg._msgid = RED.util.generateId(); + corrIds.push(msg._msgid); send(RED.util.cloneMessage(msg)); - node.pendingDones.forEach(d => d()); + node.pendingDones.push({done,msg,msgid:origMsgid}); + node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, corrIds)); node.pendingDones = []; - done(); node.buffer = Buffer.from([]); } else { - node.pendingDones.push(done); + node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgids:corrIds}); } } else { + const corrIds = []; var count = 0; if (node.spltType === "bin") { msg.parts.ch = node.spltBuffer; @@ -232,33 +282,37 @@ module.exports = function(RED) { while (pos > -1) { msg.payload = buff.slice(p,pos); msg.parts.index = node.c++; + msg._msgid = RED.util.generateId(); + corrIds.push(msg._msgid); send(RED.util.cloneMessage(msg)); i++; p = pos+node.splt.length; pos = buff.indexOf(node.splt,p); } if (count > 1) { - node.pendingDones.forEach(d => d()); + node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, [corrIds[0]])); node.pendingDones = []; } if ((node.stream !== true) && (p < buff.length)) { msg.payload = buff.slice(p,buff.length); msg.parts.index = node.c++; msg.parts.count = node.c++; + msg._msgid = RED.util.generateId(); + corrIds.push(msg._msgid); send(RED.util.cloneMessage(msg)); - node.pendingDones.forEach(d => d()); + node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, corrIds)); node.pendingDones = []; } else { node.buffer = buff.slice(p,buff.length); - node.pendingDones.push(done); + node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgids:corrIds}); } if (node.buffer.length == 0) { - done(); + swapIdThenDone({done,msg,msgid:origMsgid},corrIds); } } } else { // otherwise drop the message. - done(); + swapIdThenDone({done,msg,msgid:origMsgid}); } } }); @@ -304,7 +358,7 @@ module.exports = function(RED) { exp.assign("A", accumulator); RED.util.evaluateJSONataExpression(exp, msgInfo.msg, (err,result) => { if (err) { - return done(err); + return done(err, null); } if (msgInfos.length === 0) { if (fixup) { @@ -312,14 +366,16 @@ module.exports = function(RED) { fixup.assign("A", result); RED.util.evaluateJSONataExpression(fixup, {}, (err, result) => { if (err) { - return done(err); + return done(err, null); } - msgInfo.send({payload: result}); - done(); + const _msgid = RED.util.generateId() + msgInfo.send({payload: result, _msgid}); + done(null, _msgid); }); } else { - msgInfo.send({payload: result}); - done(); + const _msgid = RED.util.generateId() + msgInfo.send({payload: result, _msgid}); + done(null, _msgid); } } else { reduceMessageGroup(node,msgInfos,exp,fixup,count,result,done); @@ -350,7 +406,9 @@ module.exports = function(RED) { done(err); return; } else { - preservedMsgInfos.forEach(mInfo => mInfo.done()); + const corrIds = []; + preservedMsgInfos.forEach(mInfo => { corrIds.push(mInfo.msg._msgid); mInfo.done()}); + if (corrIds.length > 0) { node.metric("correlate", {_msgid:result}, {join: corrIds}); } done(); } }) @@ -529,6 +587,7 @@ module.exports = function(RED) { group.send(RED.util.cloneMessage(group.msg)); group.dones.forEach(f => f()); group.dones = []; + node.metric("correlate",group.msg,{join: group.corrIds}); } var pendingMessages = []; @@ -675,7 +734,8 @@ module.exports = function(RED) { type:"object", msg:RED.util.cloneMessage(msg), send: send, - dones: [] + dones: [], + corrIds: [] }; } else { @@ -686,7 +746,8 @@ module.exports = function(RED) { type:payloadType, msg:RED.util.cloneMessage(msg), send: send, - dones: [] + dones: [], + corrIds: [] }; if (payloadType === 'string') { inflight[partId].joinChar = joinChar; @@ -704,6 +765,7 @@ module.exports = function(RED) { } } inflight[partId].dones.push(done); + inflight[partId].corrIds.push(msg._msgid); var group = inflight[partId]; if (payloadType === 'buffer') {