diff --git a/packages/node_modules/@node-red/nodes/core/sequence/17-split.js b/packages/node_modules/@node-red/nodes/core/sequence/17-split.js index 12e08f8ae..784eb5de8 100644 --- a/packages/node_modules/@node-red/nodes/core/sequence/17-split.js +++ b/packages/node_modules/@node-red/nodes/core/sequence/17-split.js @@ -157,10 +157,10 @@ module.exports = function(RED) { if (a[a.length-1] == '') { swapIdThenDone({done,msg,msgid:origMsgid}, corrIds); } else { - node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgId:corrIds}); + node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgids:corrIds}); } } else { - node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgId:corrIds}); + node.pendingDones.push({done,msg,msgid:origMsgid,corrMsgids:corrIds}); } } else { swapIdThenDone({done,msg,msgid:origMsgid}, corrIds); diff --git a/test/nodes/core/sequence/17-split_spec.js b/test/nodes/core/sequence/17-split_spec.js index 0e4a07345..444644427 100644 --- a/test/nodes/core/sequence/17-split_spec.js +++ b/test/nodes/core/sequence/17-split_spec.js @@ -1865,6 +1865,219 @@ describe('JOIN node', function() { n1.receive(_msg); }); - }) + }); + describe('Correlation log (SPLIT node)', function() { + function nonStreamSplit(done, inputPayload, config) { + const flow = [{ id: "n1", type: "split", stream: false, wires: [["n2"]], ...config }, + { id: "n2", type: "helper" }]; + helper.load(splitNode, flow, function () { + const n1 = helper.getNode("n1"); + const n2 = helper.getNode("n2"); + let n2recvmsgid = []; + n2.on("input", (msg) => { + n2recvmsgid.push(msg._msgid); + }); + n1.receive({ payload: inputPayload }); + setTimeout(function () { + try { + var logEvents = helper.log().args.filter((evt) => { + return (evt[0].event == "node.split.correlate"); + }).map((e) => e[0]); + logEvents[0].value.split.sort().should.eql(n2recvmsgid.sort()); + done(); + } catch (err) { + done(err); + } + }, 20); + }); + + } + it('should emit correlate log when message is splitted (string, mode=len)', function(done) { + // "abcdefg" -> "ab", "cd", "ef", "g": "abcdefg" - ["ab","cd","ef","g"] + nonStreamSplit(done, "abcdefg", {splt:2, spltType:"len"}); + }); + it('should emit correlate logs when messages are splitted/joined (string, mode=len, stream)', function(done) { + // "a","b","c","d" -> "ab","cd": "a" - ["ab"], "b" - ["ab"], "c" - ["cd"], "d" - ["cd"] + const flow = [{ id: "n1", type: "split", splt: 2, spltType: "len", stream: true, wires: [["n2"]]}, + { id: "n2", type: "helper"}]; + helper.load(splitNode, flow, function() { + const n1 = helper.getNode("n1"); + const n2 = helper.getNode("n2"); + let n2recvmsgid = []; + n2.on("input", (msg) => { + n2recvmsgid.push(msg._msgid); + }); + n1.receive({payload: "a", _msgid: "A"}); + n1.receive({payload: "b", _msgid: "B"}); + n1.receive({payload: "c", _msgid: "C"}); + n1.receive({payload: "d", _msgid: "D"}); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter((evt) => { + return(evt[0].event == "node.split.correlate"); + }).map((e) => e[0]); + logEvents.find((e)=>e.msgid=="A").value.split.should.eql([n2recvmsgid[0]]); + logEvents.find((e)=>e.msgid=="B").value.split.should.eql([n2recvmsgid[0]]); + logEvents.find((e)=>e.msgid=="C").value.split.should.eql([n2recvmsgid[1]]); + logEvents.find((e)=>e.msgid=="D").value.split.should.eql([n2recvmsgid[1]]); + done(); + } catch(err) { + done(err); + } + }, 20); + }); + }); + it('should emit correlate logs when message are splitted (string, mode=str)', function (done) { + // "a-b-c-d" -> "a","b","c","d": "a-b-c-d": ["a","b","c","d"] + nonStreamSplit(done, "a-b-c-d", {splt:"-", spliType:"str"}); + }); + it('should emit correlate logs when messages are splitted/joined (string, mode=str, stream)', function(done) { + // "a","b-c","-" -> "ab","c": "a" - ["ab"], "b-c" - ["ab","c"], "-" - ["c"] + const flow = [{ id: "n1", type: "split", splt: "-", spltType: "str", stream: true, wires: [["n2"]]}, + { id: "n2", type: "helper"}]; + helper.load(splitNode, flow, function() { + const n1 = helper.getNode("n1"); + const n2 = helper.getNode("n2"); + let n2recvmsgid = []; + n2.on("input", (msg) => { + n2recvmsgid.push(msg._msgid); + }); + n1.receive({payload: "a", _msgid: "A"}); + n1.receive({payload: "b-c", _msgid: "B"}); + n1.receive({payload: "-", _msgid: "d"}); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter((evt) => { + return(evt[0].event == "node.split.correlate"); + }).map((e) => e[0]); + logEvents.find((e)=>e.msgid=="A").value.split.should.eql([n2recvmsgid[0]]); + logEvents.find((e)=>e.msgid=="B").value.split.should.eql([n2recvmsgid[0],n2recvmsgid[1]]); + logEvents.find((e)=>e.msgid=="d").value.split.should.eql([n2recvmsgid[1]]); + done(); + } catch(err) { + done(err); + } + }, 20); + }); + }); + it('should emit correlate log when a message is splitted (array)', function(done) { + // "[1,2,3]" -> "1","2","3": "[1,2,3]" - ["1","2","3"] + nonStreamSplit(done, [1,2,3], {arraySplt:1, arraySpltType:"len"}); + }); + it('should emit correlate log when a message is splitted (object)', function(done) { + // "{a:1,b:2,c:3}" -> "1","2","3": "{a:1,b:2,c:3}" - ["1","2","3"] + nonStreamSplit(done, {a:1,b:2,c:3}, {}); + }); + it('should emit correlate log when a message is splitted (buffer, mode=len)', function(done) { + // Buffer([1,2,3]) -> Buffer([1]),Buffer([2]),Buffer([3]) + nonStreamSplit(done, Buffer.from([1,2,3]), {splt:1, spltType: "len"}); + }); + it('should emit correlate logs when messages are splitted/joined (buffer, mode=len, stream)', function(done) { + // Buffer([1,2,3]),Buffer([4,5,6]) -> Buffer([1,2]),Buffer([3,4]),Buffer([5,6]): + // [1,2,3] -> [1,2],[3,4], [4,5,6]->[3,4],[5,6] + const flow = [{ id: "n1", type: "split", splt: 2, spltType: "len", stream: true, wires: [["n2"]]}, + { id: "n2", type: "helper"}]; + helper.load(splitNode, flow, function() { + const n1 = helper.getNode("n1"); + const n2 = helper.getNode("n2"); + let n2recvmsgid = []; + n2.on("input", (msg) => { + n2recvmsgid.push(msg._msgid); + }); + n1.receive({payload: Buffer.from([1,2,3]), _msgid: "A"}); + n1.receive({payload: Buffer.from([4,5,6]), _msgid: "B"}); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter((evt) => { + return(evt[0].event == "node.split.correlate"); + }).map((e) => e[0]); + logEvents.find((e)=>e.msgid=="A").value.split.should.eql([n2recvmsgid[0],n2recvmsgid[1]]); + logEvents.find((e)=>e.msgid=="B").value.split.should.eql([n2recvmsgid[1],n2recvmsgid[2]]); + done(); + } catch(err) { + done(err); + } + }, 20); + }); + }); + it('should emit correlate log when a message is splitted (buffer, mode=bin)', function(done) { + // Buffer([1,2,3]) -> Buffer([1]),Buffer([2]),Buffer([3]) + nonStreamSplit(done, Buffer.from([1,2,3]), {splt:"[2]", spltType: "bin"}); + }); + it('should emit correlate logs when messages are splitted/joined (buffer, mode=bin, stream)', function(done) { + // Buffer([1]),Buffer([1,2,3]),Buffer([2]) -> Buffer([1,1]),Buffer([3]): + // [1] - [1,2], [1,2,3] - [1,1],[3], [2] -> [3] + const flow = [{ id: "n1", type: "split", splt: "[2]", spltType: "bin", stream: true, wires: [["n2"]]}, + { id: "n2", type: "helper"}]; + helper.load(splitNode, flow, function() { + const n1 = helper.getNode("n1"); + const n2 = helper.getNode("n2"); + let n2recvmsgid = []; + n2.on("input", (msg) => { + n2recvmsgid.push(msg._msgid); + }); + n1.receive({payload: Buffer.from([1]), _msgid: "A"}); + n1.receive({payload: Buffer.from([1,2,3]), _msgid: "B"}); + n1.receive({payload: Buffer.from([2]), _msgid: "d"}); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter((evt) => { + return(evt[0].event == "node.split.correlate"); + }).map((e) => e[0]); + logEvents.find((e)=>e.msgid=="A").value.split.should.eql([n2recvmsgid[0]]); + logEvents.find((e)=>e.msgid=="B").value.split.should.eql([n2recvmsgid[0],n2recvmsgid[1]]); + logEvents.find((e)=>e.msgid=="d").value.split.should.eql([n2recvmsgid[1]]); + done(); + } catch(err) { + done(err); + } + }, 20); + }); + }); + }); + describe('Correlation log (JOIN node)', function() { + it('should emit correlate log when messages are joined (automatic)', function(done) { + const flow = [ { id: "n1", type: "join", mode: "auto"}]; + helper.load(joinNode, flow, function() { + const n1 = helper.getNode("n1"); + n1.receive({payload:1,parts:{id:"X",type:"array",count:3,len:1,index:0}, _msgid: "A"}); + n1.receive({payload:2,parts:{id:"X",type:"array",count:3,len:1,index:1}, _msgid: "B"}); + n1.receive({payload:3,parts:{id:"X",type:"array",count:3,len:1,index:2}, _msgid: "C"}); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter((evt) => { + return(evt[0].event == "node.join.correlate"); + }).map((e) => e[0]); + logEvents[0].value.join.should.eql(["A","B","C"]); + done(); + } catch(err) { + done(err); + } + }, 20); + }) + }); + it('should emit correlate log when messages are joined (reduce)', function(done) { + const flow = [ { id: "n1", type: "join", mode: "reduce", + reduceRight:false, reduceExp: "$A+payload", reduceInit: "0", + reduceInitType: "num", reduceFixup: "$A/$N" }]; + helper.load(joinNode, flow, function() { + const n1 = helper.getNode("n1"); + n1.receive({payload:1,parts:{id:"X",type:"array",count:3,len:1,index:0}, _msgid: "A"}); + n1.receive({payload:2,parts:{id:"X",type:"array",count:3,len:1,index:1}, _msgid: "B"}); + n1.receive({payload:3,parts:{id:"X",type:"array",count:3,len:1,index:2}, _msgid: "C"}); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter((evt) => { + return(evt[0].event == "node.join.correlate"); + }).map((e) => e[0]); + logEvents[0].value.join.should.eql(["A","B","C"]); + done(); + } catch(err) { + done(err); + } + }, 20); + }) + }); + }) });