From 4cd9b7b0504478f853b08da11a6a30b3bd3b824a Mon Sep 17 00:00:00 2001 From: Dave Conway-Jones Date: Tue, 9 Feb 2021 17:27:58 +0000 Subject: [PATCH] fix join node in array mode with repeated messages, and rallow reset all to close #2866 --- .../@node-red/nodes/core/sequence/17-split.js | 34 +++++++++----- test/nodes/core/sequence/17-split_spec.js | 46 +++++++++++++++++++ 2 files changed, 68 insertions(+), 12 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/sequence/17-split.js b/packages/node_modules/@node-red/nodes/core/sequence/17-split.js index 318319f3d..bfc84a8d2 100644 --- a/packages/node_modules/@node-red/nodes/core/sequence/17-split.js +++ b/packages/node_modules/@node-red/nodes/core/sequence/17-split.js @@ -259,7 +259,7 @@ module.exports = function(RED) { } } else { // otherwise drop the message. done(); - } + } } }); } @@ -561,7 +561,7 @@ module.exports = function(RED) { reduceMessage(node, nextMsgInfo, err => { if (err) { nextMsgInfo.done(err);//.error(err,nextMsg); - } + } activeMessage = null; processReduceMessageQueue(); }) @@ -570,12 +570,7 @@ module.exports = function(RED) { this.on("input", function(msg, send, done) { try { var property; - if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) { - node.warn("Message missing msg.parts property - cannot join in 'auto' mode") - done(); - return; - } - + var partId = "_"; if (node.propertyType == "full") { property = msg; } @@ -589,7 +584,21 @@ module.exports = function(RED) { } } - var partId; + if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) { + // if a blank reset messag erest it all. + if (msg.hasOwnProperty("reset")) { + if (inflight && inflight.hasOwnProperty("partId") && inflight[partId].timeout) { + clearTimeout(inflight[partId].timeout); + } + inflight = {}; + } + else { + node.warn("Message missing msg.parts property - cannot join in 'auto' mode") + } + done(); + return; + } + var payloadType; var propertyKey; var targetCount; @@ -611,7 +620,6 @@ module.exports = function(RED) { } else { // Use the node configuration to identify all of the group information - partId = "_"; payloadType = node.build; targetCount = node.count; joinChar = node.joiner; @@ -621,6 +629,9 @@ module.exports = function(RED) { if (node.build === 'object') { propertyKey = RED.util.getMessageProperty(msg,node.key); } + if (msg.hasOwnProperty("parts")) { + propertyIndex = msg.parts.index; + } } if (msg.hasOwnProperty("reset")) { @@ -725,8 +736,8 @@ module.exports = function(RED) { } } else { if (!isNaN(propertyIndex)) { + if (group.payload[propertyIndex] == undefined) { group.currentCount++; } group.payload[propertyIndex] = property; - group.currentCount++; } else { if (property !== undefined) { group.payload.push(property); @@ -762,4 +773,3 @@ module.exports = function(RED) { } RED.nodes.registerType("join",JoinNode); } - diff --git a/test/nodes/core/sequence/17-split_spec.js b/test/nodes/core/sequence/17-split_spec.js index a216be90b..0e4a07345 100644 --- a/test/nodes/core/sequence/17-split_spec.js +++ b/test/nodes/core/sequence/17-split_spec.js @@ -1646,6 +1646,51 @@ describe('JOIN node', function() { }); }); + it('should handle join an array when using msg.parts and duplicate indexed parts arrive', function (done) { + var flow = [{ id: "n1", type: "join", wires: [["n2"]], joiner: "[44]", joinerType: "bin", build: "array", mode: "auto" }, + { id: "n2", type: "helper" }]; + helper.load(joinNode, flow, function () { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + n2.on("input", function (msg) { + try { + msg.should.have.property("payload"); + msg.payload[0].should.equal("C"); + msg.payload[1].should.equal("D"); + done(); + } + catch (e) { done(e); } + }); + n1.receive({ payload: "A", parts: { id:1, type:"array", ch:",", index:0, count:2 } }); + n1.receive({ payload: "B", parts: { id:1, type:"array", ch:",", index:0, count:2 } }); + n1.receive({ payload: "C", parts: { id:1, type:"array", ch:",", index:0, count:2 } }); + n1.receive({ payload: "D", parts: { id:1, type:"array", ch:",", index:1, count:2 } }); + }); + }); + + it('should handle join an array when using msg.parts and duplicate indexed parts arrive and being reset halfway', function (done) { + var flow = [{ id: "n1", type: "join", wires: [["n2"]], joiner: "[44]", joinerType: "bin", build: "array", mode: "auto" }, + { id: "n2", type: "helper" }]; + helper.load(joinNode, flow, function () { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + n2.on("input", function (msg) { + try { + msg.should.have.property("payload"); + msg.payload[0].should.equal("D"); + msg.payload[1].should.equal("C"); + done(); + } + catch (e) { done(e); } + }); + n1.receive({ payload: "A", parts: { id:1, type:"array", ch:",", index:0, count:2 } }); + n1.receive({ payload: "B", parts: { id:1, type:"array", ch:",", index:0, count:2 } }); + n1.receive({ reset:true }); + n1.receive({ payload: "C", parts: { id:1, type:"array", ch:",", index:1, count:2 } }); + n1.receive({ payload: "D", parts: { id:1, type:"array", ch:",", index:0, count:2 } }); + }); + }); + describe('messaging API', function() { function mapiDoneSplitTestHelper(done, splt, spltType, stream, msgAndTimings) { const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js"); @@ -1821,4 +1866,5 @@ describe('JOIN node', function() { }); }) + });