1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

fix join node in array mode with repeated messages, and rallow reset all

to close #2866
This commit is contained in:
Dave Conway-Jones 2021-02-09 17:27:58 +00:00
parent f5da2eb633
commit 4cd9b7b050
No known key found for this signature in database
GPG Key ID: 88BA2B8A411BE9FF
2 changed files with 68 additions and 12 deletions

View File

@ -259,7 +259,7 @@ module.exports = function(RED) {
} }
} else { // otherwise drop the message. } else { // otherwise drop the message.
done(); done();
} }
} }
}); });
} }
@ -561,7 +561,7 @@ module.exports = function(RED) {
reduceMessage(node, nextMsgInfo, err => { reduceMessage(node, nextMsgInfo, err => {
if (err) { if (err) {
nextMsgInfo.done(err);//.error(err,nextMsg); nextMsgInfo.done(err);//.error(err,nextMsg);
} }
activeMessage = null; activeMessage = null;
processReduceMessageQueue(); processReduceMessageQueue();
}) })
@ -570,12 +570,7 @@ module.exports = function(RED) {
this.on("input", function(msg, send, done) { this.on("input", function(msg, send, done) {
try { try {
var property; var property;
if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) { var partId = "_";
node.warn("Message missing msg.parts property - cannot join in 'auto' mode")
done();
return;
}
if (node.propertyType == "full") { if (node.propertyType == "full") {
property = msg; property = msg;
} }
@ -589,7 +584,21 @@ module.exports = function(RED) {
} }
} }
var partId; if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) {
// if a blank reset messag erest it all.
if (msg.hasOwnProperty("reset")) {
if (inflight && inflight.hasOwnProperty("partId") && inflight[partId].timeout) {
clearTimeout(inflight[partId].timeout);
}
inflight = {};
}
else {
node.warn("Message missing msg.parts property - cannot join in 'auto' mode")
}
done();
return;
}
var payloadType; var payloadType;
var propertyKey; var propertyKey;
var targetCount; var targetCount;
@ -611,7 +620,6 @@ module.exports = function(RED) {
} }
else { else {
// Use the node configuration to identify all of the group information // Use the node configuration to identify all of the group information
partId = "_";
payloadType = node.build; payloadType = node.build;
targetCount = node.count; targetCount = node.count;
joinChar = node.joiner; joinChar = node.joiner;
@ -621,6 +629,9 @@ module.exports = function(RED) {
if (node.build === 'object') { if (node.build === 'object') {
propertyKey = RED.util.getMessageProperty(msg,node.key); propertyKey = RED.util.getMessageProperty(msg,node.key);
} }
if (msg.hasOwnProperty("parts")) {
propertyIndex = msg.parts.index;
}
} }
if (msg.hasOwnProperty("reset")) { if (msg.hasOwnProperty("reset")) {
@ -725,8 +736,8 @@ module.exports = function(RED) {
} }
} else { } else {
if (!isNaN(propertyIndex)) { if (!isNaN(propertyIndex)) {
if (group.payload[propertyIndex] == undefined) { group.currentCount++; }
group.payload[propertyIndex] = property; group.payload[propertyIndex] = property;
group.currentCount++;
} else { } else {
if (property !== undefined) { if (property !== undefined) {
group.payload.push(property); group.payload.push(property);
@ -762,4 +773,3 @@ module.exports = function(RED) {
} }
RED.nodes.registerType("join",JoinNode); RED.nodes.registerType("join",JoinNode);
} }

View File

@ -1646,6 +1646,51 @@ describe('JOIN node', function() {
}); });
}); });
it('should handle join an array when using msg.parts and duplicate indexed parts arrive', function (done) {
var flow = [{ id: "n1", type: "join", wires: [["n2"]], joiner: "[44]", joinerType: "bin", build: "array", mode: "auto" },
{ id: "n2", type: "helper" }];
helper.load(joinNode, flow, function () {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
n2.on("input", function (msg) {
try {
msg.should.have.property("payload");
msg.payload[0].should.equal("C");
msg.payload[1].should.equal("D");
done();
}
catch (e) { done(e); }
});
n1.receive({ payload: "A", parts: { id:1, type:"array", ch:",", index:0, count:2 } });
n1.receive({ payload: "B", parts: { id:1, type:"array", ch:",", index:0, count:2 } });
n1.receive({ payload: "C", parts: { id:1, type:"array", ch:",", index:0, count:2 } });
n1.receive({ payload: "D", parts: { id:1, type:"array", ch:",", index:1, count:2 } });
});
});
it('should handle join an array when using msg.parts and duplicate indexed parts arrive and being reset halfway', function (done) {
var flow = [{ id: "n1", type: "join", wires: [["n2"]], joiner: "[44]", joinerType: "bin", build: "array", mode: "auto" },
{ id: "n2", type: "helper" }];
helper.load(joinNode, flow, function () {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
n2.on("input", function (msg) {
try {
msg.should.have.property("payload");
msg.payload[0].should.equal("D");
msg.payload[1].should.equal("C");
done();
}
catch (e) { done(e); }
});
n1.receive({ payload: "A", parts: { id:1, type:"array", ch:",", index:0, count:2 } });
n1.receive({ payload: "B", parts: { id:1, type:"array", ch:",", index:0, count:2 } });
n1.receive({ reset:true });
n1.receive({ payload: "C", parts: { id:1, type:"array", ch:",", index:1, count:2 } });
n1.receive({ payload: "D", parts: { id:1, type:"array", ch:",", index:0, count:2 } });
});
});
describe('messaging API', function() { describe('messaging API', function() {
function mapiDoneSplitTestHelper(done, splt, spltType, stream, msgAndTimings) { function mapiDoneSplitTestHelper(done, splt, spltType, stream, msgAndTimings) {
const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js"); const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js");
@ -1821,4 +1866,5 @@ describe('JOIN node', function() {
}); });
}) })
}); });