Messaging API support in Split/Join nodes

This commit is contained in:
Kunihiko Toumura 2020-11-04 21:43:20 +09:00
parent 15a600c763
commit 407cb3e7d5
2 changed files with 246 additions and 45 deletions

View File

@ -17,18 +17,18 @@
module.exports = function(RED) {
"use strict";
function sendArray(node,msg,array) {
function sendArray(node,msg,array,send) {
for (var i = 0; i < array.length-1; i++) {
msg.payload = array[i];
msg.parts.index = node.c++;
if (node.stream !== true) { msg.parts.count = array.length; }
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
if (node.stream !== true) {
msg.payload = array[i];
msg.parts.index = node.c++;
msg.parts.count = array.length;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.c = 0;
}
else { node.remainder = array[i]; }
@ -67,7 +67,8 @@ module.exports = function(RED) {
}
node.c = 0;
node.buffer = Buffer.from([]);
this.on("input", function(msg) {
node.pendingDones = [];
this.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("payload")) {
if (msg.hasOwnProperty("parts")) { msg.parts = { parts:msg.parts }; } // push existing parts to a stack
else { msg.parts = {}; }
@ -93,14 +94,23 @@ module.exports = function(RED) {
msg.payload = data.substring(pos,pos+node.splt);
msg.parts.index = node.c++;
pos += node.splt;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
node.remainder = data.substring(pos);
if ((node.stream !== true) || (node.remainder.length === node.splt)) {
msg.payload = node.remainder;
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones = [];
done();
node.remainder = "";
} else {
node.pendingDones.push(done);
}
}
else {
@ -115,7 +125,8 @@ module.exports = function(RED) {
a = msg.payload.split(node.splt);
msg.parts.ch = node.splt; // pass the split char to other end for rejoin
}
sendArray(node,msg,a);
sendArray(node,msg,a,send);
done();
}
}
else if (Array.isArray(msg.payload)) { // then split array into messages
@ -135,8 +146,9 @@ module.exports = function(RED) {
}
msg.parts.index = i;
pos += node.arraySplt;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
done();
}
else if ((typeof msg.payload === "object") && !Buffer.isBuffer(msg.payload)) {
var j = 0;
@ -152,10 +164,11 @@ module.exports = function(RED) {
msg.parts.key = p;
msg.parts.index = j;
msg.parts.count = l;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
j += 1;
}
}
done();
}
else if (Buffer.isBuffer(msg.payload)) {
var len = node.buffer.length + msg.payload.length;
@ -176,14 +189,23 @@ module.exports = function(RED) {
msg.payload = buff.slice(pos,pos+node.splt);
msg.parts.index = node.c++;
pos += node.splt;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
node.buffer = buff.slice(pos);
if ((node.stream !== true) || (node.buffer.length === node.splt)) {
msg.payload = node.buffer;
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones = [];
done();
node.buffer = Buffer.from([]);
} else {
node.pendingDones.push(done);
}
}
else {
@ -210,23 +232,34 @@ module.exports = function(RED) {
while (pos > -1) {
msg.payload = buff.slice(p,pos);
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
i++;
p = pos+node.splt.length;
pos = buff.indexOf(node.splt,p);
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
if ((node.stream !== true) && (p < buff.length)) {
msg.payload = buff.slice(p,buff.length);
msg.parts.index = node.c++;
msg.parts.count = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
else {
node.buffer = buff.slice(p,buff.length);
node.pendingDones.push(done);
}
if (node.buffer.length == 0) {
done();
}
}
}
//else { } // otherwise drop the message.
} else { // otherwise drop the message.
done();
}
}
});
}
@ -264,16 +297,16 @@ module.exports = function(RED) {
}
function reduceMessageGroup(node,msgs,exp,fixup,count,accumulator,done) {
var msg = msgs.shift();
exp.assign("I", msg.parts.index);
function reduceMessageGroup(node,msgInfos,exp,fixup,count,accumulator,done) {
var msgInfo = msgInfos.shift();
exp.assign("I", msgInfo.msg.parts.index);
exp.assign("N", count);
exp.assign("A", accumulator);
RED.util.evaluateJSONataExpression(exp, msg, (err,result) => {
RED.util.evaluateJSONataExpression(exp, msgInfo.msg, (err,result) => {
if (err) {
return done(err);
}
if (msgs.length === 0) {
if (msgInfos.length === 0) {
if (fixup) {
fixup.assign("N", count);
fixup.assign("A", result);
@ -281,39 +314,43 @@ module.exports = function(RED) {
if (err) {
return done(err);
}
node.send({payload: result});
msgInfo.send({payload: result});
done();
});
} else {
node.send({payload: result});
msgInfo.send({payload: result});
done();
}
} else {
reduceMessageGroup(node,msgs,exp,fixup,count,result,done);
reduceMessageGroup(node,msgInfos,exp,fixup,count,result,done);
}
});
}
function reduceAndSendGroup(node, group, done) {
var is_right = node.reduce_right;
var flag = is_right ? -1 : 1;
var msgs = group.msgs;
var msgInfos = group.msgs;
const preservedMsgInfos = [...msgInfos];
try {
RED.util.evaluateNodeProperty(node.exp_init, node.exp_init_type, node, {}, (err,accum) => {
var reduceExpression = node.reduceExpression;
var fixupExpression = node.fixupExpression;
var count = group.count;
msgs.sort(function(x,y) {
var ix = x.parts.index;
var iy = y.parts.index;
msgInfos.sort(function(x,y) {
var ix = x.msg.parts.index;
var iy = y.msg.parts.index;
if (ix < iy) {return -flag;}
if (ix > iy) {return flag;}
return 0;
});
reduceMessageGroup(node, msgs,reduceExpression,fixupExpression,count,accum,(err,result) => {
reduceMessageGroup(node, msgInfos,reduceExpression,fixupExpression,count,accum,(err,result) => {
if (err) {
preservedMsgInfos.pop(); // omit last message to emit error message
preservedMsgInfos.forEach(mInfo => mInfo.done());
done(err);
return;
} else {
preservedMsgInfos.forEach(mInfo => mInfo.done());
done();
}
})
@ -323,7 +360,8 @@ module.exports = function(RED) {
}
}
function reduceMessage(node, msg, done) {
function reduceMessage(node, msgInfo, done) {
let msg = msgInfo.msg;
if (msg.hasOwnProperty('parts')) {
var parts = msg.parts;
var pending = node.pending;
@ -344,7 +382,7 @@ module.exports = function(RED) {
if (parts.hasOwnProperty('count') && (group.count === undefined)) {
group.count = parts.count;
}
msgs.push(msg);
msgs.push(msgInfo);
pending_count++;
var completeProcess = function(err) {
if (err) {
@ -353,6 +391,13 @@ module.exports = function(RED) {
node.pending_count = pending_count;
var max_msgs = maxKeptMsgsCount(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
Object.values(node.pending).forEach(group => {
group.msgs.forEach(mInfo => {
if (mInfo.msg._msgid !== msgInfo.msg._msgid) {
mInfo.done();
}
});
});
node.pending = {};
node.pending_count = 0;
done(RED._("join.too-many"));
@ -368,7 +413,8 @@ module.exports = function(RED) {
completeProcess();
}
} else {
node.send(msg);
msgInfo.send(msg);
msgInfo.done();
done();
}
}
@ -480,7 +526,9 @@ module.exports = function(RED) {
delete group.msg.parts;
}
delete group.msg.complete;
node.send(RED.util.cloneMessage(group.msg));
group.send(RED.util.cloneMessage(group.msg));
group.dones.forEach(f => f());
group.dones = [];
}
var pendingMessages = [];
@ -489,10 +537,10 @@ module.exports = function(RED) {
// groups may overlap and cause unexpected results. The use of JSONata
// means some async processing *might* occur if flow/global context is
// accessed.
var processReduceMessageQueue = function(msg) {
if (msg) {
var processReduceMessageQueue = function(msgInfo) {
if (msgInfo) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
pendingMessages.push(msgInfo);
if (activeMessage !== null) {
// The node is currently processing a message, so do nothing
// more with this message
@ -508,22 +556,23 @@ module.exports = function(RED) {
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
var nextMsgInfo = pendingMessages.shift();
activeMessage = true;
reduceMessage(node, nextMsg, err => {
reduceMessage(node, nextMsgInfo, err => {
if (err) {
node.error(err,nextMsg);
}
nextMsgInfo.done(err);//.error(err,nextMsg);
}
activeMessage = null;
processReduceMessageQueue();
})
}
this.on("input", function(msg) {
this.on("input", function(msg, send, done) {
try {
var property;
if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) {
node.warn("Message missing msg.parts property - cannot join in 'auto' mode")
done();
return;
}
@ -535,6 +584,7 @@ module.exports = function(RED) {
property = RED.util.getMessageProperty(msg,node.property);
} catch(err) {
node.warn("Message property "+node.property+" not found");
done();
return;
}
}
@ -557,7 +607,7 @@ module.exports = function(RED) {
propertyIndex = msg.parts.index;
}
else if (node.mode === 'reduce') {
return processReduceMessageQueue(msg);
return processReduceMessageQueue({msg, send, done});
}
else {
// Use the node configuration to identify all of the group information
@ -578,9 +628,11 @@ module.exports = function(RED) {
if (inflight[partId].timeout) {
clearTimeout(inflight[partId].timeout);
}
inflight[partId].dones.forEach(f => f());
delete inflight[partId]
}
return
done();
return;
}
if ((payloadType === 'object') && (propertyKey === null || propertyKey === undefined || propertyKey === "")) {
@ -591,6 +643,7 @@ module.exports = function(RED) {
if (msg.hasOwnProperty('complete')) {
if (inflight[partId]) {
inflight[partId].msg.complete = msg.complete;
inflight[partId].send = send;
completeSend(partId);
}
}
@ -598,6 +651,7 @@ module.exports = function(RED) {
node.warn("Message missing key property 'msg."+node.key+"' - cannot add to object")
}
}
done();
return;
}
@ -608,7 +662,9 @@ module.exports = function(RED) {
payload:{},
targetCount:targetCount,
type:"object",
msg:RED.util.cloneMessage(msg)
msg:RED.util.cloneMessage(msg),
send: send,
dones: []
};
}
else {
@ -617,7 +673,9 @@ module.exports = function(RED) {
payload:[],
targetCount:targetCount,
type:payloadType,
msg:RED.util.cloneMessage(msg)
msg:RED.util.cloneMessage(msg),
send: send,
dones: []
};
if (payloadType === 'string') {
inflight[partId].joinChar = joinChar;
@ -634,6 +692,7 @@ module.exports = function(RED) {
}, node.timer)
}
}
inflight[partId].dones.push(done);
var group = inflight[partId];
if (payloadType === 'buffer') {
@ -642,7 +701,7 @@ module.exports = function(RED) {
inflight[partId].bufferLen += property.length;
}
else {
node.error(RED._("join.errors.invalid-type",{error:(typeof property)}),msg);
done(RED._("join.errors.invalid-type",{error:(typeof property)}));
return;
}
}
@ -676,6 +735,7 @@ module.exports = function(RED) {
}
}
group.msg = Object.assign(group.msg, msg);
group.send = send;
var tcnt = group.targetCount;
if (msg.hasOwnProperty("parts")) { tcnt = group.targetCount || msg.parts.count; }
if ((tcnt > 0 && group.currentCount >= tcnt) || msg.hasOwnProperty('complete')) {
@ -683,6 +743,7 @@ module.exports = function(RED) {
}
}
catch(err) {
done(err);
console.log(err.stack);
}
});
@ -691,6 +752,7 @@ module.exports = function(RED) {
for (var i in inflight) {
if (inflight.hasOwnProperty(i)) {
clearTimeout(inflight[i].timeout);
inflight[i].dones.forEach(d => d());
}
}
});

View File

@ -1646,4 +1646,143 @@ describe('JOIN node', function() {
});
});
describe('messaging API', function() {
function mapiDoneSplitTestHelper(done, splt, spltType, stream, msgAndTimings) {
const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js");
const catchNode = require("nr-test-utils").require("@node-red/nodes/core/common/25-catch.js");
const flow = [
{ id: "splitNode1", type:"split", splt, spltType, stream, wires: [[]]},
{ id: "completeNode1", type: "complete", scope: ["splitNode1"], uncaught: false, wires: [["helperNode1"]] },
{ id: "catchNode1", type: "catch", scope: ["splitNode1"], uncaught: false, wires: [["helperNode1"]] },
{ id: "helperNode1", type: "helper", wires: [[]] }];
const numMsgs = msgAndTimings.length;
helper.load([splitNode, completeNode, catchNode], flow, function () {
const splitNode1 = helper.getNode("splitNode1");
const helperNode1 = helper.getNode("helperNode1");
RED.settings.nodeMessageBufferMaxLength = 2;
const t = Date.now();
let c = 0;
helperNode1.on("input", function (msg) {
msg.should.have.a.property('payload');
(Date.now() - t).should.be.approximately(msgAndTimings[msg.seq].avr, msgAndTimings[msg.seq].var);
c += 1;
if (c === numMsgs) {
done();
}
});
for (let i = 0; i < numMsgs; i++) {
setTimeout(function () { splitNode1.receive(msgAndTimings[i].msg); }, msgAndTimings[i].delay);
}
});
}
it('should call done() when message is sent (string)', function (done) {
mapiDoneSplitTestHelper(done, 2, "len", false, [
{ msg: { seq: 0, payload: "12345" }, delay: 0, avr: 0, var: 100 },
]);
});
it('should call done() when message is sent (array)', function (done) {
mapiDoneSplitTestHelper(done, 2, "len", false, [
{ msg: { seq: 0, payload: [0,1,2,3,4] }, delay: 0, avr: 0, var: 100 },
]);
});
it('should call done() when message is sent (object)', function (done) {
mapiDoneSplitTestHelper(done, 2, "len", false, [
{ msg: { seq: 0, payload: {a:1,b:2}}, delay: 0, avr: 0, var: 100 },
]);
});
it('should call done() when consolidated message is emitted (string, len)', function (done) {
mapiDoneSplitTestHelper(done, 5, "len", true, [
{ msg: { seq: 0, payload: "12"}, delay: 0, avr: 500, var: 100 },
{ msg: { seq: 1, payload: "34"}, delay: 200, avr: 500, var: 100 },
{ msg: { seq: 2, payload: "5"}, delay: 500, avr: 500, var: 100 }
]);
});
it('should call done() when consolidated message is emitted (Buffer, len)', function (done) {
mapiDoneSplitTestHelper(done, 5, "len", true, [
{ msg: { seq: 0, payload: Buffer.from("12")}, delay: 0, avr: 500, var: 100 },
{ msg: { seq: 1, payload: Buffer.from("34")}, delay: 200, avr: 500, var: 100 },
{ msg: { seq: 2, payload: Buffer.from("5")}, delay: 500, avr: 500, var: 100 }
]);
});
it('should call done() when consolidated message is emitted (Buffer, str)', function (done) {
mapiDoneSplitTestHelper(done, "5", "str", true, [
{ msg: { seq: 0, payload: Buffer.from("12")}, delay: 0, avr: 500, var: 100 },
{ msg: { seq: 1, payload: Buffer.from("34")}, delay: 200, avr: 500, var: 100 },
{ msg: { seq: 2, payload: Buffer.from("5")}, delay: 500, avr: 500, var: 100 }
]);
});
it('should call done() when consolidated message is emitted (Buffer, bin)', function (done) {
mapiDoneSplitTestHelper(done, "[53]", "bin", true, [
{ msg: { seq: 0, payload: Buffer.from("12")}, delay: 0, avr: 500, var: 100 },
{ msg: { seq: 1, payload: Buffer.from("34")}, delay: 200, avr: 500, var: 100 },
{ msg: { seq: 2, payload: Buffer.from("5")}, delay: 500, avr: 500, var: 100 }
]);
});
function mapiDoneJoinTestHelper(done, joinNodeSetting, msgAndTimings) {
const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js");
const catchNode = require("nr-test-utils").require("@node-red/nodes/core/common/25-catch.js");
const flow = [
{ ...joinNodeSetting, id: "joinNode1", type:"join", wires: [[]]},
{ id: "completeNode1", type: "complete", scope: ["joinNode1"], uncaught: false, wires: [["helperNode1"]] },
{ id: "catchNode1", type: "catch", scope: ["joinNode1"], uncaught: false, wires: [["helperNode1"]] },
{ id: "helperNode1", type: "helper", wires: [[]] }];
const numMsgs = msgAndTimings.length;
helper.load([joinNode, completeNode, catchNode], flow, function () {
const joinNode1 = helper.getNode("joinNode1");
const helperNode1 = helper.getNode("helperNode1");
RED.settings.nodeMessageBufferMaxLength = 3;
const t = Date.now();
let c = 0;
helperNode1.on("input", function (msg) {
msg.should.have.a.property('payload');
(Date.now() - t).should.be.approximately(msgAndTimings[msg.seq].avr, msgAndTimings[msg.seq].var);
c += 1;
if (c === numMsgs) {
done();
}
});
for (let i = 0; i < numMsgs; i++) {
setTimeout(function () { joinNode1.receive(msgAndTimings[i].msg); }, msgAndTimings[i].delay);
}
});
}
it('should call done() when all messages are joined', function (done) {
mapiDoneJoinTestHelper(done, {mode:"auto", timeout:1}, [
{ msg: {seq:0, payload:"A", parts:{id:1, type:"string", ch:",", index:0, count:3}}, delay:0, avr:500, var:100},
{ msg: {seq:1, payload:"B", parts:{id:1, type:"string", ch:",", index:1, count:3}}, delay:200, avr:500, var:100},
{ msg: {seq:2, payload:"C", parts:{id:1, type:"string", ch:",", index:2, count:3}}, delay:500, avr:500, var:100}
]);
});
it('should call done() when the node is reset', function (done) {
mapiDoneJoinTestHelper(done, {mode:"auto", timeout:1}, [
{ msg: {seq:0, payload:"A", parts:{id:1, type:"string", ch:",", index:0, count:3}}, delay:0, avr:500, var:100},
{ msg: {seq:1, payload:"B", parts:{id:1, type:"string", ch:",", index:1, count:3}}, delay:200, avr:500, var:100},
{ msg: {seq:2, payload:"dummy", reset: true, parts:{id:1}}, delay:500, avr:500, var:100}
]);
});
it('should call done() when timed out', function (done) {
mapiDoneJoinTestHelper(done, {mode:"custom", joiner:",", build:"string", timeout:0.5}, [
{ msg: {seq:0, payload:"A"}, delay:0, avr:500, var:100},
{ msg: {seq:1, payload:"B"}, delay:200, avr:500, var:100},
]);
});
it('should call done() when all messages are reduced', function (done) {
mapiDoneJoinTestHelper(done, {mode:"reduce", reduceRight:false, reduceExp:"$A+payload", reduceInit:"0",
reduceInitType:"num", reduceFixup:undefined}, [
{ msg: {seq:0, payload:3, parts: {index:2, count:3, id:222}}, delay:0, avr:500, var:100},
{ msg: {seq:1, payload:2, parts: {index:1, count:3, id:222}}, delay:200, avr:500, var:100},
{ msg: {seq:2, payload:4, parts: {index:0, count:3, id:222}}, delay:500, avr:500, var:100}
]);
});
it('should call done() regardless of buffer overflow', function (done) {
mapiDoneJoinTestHelper(done, {mode:"reduce", reduceRight:false, reduceExp:"$A+payload", reduceInit:"0",
reduceInitType:"num", reduceFixup:undefined}, [
{ msg: {seq:0, payload:3, parts: {index:2, count:5, id:222}}, delay:0, avr:600, var:100},
{ msg: {seq:1, payload:2, parts: {index:1, count:5, id:222}}, delay:200, avr:600, var:100},
{ msg: {seq:2, payload:4, parts: {index:0, count:5, id:222}}, delay:400, avr:600, var:100},
{ msg: {seq:3, payload:1, parts: {index:3, count:5, id:222}}, delay:600, avr:600, var:100},
]);
});
});
});