Merge pull request #4829 from node-red/Let-Batch-node-honour-parts-to-close-early

Let batch node terminate "early" if msg.parts set to end of sequence
This commit is contained in:
Nick O'Leary 2024-07-18 16:34:46 +01:00 committed by GitHub
commit 998219ae9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 37 additions and 6 deletions

View File

@ -36,6 +36,10 @@
<label style="margin-left: 10px; width: 175px;" for="node-input-overlap" data-i18n="batch.count.overlap"></label> <label style="margin-left: 10px; width: 175px;" for="node-input-overlap" data-i18n="batch.count.overlap"></label>
<input type="text" id="node-input-overlap" style="width: 50px;"> <input type="text" id="node-input-overlap" style="width: 50px;">
</div> </div>
<div class="form-row">
<input type="checkbox" id="node-input-honourParts" style="margin-left: 10px; margin-right:10px; vertical-align:top; width:auto;">
<label for="node-input-honourParts" style="width:auto;" data-i18n="batch.honourParts"></label>
</div>
</div> </div>
<div class="node-row-msg-interval"> <div class="node-row-msg-interval">
@ -45,7 +49,7 @@
<span data-i18n="batch.interval.seconds"></span> <span data-i18n="batch.interval.seconds"></span>
</div> </div>
<div class="form-row"> <div class="form-row">
<input type="checkbox" id="node-input-allowEmptySequence" style="margin-left:20px; margin-right: 10px; vertical-align:top; width:auto;"> <input type="checkbox" id="node-input-allowEmptySequence" style="margin-left:20px; margin-right:10px; vertical-align:top; width:auto;">
<label for="node-input-allowEmptySequence" style="width:auto;" data-i18n="batch.interval.empty"></label> <label for="node-input-allowEmptySequence" style="width:auto;" data-i18n="batch.interval.empty"></label>
</div> </div>
</div> </div>
@ -101,6 +105,7 @@
} }
}, },
allowEmptySequence: {value:false}, allowEmptySequence: {value:false},
honourParts: {value:false},
topics: {value:[{topic:""}]} topics: {value:[{topic:""}]}
}, },
inputs:1, inputs:1,

View File

@ -181,6 +181,8 @@ module.exports = function(RED) {
RED.nodes.createNode(this,n); RED.nodes.createNode(this,n);
var node = this; var node = this;
var mode = n.mode || "count"; var mode = n.mode || "count";
var eof = false;
node.honourParts = n.honourParts || false;
node.pending_count = 0; node.pending_count = 0;
if (mode === "count") { if (mode === "count") {
@ -201,9 +203,12 @@ module.exports = function(RED) {
return; return;
} }
var queue = node.pending; var queue = node.pending;
if (node.honourParts && msg.hasOwnProperty("parts")) {
if (msg.parts.index + 1 === msg.parts.count) { eof = true; }
}
queue.push({msg, send, done}); queue.push({msg, send, done});
node.pending_count++; node.pending_count++;
if (queue.length === count) { if (queue.length === count || eof === true) {
send_msgs(node, queue, is_overlap); send_msgs(node, queue, is_overlap);
for (let i = 0; i < queue.length-overlap; i++) { for (let i = 0; i < queue.length-overlap; i++) {
queue[i].done(); queue[i].done();
@ -211,6 +216,7 @@ module.exports = function(RED) {
node.pending = node.pending =
(overlap === 0) ? [] : queue.slice(-overlap); (overlap === 0) ? [] : queue.slice(-overlap);
node.pending_count = 0; node.pending_count = 0;
eof = false;
} }
var max_msgs = max_kept_msgs_count(node); var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) { if ((max_msgs > 0) && (node.pending_count > max_msgs)) {

View File

@ -1114,6 +1114,7 @@
"too-many": "too many pending messages in batch node", "too-many": "too many pending messages in batch node",
"unexpected": "unexpected mode", "unexpected": "unexpected mode",
"no-parts": "no parts property in message", "no-parts": "no parts property in message",
"honourParts": "Allow msg.parts to also complete batch operation.",
"error": { "error": {
"invalid-count": "Invalid count", "invalid-count": "Invalid count",
"invalid-overlap": "Invalid overlap", "invalid-overlap": "Invalid overlap",

View File

@ -98,7 +98,7 @@ describe('BATCH node', function() {
var n2 = helper.getNode("n2"); var n2 = helper.getNode("n2");
check_data(n1, n2, results, done); check_data(n1, n2, results, done);
for(var i = 0; i < 6; i++) { for(var i = 0; i < 6; i++) {
n1.receive({payload: i}); n1.receive({payload: i, parts: { count:6, index:i }});
} }
}); });
} }
@ -168,6 +168,25 @@ describe('BATCH node', function() {
check_count(flow, results, done); check_count(flow, results, done);
}); });
it('should create seq. with count (more sent than count)', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 4, overlap: 0, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]},
{id:"n2", type:"helper"}];
var results = [
[0, 1, 2, 3]
];
check_count(flow, results, done);
});
it('should create seq. with count and terminate early if parts honoured', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 4, overlap: 0, interval: 10, allowEmptySequence:false, honourParts:true, topics: [], wires:[["n2"]]},
{id:"n2", type:"helper"}];
var results = [
[0, 1, 2, 3],
[4, 5]
];
check_count(flow, results, done);
});
it('should create seq. with count and overlap', function(done) { it('should create seq. with count and overlap', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 3, overlap: 2, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]}, var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 3, overlap: 2, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]},
{id:"n2", type:"helper"}]; {id:"n2", type:"helper"}];
@ -455,7 +474,7 @@ describe('BATCH node', function() {
function mapiDoneTestHelper(done, mode, count, overlap, interval, allowEmptySequence, msgAndTimings) { function mapiDoneTestHelper(done, mode, count, overlap, interval, allowEmptySequence, msgAndTimings) {
const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js"); const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js");
const catchNode = require("nr-test-utils").require("@node-red/nodes/core/common/25-catch.js"); const catchNode = require("nr-test-utils").require("@node-red/nodes/core/common/25-catch.js");
const flow = [{id:"batchNode1", type:"batch", name: "BatchNode", mode, count, overlap, interval, const flow = [{id:"batchNode1", type:"batch", name: "BatchNode", mode, count, overlap, interval,
allowEmptySequence, topics: [{topic: "TA"}], wires:[[]]}, allowEmptySequence, topics: [{topic: "TA"}], wires:[[]]},
{id:"completeNode1",type:"complete",scope: ["batchNode1"],uncaught:false,wires:[["helperNode1"]]}, {id:"completeNode1",type:"complete",scope: ["batchNode1"],uncaught:false,wires:[["helperNode1"]]},
{id:"catchNode1", type:"catch",scope: ["batchNode1"],uncaught:false,wires:[["helperNode1"]]}, {id:"catchNode1", type:"catch",scope: ["batchNode1"],uncaught:false,wires:[["helperNode1"]]},
@ -482,13 +501,13 @@ describe('BATCH node', function() {
} }
it('should call done() when message is sent (mode: count)', function(done) { it('should call done() when message is sent (mode: count)', function(done) {
mapiDoneTestHelper(done, "count", 2, 0, 2, false, [ mapiDoneTestHelper(done, "count", 2, 0, 2, false, [
{ msg: {payload: 0}, delay: 0, avr: 0, var: 100}, { msg: {payload: 0}, delay: 0, avr: 0, var: 100},
{ msg: {payload: 1}, delay: 0, avr: 0, var: 100} { msg: {payload: 1}, delay: 0, avr: 0, var: 100}
]); ]);
}); });
it('should call done() when reset (mode: count)', function(done) { it('should call done() when reset (mode: count)', function(done) {
mapiDoneTestHelper(done, "count", 2, 0, 2, false, [ mapiDoneTestHelper(done, "count", 2, 0, 2, false, [
{ msg: {payload: 0}, delay: 0, avr: 200, var: 100}, { msg: {payload: 0}, delay: 0, avr: 200, var: 100},
{ msg: {payload: 1, reset:true}, delay: 200, avr: 200, var: 100} { msg: {payload: 1, reset:true}, delay: 200, avr: 200, var: 100}
]); ]);