1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

correlate joined/split messages

This commit is contained in:
Kunihiko Toumura 2021-09-02 14:33:14 +09:00
parent 40f816c311
commit e994b9d5e1

View File

@ -18,20 +18,26 @@ module.exports = function(RED) {
"use strict";
function sendArray(node,msg,array,send) {
const corrIds = [];
for (var i = 0; i < array.length-1; i++) {
msg.payload = array[i];
msg.parts.index = node.c++;
if (node.stream !== true) { msg.parts.count = array.length; }
msg._msgid = RED.util.generateId();
corrIds.push(msg._msgid);
send(RED.util.cloneMessage(msg));
}
if (node.stream !== true) {
msg.payload = array[i];
msg.parts.index = node.c++;
msg.parts.count = array.length;
msg._msgid = RED.util.generateId();
corrIds.push(msg._msgid);
send(RED.util.cloneMessage(msg));
node.c = 0;
}
else { node.remainder = array[i]; }
return corrIds;
}
function SplitNode(n) {
@ -69,15 +75,28 @@ module.exports = function(RED) {
node.buffer = Buffer.from([]);
node.pendingDones = [];
this.on("input", function(msg, send, done) {
function swapIdThenDone(msgInfo, sentMsgIds) {
const currentMsgid = msgInfo.msg._msgid;
msgInfo.msg._msgid = msgInfo.msgid;
msgInfo.done();
let c = [];
if (msgInfo.corrMsgids) { Array.prototype.push.apply(c, msgInfo.corrMsgids); }
if (sentMsgIds) { Array.prototype.push.apply(c, sentMsgIds); }
if (c.length > 0) { node.metric("correlate",msgInfo.msg,{split: c}); }
msgInfo.msg._msgid = currentMsgid;
}
if (msg.hasOwnProperty("payload")) {
if (msg.hasOwnProperty("parts")) { msg.parts = { parts:msg.parts }; } // push existing parts to a stack
else { msg.parts = {}; }
msg.parts.id = RED.util.generateId(); // generate a random id
const origMsgid = msg._msgid;
delete msg._msgid;
if (typeof msg.payload === "string") { // Split String into array
msg.payload = (node.remainder || "") + msg.payload;
msg.parts.type = "string";
if (node.spltType === "len") {
const corrIds = [];
msg.parts.ch = "";
msg.parts.len = node.splt;
var count = msg.payload.length/node.splt;
@ -94,23 +113,28 @@ module.exports = function(RED) {
msg.payload = data.substring(pos,pos+node.splt);
msg.parts.index = node.c++;
pos += node.splt;
msg._msgid = RED.util.generateId();
corrIds.push(msg._msgid);
send(RED.util.cloneMessage(msg));
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, [corrIds[0]]));
node.pendingDones = [];
}
node.remainder = data.substring(pos);
if ((node.stream !== true) || (node.remainder.length === node.splt)) {
msg.payload = node.remainder;
msg.parts.index = node.c++;
msg._msgid = RED.util.generateId();
corrIds.push(msg._msgid);
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones.push({done,msg,msgid:origMsgid});
node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, corrIds));
node.pendingDones = [];
done();
node.remainder = "";
} else {
node.pendingDones.push(done);
node.pendingDones.push({done,msg,msgid:origMsgid,
corrMsgids: corrIds});
}
}
else {
@ -125,8 +149,22 @@ module.exports = function(RED) {
a = msg.payload.split(node.splt);
msg.parts.ch = node.splt; // pass the split char to other end for rejoin
}
sendArray(node,msg,a,send);
done();
const corrIds = sendArray(node,msg,a,send);
if (node.stream) {
if (a.length > 1) {
node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, [corrIds[0]]));
node.pendingDones = [];
if (a[a.length-1] == '') {
swapIdThenDone({done,msg,msgid:origMsgid}, corrIds);
} else {
node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgId:corrIds});
}
} else {
node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgId:corrIds});
}
} else {
swapIdThenDone({done,msg,msgid:origMsgid}, corrIds);
}
}
}
else if (Array.isArray(msg.payload)) { // then split array into messages
@ -139,6 +177,7 @@ module.exports = function(RED) {
var pos = 0;
var data = msg.payload;
msg.parts.len = node.arraySplt;
const corrIds = []
for (var i=0; i<count; i++) {
msg.payload = data.slice(pos,pos+node.arraySplt);
if (node.arraySplt === 1) {
@ -146,15 +185,18 @@ module.exports = function(RED) {
}
msg.parts.index = i;
pos += node.arraySplt;
msg._msgid = RED.util.generateId();
corrIds.push(msg._msgid);
send(RED.util.cloneMessage(msg));
}
done();
swapIdThenDone({done,msg,msgid:origMsgid}, corrIds);
}
else if ((typeof msg.payload === "object") && !Buffer.isBuffer(msg.payload)) {
var j = 0;
var l = Object.keys(msg.payload).length;
var pay = msg.payload;
msg.parts.type = "object";
const corrIds = [];
for (var p in pay) {
if (pay.hasOwnProperty(p)) {
msg.payload = pay[p];
@ -164,17 +206,20 @@ module.exports = function(RED) {
msg.parts.key = p;
msg.parts.index = j;
msg.parts.count = l;
msg._msgid = RED.util.generateId();
corrIds.push(msg._msgid);
send(RED.util.cloneMessage(msg));
j += 1;
}
}
done();
swapIdThenDone({done,msg,msgid:origMsgid}, corrIds);
}
else if (Buffer.isBuffer(msg.payload)) {
var len = node.buffer.length + msg.payload.length;
var buff = Buffer.concat([node.buffer, msg.payload], len);
msg.parts.type = "buffer";
if (node.spltType === "len") {
const corrIds = [];
var count = buff.length/node.splt;
if (Math.floor(count) !== count) {
count = Math.ceil(count);
@ -189,26 +234,31 @@ module.exports = function(RED) {
msg.payload = buff.slice(pos,pos+node.splt);
msg.parts.index = node.c++;
pos += node.splt;
msg._msgid = RED.util.generateId();
corrIds.push(msg._msgid);
send(RED.util.cloneMessage(msg));
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, [corrIds[0]]));
node.pendingDones = [];
}
node.buffer = buff.slice(pos);
if ((node.stream !== true) || (node.buffer.length === node.splt)) {
msg.payload = node.buffer;
msg.parts.index = node.c++;
msg._msgid = RED.util.generateId();
corrIds.push(msg._msgid);
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones.push({done,msg,msgid:origMsgid});
node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, corrIds));
node.pendingDones = [];
done();
node.buffer = Buffer.from([]);
} else {
node.pendingDones.push(done);
node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgids:corrIds});
}
}
else {
const corrIds = [];
var count = 0;
if (node.spltType === "bin") {
msg.parts.ch = node.spltBuffer;
@ -232,33 +282,37 @@ module.exports = function(RED) {
while (pos > -1) {
msg.payload = buff.slice(p,pos);
msg.parts.index = node.c++;
msg._msgid = RED.util.generateId();
corrIds.push(msg._msgid);
send(RED.util.cloneMessage(msg));
i++;
p = pos+node.splt.length;
pos = buff.indexOf(node.splt,p);
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, [corrIds[0]]));
node.pendingDones = [];
}
if ((node.stream !== true) && (p < buff.length)) {
msg.payload = buff.slice(p,buff.length);
msg.parts.index = node.c++;
msg.parts.count = node.c++;
msg._msgid = RED.util.generateId();
corrIds.push(msg._msgid);
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones.forEach(msgInfo => swapIdThenDone(msgInfo, corrIds));
node.pendingDones = [];
}
else {
node.buffer = buff.slice(p,buff.length);
node.pendingDones.push(done);
node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgids:corrIds});
}
if (node.buffer.length == 0) {
done();
swapIdThenDone({done,msg,msgid:origMsgid},corrIds);
}
}
} else { // otherwise drop the message.
done();
swapIdThenDone({done,msg,msgid:origMsgid});
}
}
});
@ -304,7 +358,7 @@ module.exports = function(RED) {
exp.assign("A", accumulator);
RED.util.evaluateJSONataExpression(exp, msgInfo.msg, (err,result) => {
if (err) {
return done(err);
return done(err, null);
}
if (msgInfos.length === 0) {
if (fixup) {
@ -312,14 +366,16 @@ module.exports = function(RED) {
fixup.assign("A", result);
RED.util.evaluateJSONataExpression(fixup, {}, (err, result) => {
if (err) {
return done(err);
return done(err, null);
}
msgInfo.send({payload: result});
done();
const _msgid = RED.util.generateId()
msgInfo.send({payload: result, _msgid});
done(null, _msgid);
});
} else {
msgInfo.send({payload: result});
done();
const _msgid = RED.util.generateId()
msgInfo.send({payload: result, _msgid});
done(null, _msgid);
}
} else {
reduceMessageGroup(node,msgInfos,exp,fixup,count,result,done);
@ -350,7 +406,9 @@ module.exports = function(RED) {
done(err);
return;
} else {
preservedMsgInfos.forEach(mInfo => mInfo.done());
const corrIds = [];
preservedMsgInfos.forEach(mInfo => { corrIds.push(mInfo.msg._msgid); mInfo.done()});
if (corrIds.length > 0) { node.metric("correlate", {_msgid:result}, {join: corrIds}); }
done();
}
})
@ -529,6 +587,7 @@ module.exports = function(RED) {
group.send(RED.util.cloneMessage(group.msg));
group.dones.forEach(f => f());
group.dones = [];
node.metric("correlate",group.msg,{join: group.corrIds});
}
var pendingMessages = [];
@ -675,7 +734,8 @@ module.exports = function(RED) {
type:"object",
msg:RED.util.cloneMessage(msg),
send: send,
dones: []
dones: [],
corrIds: []
};
}
else {
@ -686,7 +746,8 @@ module.exports = function(RED) {
type:payloadType,
msg:RED.util.cloneMessage(msg),
send: send,
dones: []
dones: [],
corrIds: []
};
if (payloadType === 'string') {
inflight[partId].joinChar = joinChar;
@ -704,6 +765,7 @@ module.exports = function(RED) {
}
}
inflight[partId].dones.push(done);
inflight[partId].corrIds.push(msg._msgid);
var group = inflight[partId];
if (payloadType === 'buffer') {