diff --git a/packages/node_modules/@node-red/nodes/core/sequence/17-split.js b/packages/node_modules/@node-red/nodes/core/sequence/17-split.js index 3f774d68c..dc422f3da 100644 --- a/packages/node_modules/@node-red/nodes/core/sequence/17-split.js +++ b/packages/node_modules/@node-red/nodes/core/sequence/17-split.js @@ -17,18 +17,18 @@ module.exports = function(RED) { "use strict"; - function sendArray(node,msg,array) { + function sendArray(node,msg,array,send) { for (var i = 0; i < array.length-1; i++) { msg.payload = array[i]; msg.parts.index = node.c++; if (node.stream !== true) { msg.parts.count = array.length; } - node.send(RED.util.cloneMessage(msg)); + send(RED.util.cloneMessage(msg)); } if (node.stream !== true) { msg.payload = array[i]; msg.parts.index = node.c++; msg.parts.count = array.length; - node.send(RED.util.cloneMessage(msg)); + send(RED.util.cloneMessage(msg)); node.c = 0; } else { node.remainder = array[i]; } @@ -67,7 +67,8 @@ module.exports = function(RED) { } node.c = 0; node.buffer = Buffer.from([]); - this.on("input", function(msg) { + node.pendingDones = []; + this.on("input", function(msg, send, done) { if (msg.hasOwnProperty("payload")) { if (msg.hasOwnProperty("parts")) { msg.parts = { parts:msg.parts }; } // push existing parts to a stack else { msg.parts = {}; } @@ -93,14 +94,23 @@ module.exports = function(RED) { msg.payload = data.substring(pos,pos+node.splt); msg.parts.index = node.c++; pos += node.splt; - node.send(RED.util.cloneMessage(msg)); + send(RED.util.cloneMessage(msg)); + } + if (count > 1) { + node.pendingDones.forEach(d => d()); + node.pendingDones = []; } node.remainder = data.substring(pos); if ((node.stream !== true) || (node.remainder.length === node.splt)) { msg.payload = node.remainder; msg.parts.index = node.c++; - node.send(RED.util.cloneMessage(msg)); + send(RED.util.cloneMessage(msg)); + node.pendingDones.forEach(d => d()); + node.pendingDones = []; + done(); node.remainder = ""; + } else { + node.pendingDones.push(done); } } else { @@ -115,7 +125,8 @@ module.exports = function(RED) { a = msg.payload.split(node.splt); msg.parts.ch = node.splt; // pass the split char to other end for rejoin } - sendArray(node,msg,a); + sendArray(node,msg,a,send); + done(); } } else if (Array.isArray(msg.payload)) { // then split array into messages @@ -135,8 +146,9 @@ module.exports = function(RED) { } msg.parts.index = i; pos += node.arraySplt; - node.send(RED.util.cloneMessage(msg)); + send(RED.util.cloneMessage(msg)); } + done(); } else if ((typeof msg.payload === "object") && !Buffer.isBuffer(msg.payload)) { var j = 0; @@ -152,10 +164,11 @@ module.exports = function(RED) { msg.parts.key = p; msg.parts.index = j; msg.parts.count = l; - node.send(RED.util.cloneMessage(msg)); + send(RED.util.cloneMessage(msg)); j += 1; } } + done(); } else if (Buffer.isBuffer(msg.payload)) { var len = node.buffer.length + msg.payload.length; @@ -176,14 +189,23 @@ module.exports = function(RED) { msg.payload = buff.slice(pos,pos+node.splt); msg.parts.index = node.c++; pos += node.splt; - node.send(RED.util.cloneMessage(msg)); + send(RED.util.cloneMessage(msg)); + } + if (count > 1) { + node.pendingDones.forEach(d => d()); + node.pendingDones = []; } node.buffer = buff.slice(pos); if ((node.stream !== true) || (node.buffer.length === node.splt)) { msg.payload = node.buffer; msg.parts.index = node.c++; - node.send(RED.util.cloneMessage(msg)); + send(RED.util.cloneMessage(msg)); + node.pendingDones.forEach(d => d()); + node.pendingDones = []; + done(); node.buffer = Buffer.from([]); + } else { + node.pendingDones.push(done); } } else { @@ -210,23 +232,34 @@ module.exports = function(RED) { while (pos > -1) { msg.payload = buff.slice(p,pos); msg.parts.index = node.c++; - node.send(RED.util.cloneMessage(msg)); + send(RED.util.cloneMessage(msg)); i++; p = pos+node.splt.length; pos = buff.indexOf(node.splt,p); } + if (count > 1) { + node.pendingDones.forEach(d => d()); + node.pendingDones = []; + } if ((node.stream !== true) && (p < buff.length)) { msg.payload = buff.slice(p,buff.length); msg.parts.index = node.c++; msg.parts.count = node.c++; - node.send(RED.util.cloneMessage(msg)); + send(RED.util.cloneMessage(msg)); + node.pendingDones.forEach(d => d()); + node.pendingDones = []; } else { node.buffer = buff.slice(p,buff.length); + node.pendingDones.push(done); + } + if (node.buffer.length == 0) { + done(); } } - } - //else { } // otherwise drop the message. + } else { // otherwise drop the message. + done(); + } } }); } @@ -264,16 +297,16 @@ module.exports = function(RED) { } - function reduceMessageGroup(node,msgs,exp,fixup,count,accumulator,done) { - var msg = msgs.shift(); - exp.assign("I", msg.parts.index); + function reduceMessageGroup(node,msgInfos,exp,fixup,count,accumulator,done) { + var msgInfo = msgInfos.shift(); + exp.assign("I", msgInfo.msg.parts.index); exp.assign("N", count); exp.assign("A", accumulator); - RED.util.evaluateJSONataExpression(exp, msg, (err,result) => { + RED.util.evaluateJSONataExpression(exp, msgInfo.msg, (err,result) => { if (err) { return done(err); } - if (msgs.length === 0) { + if (msgInfos.length === 0) { if (fixup) { fixup.assign("N", count); fixup.assign("A", result); @@ -281,39 +314,43 @@ module.exports = function(RED) { if (err) { return done(err); } - node.send({payload: result}); + msgInfo.send({payload: result}); done(); }); } else { - node.send({payload: result}); + msgInfo.send({payload: result}); done(); } } else { - reduceMessageGroup(node,msgs,exp,fixup,count,result,done); + reduceMessageGroup(node,msgInfos,exp,fixup,count,result,done); } }); } function reduceAndSendGroup(node, group, done) { var is_right = node.reduce_right; var flag = is_right ? -1 : 1; - var msgs = group.msgs; + var msgInfos = group.msgs; + const preservedMsgInfos = [...msgInfos]; try { RED.util.evaluateNodeProperty(node.exp_init, node.exp_init_type, node, {}, (err,accum) => { var reduceExpression = node.reduceExpression; var fixupExpression = node.fixupExpression; var count = group.count; - msgs.sort(function(x,y) { - var ix = x.parts.index; - var iy = y.parts.index; + msgInfos.sort(function(x,y) { + var ix = x.msg.parts.index; + var iy = y.msg.parts.index; if (ix < iy) {return -flag;} if (ix > iy) {return flag;} return 0; }); - reduceMessageGroup(node, msgs,reduceExpression,fixupExpression,count,accum,(err,result) => { + reduceMessageGroup(node, msgInfos,reduceExpression,fixupExpression,count,accum,(err,result) => { if (err) { + preservedMsgInfos.pop(); // omit last message to emit error message + preservedMsgInfos.forEach(mInfo => mInfo.done()); done(err); return; } else { + preservedMsgInfos.forEach(mInfo => mInfo.done()); done(); } }) @@ -323,7 +360,8 @@ module.exports = function(RED) { } } - function reduceMessage(node, msg, done) { + function reduceMessage(node, msgInfo, done) { + let msg = msgInfo.msg; if (msg.hasOwnProperty('parts')) { var parts = msg.parts; var pending = node.pending; @@ -344,7 +382,7 @@ module.exports = function(RED) { if (parts.hasOwnProperty('count') && (group.count === undefined)) { group.count = parts.count; } - msgs.push(msg); + msgs.push(msgInfo); pending_count++; var completeProcess = function(err) { if (err) { @@ -353,6 +391,13 @@ module.exports = function(RED) { node.pending_count = pending_count; var max_msgs = maxKeptMsgsCount(node); if ((max_msgs > 0) && (pending_count > max_msgs)) { + Object.values(node.pending).forEach(group => { + group.msgs.forEach(mInfo => { + if (mInfo.msg._msgid !== msgInfo.msg._msgid) { + mInfo.done(); + } + }); + }); node.pending = {}; node.pending_count = 0; done(RED._("join.too-many")); @@ -368,7 +413,8 @@ module.exports = function(RED) { completeProcess(); } } else { - node.send(msg); + msgInfo.send(msg); + msgInfo.done(); done(); } } @@ -480,7 +526,9 @@ module.exports = function(RED) { delete group.msg.parts; } delete group.msg.complete; - node.send(RED.util.cloneMessage(group.msg)); + group.send(RED.util.cloneMessage(group.msg)); + group.dones.forEach(f => f()); + group.dones = []; } var pendingMessages = []; @@ -489,10 +537,10 @@ module.exports = function(RED) { // groups may overlap and cause unexpected results. The use of JSONata // means some async processing *might* occur if flow/global context is // accessed. - var processReduceMessageQueue = function(msg) { - if (msg) { + var processReduceMessageQueue = function(msgInfo) { + if (msgInfo) { // A new message has arrived - add it to the message queue - pendingMessages.push(msg); + pendingMessages.push(msgInfo); if (activeMessage !== null) { // The node is currently processing a message, so do nothing // more with this message @@ -508,22 +556,23 @@ module.exports = function(RED) { // There are more messages to process. Get the next message and // start processing it. Recurse back in to check for any more - var nextMsg = pendingMessages.shift(); + var nextMsgInfo = pendingMessages.shift(); activeMessage = true; - reduceMessage(node, nextMsg, err => { + reduceMessage(node, nextMsgInfo, err => { if (err) { - node.error(err,nextMsg); - } + nextMsgInfo.done(err);//.error(err,nextMsg); + } activeMessage = null; processReduceMessageQueue(); }) } - this.on("input", function(msg) { + this.on("input", function(msg, send, done) { try { var property; if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) { node.warn("Message missing msg.parts property - cannot join in 'auto' mode") + done(); return; } @@ -535,6 +584,7 @@ module.exports = function(RED) { property = RED.util.getMessageProperty(msg,node.property); } catch(err) { node.warn("Message property "+node.property+" not found"); + done(); return; } } @@ -557,7 +607,7 @@ module.exports = function(RED) { propertyIndex = msg.parts.index; } else if (node.mode === 'reduce') { - return processReduceMessageQueue(msg); + return processReduceMessageQueue({msg, send, done}); } else { // Use the node configuration to identify all of the group information @@ -578,9 +628,11 @@ module.exports = function(RED) { if (inflight[partId].timeout) { clearTimeout(inflight[partId].timeout); } + inflight[partId].dones.forEach(f => f()); delete inflight[partId] } - return + done(); + return; } if ((payloadType === 'object') && (propertyKey === null || propertyKey === undefined || propertyKey === "")) { @@ -591,6 +643,7 @@ module.exports = function(RED) { if (msg.hasOwnProperty('complete')) { if (inflight[partId]) { inflight[partId].msg.complete = msg.complete; + inflight[partId].send = send; completeSend(partId); } } @@ -598,6 +651,7 @@ module.exports = function(RED) { node.warn("Message missing key property 'msg."+node.key+"' - cannot add to object") } } + done(); return; } @@ -608,7 +662,9 @@ module.exports = function(RED) { payload:{}, targetCount:targetCount, type:"object", - msg:RED.util.cloneMessage(msg) + msg:RED.util.cloneMessage(msg), + send: send, + dones: [] }; } else { @@ -617,7 +673,9 @@ module.exports = function(RED) { payload:[], targetCount:targetCount, type:payloadType, - msg:RED.util.cloneMessage(msg) + msg:RED.util.cloneMessage(msg), + send: send, + dones: [] }; if (payloadType === 'string') { inflight[partId].joinChar = joinChar; @@ -634,6 +692,7 @@ module.exports = function(RED) { }, node.timer) } } + inflight[partId].dones.push(done); var group = inflight[partId]; if (payloadType === 'buffer') { @@ -642,7 +701,7 @@ module.exports = function(RED) { inflight[partId].bufferLen += property.length; } else { - node.error(RED._("join.errors.invalid-type",{error:(typeof property)}),msg); + done(RED._("join.errors.invalid-type",{error:(typeof property)})); return; } } @@ -676,6 +735,7 @@ module.exports = function(RED) { } } group.msg = Object.assign(group.msg, msg); + group.send = send; var tcnt = group.targetCount; if (msg.hasOwnProperty("parts")) { tcnt = group.targetCount || msg.parts.count; } if ((tcnt > 0 && group.currentCount >= tcnt) || msg.hasOwnProperty('complete')) { @@ -683,6 +743,7 @@ module.exports = function(RED) { } } catch(err) { + done(err); console.log(err.stack); } }); @@ -691,6 +752,7 @@ module.exports = function(RED) { for (var i in inflight) { if (inflight.hasOwnProperty(i)) { clearTimeout(inflight[i].timeout); + inflight[i].dones.forEach(d => d()); } } }); diff --git a/test/nodes/core/sequence/17-split_spec.js b/test/nodes/core/sequence/17-split_spec.js index 2c45f0e02..3ffa424b6 100644 --- a/test/nodes/core/sequence/17-split_spec.js +++ b/test/nodes/core/sequence/17-split_spec.js @@ -1646,4 +1646,143 @@ describe('JOIN node', function() { }); }); + describe('messaging API', function() { + function mapiDoneSplitTestHelper(done, splt, spltType, stream, msgAndTimings) { + const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js"); + const catchNode = require("nr-test-utils").require("@node-red/nodes/core/common/25-catch.js"); + const flow = [ + { id: "splitNode1", type:"split", splt, spltType, stream, wires: [[]]}, + { id: "completeNode1", type: "complete", scope: ["splitNode1"], uncaught: false, wires: [["helperNode1"]] }, + { id: "catchNode1", type: "catch", scope: ["splitNode1"], uncaught: false, wires: [["helperNode1"]] }, + { id: "helperNode1", type: "helper", wires: [[]] }]; + const numMsgs = msgAndTimings.length; + helper.load([splitNode, completeNode, catchNode], flow, function () { + const splitNode1 = helper.getNode("splitNode1"); + const helperNode1 = helper.getNode("helperNode1"); + RED.settings.nodeMessageBufferMaxLength = 2; + const t = Date.now(); + let c = 0; + helperNode1.on("input", function (msg) { + msg.should.have.a.property('payload'); + (Date.now() - t).should.be.approximately(msgAndTimings[msg.seq].avr, msgAndTimings[msg.seq].var); + c += 1; + if (c === numMsgs) { + done(); + } + }); + for (let i = 0; i < numMsgs; i++) { + setTimeout(function () { splitNode1.receive(msgAndTimings[i].msg); }, msgAndTimings[i].delay); + } + }); + } + it('should call done() when message is sent (string)', function (done) { + mapiDoneSplitTestHelper(done, 2, "len", false, [ + { msg: { seq: 0, payload: "12345" }, delay: 0, avr: 0, var: 100 }, + ]); + }); + it('should call done() when message is sent (array)', function (done) { + mapiDoneSplitTestHelper(done, 2, "len", false, [ + { msg: { seq: 0, payload: [0,1,2,3,4] }, delay: 0, avr: 0, var: 100 }, + ]); + }); + it('should call done() when message is sent (object)', function (done) { + mapiDoneSplitTestHelper(done, 2, "len", false, [ + { msg: { seq: 0, payload: {a:1,b:2}}, delay: 0, avr: 0, var: 100 }, + ]); + }); + it('should call done() when consolidated message is emitted (string, len)', function (done) { + mapiDoneSplitTestHelper(done, 5, "len", true, [ + { msg: { seq: 0, payload: "12"}, delay: 0, avr: 500, var: 100 }, + { msg: { seq: 1, payload: "34"}, delay: 200, avr: 500, var: 100 }, + { msg: { seq: 2, payload: "5"}, delay: 500, avr: 500, var: 100 } + ]); + }); + it('should call done() when consolidated message is emitted (Buffer, len)', function (done) { + mapiDoneSplitTestHelper(done, 5, "len", true, [ + { msg: { seq: 0, payload: Buffer.from("12")}, delay: 0, avr: 500, var: 100 }, + { msg: { seq: 1, payload: Buffer.from("34")}, delay: 200, avr: 500, var: 100 }, + { msg: { seq: 2, payload: Buffer.from("5")}, delay: 500, avr: 500, var: 100 } + ]); + }); + it('should call done() when consolidated message is emitted (Buffer, str)', function (done) { + mapiDoneSplitTestHelper(done, "5", "str", true, [ + { msg: { seq: 0, payload: Buffer.from("12")}, delay: 0, avr: 500, var: 100 }, + { msg: { seq: 1, payload: Buffer.from("34")}, delay: 200, avr: 500, var: 100 }, + { msg: { seq: 2, payload: Buffer.from("5")}, delay: 500, avr: 500, var: 100 } + ]); + }); + it('should call done() when consolidated message is emitted (Buffer, bin)', function (done) { + mapiDoneSplitTestHelper(done, "[53]", "bin", true, [ + { msg: { seq: 0, payload: Buffer.from("12")}, delay: 0, avr: 500, var: 100 }, + { msg: { seq: 1, payload: Buffer.from("34")}, delay: 200, avr: 500, var: 100 }, + { msg: { seq: 2, payload: Buffer.from("5")}, delay: 500, avr: 500, var: 100 } + ]); + }); + + function mapiDoneJoinTestHelper(done, joinNodeSetting, msgAndTimings) { + const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js"); + const catchNode = require("nr-test-utils").require("@node-red/nodes/core/common/25-catch.js"); + const flow = [ + { ...joinNodeSetting, id: "joinNode1", type:"join", wires: [[]]}, + { id: "completeNode1", type: "complete", scope: ["joinNode1"], uncaught: false, wires: [["helperNode1"]] }, + { id: "catchNode1", type: "catch", scope: ["joinNode1"], uncaught: false, wires: [["helperNode1"]] }, + { id: "helperNode1", type: "helper", wires: [[]] }]; + const numMsgs = msgAndTimings.length; + helper.load([joinNode, completeNode, catchNode], flow, function () { + const joinNode1 = helper.getNode("joinNode1"); + const helperNode1 = helper.getNode("helperNode1"); + RED.settings.nodeMessageBufferMaxLength = 3; + const t = Date.now(); + let c = 0; + helperNode1.on("input", function (msg) { + msg.should.have.a.property('payload'); + (Date.now() - t).should.be.approximately(msgAndTimings[msg.seq].avr, msgAndTimings[msg.seq].var); + c += 1; + if (c === numMsgs) { + done(); + } + }); + for (let i = 0; i < numMsgs; i++) { + setTimeout(function () { joinNode1.receive(msgAndTimings[i].msg); }, msgAndTimings[i].delay); + } + }); + } + it('should call done() when all messages are joined', function (done) { + mapiDoneJoinTestHelper(done, {mode:"auto", timeout:1}, [ + { msg: {seq:0, payload:"A", parts:{id:1, type:"string", ch:",", index:0, count:3}}, delay:0, avr:500, var:100}, + { msg: {seq:1, payload:"B", parts:{id:1, type:"string", ch:",", index:1, count:3}}, delay:200, avr:500, var:100}, + { msg: {seq:2, payload:"C", parts:{id:1, type:"string", ch:",", index:2, count:3}}, delay:500, avr:500, var:100} + ]); + }); + it('should call done() when the node is reset', function (done) { + mapiDoneJoinTestHelper(done, {mode:"auto", timeout:1}, [ + { msg: {seq:0, payload:"A", parts:{id:1, type:"string", ch:",", index:0, count:3}}, delay:0, avr:500, var:100}, + { msg: {seq:1, payload:"B", parts:{id:1, type:"string", ch:",", index:1, count:3}}, delay:200, avr:500, var:100}, + { msg: {seq:2, payload:"dummy", reset: true, parts:{id:1}}, delay:500, avr:500, var:100} + ]); + }); + it('should call done() when timed out', function (done) { + mapiDoneJoinTestHelper(done, {mode:"custom", joiner:",", build:"string", timeout:0.5}, [ + { msg: {seq:0, payload:"A"}, delay:0, avr:500, var:100}, + { msg: {seq:1, payload:"B"}, delay:200, avr:500, var:100}, + ]); + }); + it('should call done() when all messages are reduced', function (done) { + mapiDoneJoinTestHelper(done, {mode:"reduce", reduceRight:false, reduceExp:"$A+payload", reduceInit:"0", + reduceInitType:"num", reduceFixup:undefined}, [ + { msg: {seq:0, payload:3, parts: {index:2, count:3, id:222}}, delay:0, avr:500, var:100}, + { msg: {seq:1, payload:2, parts: {index:1, count:3, id:222}}, delay:200, avr:500, var:100}, + { msg: {seq:2, payload:4, parts: {index:0, count:3, id:222}}, delay:500, avr:500, var:100} + ]); + }); + it('should call done() regardless of buffer overflow', function (done) { + mapiDoneJoinTestHelper(done, {mode:"reduce", reduceRight:false, reduceExp:"$A+payload", reduceInit:"0", + reduceInitType:"num", reduceFixup:undefined}, [ + { msg: {seq:0, payload:3, parts: {index:2, count:5, id:222}}, delay:0, avr:600, var:100}, + { msg: {seq:1, payload:2, parts: {index:1, count:5, id:222}}, delay:200, avr:600, var:100}, + { msg: {seq:2, payload:4, parts: {index:0, count:5, id:222}}, delay:400, avr:600, var:100}, + { msg: {seq:3, payload:1, parts: {index:3, count:5, id:222}}, delay:600, avr:600, var:100}, + ]); + }); + }); });