Merge pull request #2750 from node-red-hitachi/split-join-node-mapi

Messaging API support in Split/Join nodes
This commit is contained in:
Nick O'Leary
2020-12-07 13:54:47 +00:00
committed by GitHub
2 changed files with 246 additions and 45 deletions

View File

@@ -17,18 +17,18 @@
module.exports = function(RED) {
"use strict";
function sendArray(node,msg,array) {
function sendArray(node,msg,array,send) {
for (var i = 0; i < array.length-1; i++) {
msg.payload = array[i];
msg.parts.index = node.c++;
if (node.stream !== true) { msg.parts.count = array.length; }
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
if (node.stream !== true) {
msg.payload = array[i];
msg.parts.index = node.c++;
msg.parts.count = array.length;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.c = 0;
}
else { node.remainder = array[i]; }
@@ -67,7 +67,8 @@ module.exports = function(RED) {
}
node.c = 0;
node.buffer = Buffer.from([]);
this.on("input", function(msg) {
node.pendingDones = [];
this.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("payload")) {
if (msg.hasOwnProperty("parts")) { msg.parts = { parts:msg.parts }; } // push existing parts to a stack
else { msg.parts = {}; }
@@ -93,14 +94,23 @@ module.exports = function(RED) {
msg.payload = data.substring(pos,pos+node.splt);
msg.parts.index = node.c++;
pos += node.splt;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
node.remainder = data.substring(pos);
if ((node.stream !== true) || (node.remainder.length === node.splt)) {
msg.payload = node.remainder;
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones = [];
done();
node.remainder = "";
} else {
node.pendingDones.push(done);
}
}
else {
@@ -115,7 +125,8 @@ module.exports = function(RED) {
a = msg.payload.split(node.splt);
msg.parts.ch = node.splt; // pass the split char to other end for rejoin
}
sendArray(node,msg,a);
sendArray(node,msg,a,send);
done();
}
}
else if (Array.isArray(msg.payload)) { // then split array into messages
@@ -135,8 +146,9 @@ module.exports = function(RED) {
}
msg.parts.index = i;
pos += node.arraySplt;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
done();
}
else if ((typeof msg.payload === "object") && !Buffer.isBuffer(msg.payload)) {
var j = 0;
@@ -152,10 +164,11 @@ module.exports = function(RED) {
msg.parts.key = p;
msg.parts.index = j;
msg.parts.count = l;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
j += 1;
}
}
done();
}
else if (Buffer.isBuffer(msg.payload)) {
var len = node.buffer.length + msg.payload.length;
@@ -176,14 +189,23 @@ module.exports = function(RED) {
msg.payload = buff.slice(pos,pos+node.splt);
msg.parts.index = node.c++;
pos += node.splt;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
node.buffer = buff.slice(pos);
if ((node.stream !== true) || (node.buffer.length === node.splt)) {
msg.payload = node.buffer;
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones = [];
done();
node.buffer = Buffer.from([]);
} else {
node.pendingDones.push(done);
}
}
else {
@@ -210,23 +232,34 @@ module.exports = function(RED) {
while (pos > -1) {
msg.payload = buff.slice(p,pos);
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
i++;
p = pos+node.splt.length;
pos = buff.indexOf(node.splt,p);
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
if ((node.stream !== true) && (p < buff.length)) {
msg.payload = buff.slice(p,buff.length);
msg.parts.index = node.c++;
msg.parts.count = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
else {
node.buffer = buff.slice(p,buff.length);
node.pendingDones.push(done);
}
if (node.buffer.length == 0) {
done();
}
}
}
//else { } // otherwise drop the message.
} else { // otherwise drop the message.
done();
}
}
});
}
@@ -264,16 +297,16 @@ module.exports = function(RED) {
}
function reduceMessageGroup(node,msgs,exp,fixup,count,accumulator,done) {
var msg = msgs.shift();
exp.assign("I", msg.parts.index);
function reduceMessageGroup(node,msgInfos,exp,fixup,count,accumulator,done) {
var msgInfo = msgInfos.shift();
exp.assign("I", msgInfo.msg.parts.index);
exp.assign("N", count);
exp.assign("A", accumulator);
RED.util.evaluateJSONataExpression(exp, msg, (err,result) => {
RED.util.evaluateJSONataExpression(exp, msgInfo.msg, (err,result) => {
if (err) {
return done(err);
}
if (msgs.length === 0) {
if (msgInfos.length === 0) {
if (fixup) {
fixup.assign("N", count);
fixup.assign("A", result);
@@ -281,39 +314,43 @@ module.exports = function(RED) {
if (err) {
return done(err);
}
node.send({payload: result});
msgInfo.send({payload: result});
done();
});
} else {
node.send({payload: result});
msgInfo.send({payload: result});
done();
}
} else {
reduceMessageGroup(node,msgs,exp,fixup,count,result,done);
reduceMessageGroup(node,msgInfos,exp,fixup,count,result,done);
}
});
}
function reduceAndSendGroup(node, group, done) {
var is_right = node.reduce_right;
var flag = is_right ? -1 : 1;
var msgs = group.msgs;
var msgInfos = group.msgs;
const preservedMsgInfos = [...msgInfos];
try {
RED.util.evaluateNodeProperty(node.exp_init, node.exp_init_type, node, {}, (err,accum) => {
var reduceExpression = node.reduceExpression;
var fixupExpression = node.fixupExpression;
var count = group.count;
msgs.sort(function(x,y) {
var ix = x.parts.index;
var iy = y.parts.index;
msgInfos.sort(function(x,y) {
var ix = x.msg.parts.index;
var iy = y.msg.parts.index;
if (ix < iy) {return -flag;}
if (ix > iy) {return flag;}
return 0;
});
reduceMessageGroup(node, msgs,reduceExpression,fixupExpression,count,accum,(err,result) => {
reduceMessageGroup(node, msgInfos,reduceExpression,fixupExpression,count,accum,(err,result) => {
if (err) {
preservedMsgInfos.pop(); // omit last message to emit error message
preservedMsgInfos.forEach(mInfo => mInfo.done());
done(err);
return;
} else {
preservedMsgInfos.forEach(mInfo => mInfo.done());
done();
}
})
@@ -323,7 +360,8 @@ module.exports = function(RED) {
}
}
function reduceMessage(node, msg, done) {
function reduceMessage(node, msgInfo, done) {
let msg = msgInfo.msg;
if (msg.hasOwnProperty('parts')) {
var parts = msg.parts;
var pending = node.pending;
@@ -344,7 +382,7 @@ module.exports = function(RED) {
if (parts.hasOwnProperty('count') && (group.count === undefined)) {
group.count = parts.count;
}
msgs.push(msg);
msgs.push(msgInfo);
pending_count++;
var completeProcess = function(err) {
if (err) {
@@ -353,6 +391,13 @@ module.exports = function(RED) {
node.pending_count = pending_count;
var max_msgs = maxKeptMsgsCount(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
Object.values(node.pending).forEach(group => {
group.msgs.forEach(mInfo => {
if (mInfo.msg._msgid !== msgInfo.msg._msgid) {
mInfo.done();
}
});
});
node.pending = {};
node.pending_count = 0;
done(RED._("join.too-many"));
@@ -368,7 +413,8 @@ module.exports = function(RED) {
completeProcess();
}
} else {
node.send(msg);
msgInfo.send(msg);
msgInfo.done();
done();
}
}
@@ -480,7 +526,9 @@ module.exports = function(RED) {
delete group.msg.parts;
}
delete group.msg.complete;
node.send(RED.util.cloneMessage(group.msg));
group.send(RED.util.cloneMessage(group.msg));
group.dones.forEach(f => f());
group.dones = [];
}
var pendingMessages = [];
@@ -489,10 +537,10 @@ module.exports = function(RED) {
// groups may overlap and cause unexpected results. The use of JSONata
// means some async processing *might* occur if flow/global context is
// accessed.
var processReduceMessageQueue = function(msg) {
if (msg) {
var processReduceMessageQueue = function(msgInfo) {
if (msgInfo) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
pendingMessages.push(msgInfo);
if (activeMessage !== null) {
// The node is currently processing a message, so do nothing
// more with this message
@@ -508,22 +556,23 @@ module.exports = function(RED) {
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
var nextMsgInfo = pendingMessages.shift();
activeMessage = true;
reduceMessage(node, nextMsg, err => {
reduceMessage(node, nextMsgInfo, err => {
if (err) {
node.error(err,nextMsg);
}
nextMsgInfo.done(err);//.error(err,nextMsg);
}
activeMessage = null;
processReduceMessageQueue();
})
}
this.on("input", function(msg) {
this.on("input", function(msg, send, done) {
try {
var property;
if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) {
node.warn("Message missing msg.parts property - cannot join in 'auto' mode")
done();
return;
}
@@ -535,6 +584,7 @@ module.exports = function(RED) {
property = RED.util.getMessageProperty(msg,node.property);
} catch(err) {
node.warn("Message property "+node.property+" not found");
done();
return;
}
}
@@ -557,7 +607,7 @@ module.exports = function(RED) {
propertyIndex = msg.parts.index;
}
else if (node.mode === 'reduce') {
return processReduceMessageQueue(msg);
return processReduceMessageQueue({msg, send, done});
}
else {
// Use the node configuration to identify all of the group information
@@ -578,9 +628,11 @@ module.exports = function(RED) {
if (inflight[partId].timeout) {
clearTimeout(inflight[partId].timeout);
}
inflight[partId].dones.forEach(f => f());
delete inflight[partId]
}
return
done();
return;
}
if ((payloadType === 'object') && (propertyKey === null || propertyKey === undefined || propertyKey === "")) {
@@ -591,6 +643,7 @@ module.exports = function(RED) {
if (msg.hasOwnProperty('complete')) {
if (inflight[partId]) {
inflight[partId].msg.complete = msg.complete;
inflight[partId].send = send;
completeSend(partId);
}
}
@@ -598,6 +651,7 @@ module.exports = function(RED) {
node.warn("Message missing key property 'msg."+node.key+"' - cannot add to object")
}
}
done();
return;
}
@@ -608,7 +662,9 @@ module.exports = function(RED) {
payload:{},
targetCount:targetCount,
type:"object",
msg:RED.util.cloneMessage(msg)
msg:RED.util.cloneMessage(msg),
send: send,
dones: []
};
}
else {
@@ -617,7 +673,9 @@ module.exports = function(RED) {
payload:[],
targetCount:targetCount,
type:payloadType,
msg:RED.util.cloneMessage(msg)
msg:RED.util.cloneMessage(msg),
send: send,
dones: []
};
if (payloadType === 'string') {
inflight[partId].joinChar = joinChar;
@@ -634,6 +692,7 @@ module.exports = function(RED) {
}, node.timer)
}
}
inflight[partId].dones.push(done);
var group = inflight[partId];
if (payloadType === 'buffer') {
@@ -642,7 +701,7 @@ module.exports = function(RED) {
inflight[partId].bufferLen += property.length;
}
else {
node.error(RED._("join.errors.invalid-type",{error:(typeof property)}),msg);
done(RED._("join.errors.invalid-type",{error:(typeof property)}));
return;
}
}
@@ -676,6 +735,7 @@ module.exports = function(RED) {
}
}
group.msg = Object.assign(group.msg, msg);
group.send = send;
var tcnt = group.targetCount;
if (msg.hasOwnProperty("parts")) { tcnt = group.targetCount || msg.parts.count; }
if ((tcnt > 0 && group.currentCount >= tcnt) || msg.hasOwnProperty('complete')) {
@@ -683,6 +743,7 @@ module.exports = function(RED) {
}
}
catch(err) {
done(err);
console.log(err.stack);
}
});
@@ -691,6 +752,7 @@ module.exports = function(RED) {
for (var i in inflight) {
if (inflight.hasOwnProperty(i)) {
clearTimeout(inflight[i].timeout);
inflight[i].dones.forEach(d => d());
}
}
});