From dbfbd54e1f66e68d341d91669f95b6bdfef8ab1e Mon Sep 17 00:00:00 2001 From: Kunihiko Toumura Date: Thu, 29 Oct 2020 16:16:03 +0900 Subject: [PATCH] Messaging API support in Batch node --- .../@node-red/nodes/core/sequence/19-batch.js | 96 +++++++++++++------ test/nodes/core/sequence/19-batch_spec.js | 88 +++++++++++++++++ 2 files changed, 155 insertions(+), 29 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/sequence/19-batch.js b/packages/node_modules/@node-red/nodes/core/sequence/19-batch.js index 77574f222..f3f29df6a 100644 --- a/packages/node_modules/@node-red/nodes/core/sequence/19-batch.js +++ b/packages/node_modules/@node-red/nodes/core/sequence/19-batch.js @@ -32,11 +32,11 @@ module.exports = function(RED) { return _max_kept_msgs_count; } - function send_msgs(node, msgs, clone_msg) { - var count = msgs.length; - var msg_id = msgs[0]._msgid; + function send_msgs(node, msgInfos, clone_msg) { + var count = msgInfos.length; + var msg_id = msgInfos[0].msg._msgid; for (var i = 0; i < count; i++) { - var msg = clone_msg ? RED.util.cloneMessage(msgs[i]) : msgs[i]; + var msg = clone_msg ? RED.util.cloneMessage(msgInfos[i].msg) : msgInfos[i].msg; if (!msg.hasOwnProperty("parts")) { msg.parts = {}; } @@ -44,14 +44,16 @@ module.exports = function(RED) { parts.id = msg_id; parts.index = i; parts.count = count; - node.send(msg); + msgInfos[i].send(msg); + //msgInfos[i].done(); } } function send_interval(node, allow_empty_seq) { - let msgs = node.pending; - if (msgs.length > 0) { - send_msgs(node, msgs, false); + let msgInfos = node.pending; + if (msgInfos.length > 0) { + send_msgs(node, msgInfos, false); + msgInfos.forEach(e => e.done()); node.pending = []; } else { @@ -108,19 +110,20 @@ module.exports = function(RED) { return; } } - var msgs = []; + var msgInfos = []; for (var topic of topics) { - var t_msgs = get_msgs_of_topic(pending, topic); - msgs = msgs.concat(t_msgs); + var t_msgInfos = get_msgs_of_topic(pending, topic); + msgInfos = msgInfos.concat(t_msgInfos); } for (var topic of topics) { remove_topic(pending, topic); } - send_msgs(node, msgs, true); - node.pending_count -= msgs.length; + send_msgs(node, msgInfos, true); + msgInfos.forEach(e => e.done() ); + node.pending_count -= msgInfos.length; } - function add_to_topic_group(pending, topic, gid, msg) { + function add_to_topic_group(pending, topic, gid, msgInfo) { if (!pending.hasOwnProperty(topic)) { pending[topic] = { groups: {}, gids: [] }; } @@ -132,32 +135,43 @@ module.exports = function(RED) { gids.push(gid); } var group = groups[gid]; - group.msgs.push(msg); + group.msgs.push(msgInfo); if ((group.count === undefined) && - msg.parts.hasOwnProperty('count')) { - group.count = msg.parts.count; + msgInfo.msg.parts.hasOwnProperty('count')) { + group.count = msgInfo.msg.parts.count; } } - function concat_msg(node, msg) { + function concat_msg(node, msg, send, done) { var topic = msg.topic; if(node.topics.indexOf(topic) >= 0) { if (!msg.hasOwnProperty("parts") || !msg.parts.hasOwnProperty("id") || !msg.parts.hasOwnProperty("index") || !msg.parts.hasOwnProperty("count")) { - node.error(RED._("batch.no-parts"), msg); + done(RED._("batch.no-parts")); return; } var gid = msg.parts.id; var pending = node.pending; - add_to_topic_group(pending, topic, gid, msg); + add_to_topic_group(pending, topic, gid, {msg, send, done}); node.pending_count++; var max_msgs = max_kept_msgs_count(node); if ((max_msgs > 0) && (node.pending_count > max_msgs)) { + Object.values(node.pending).forEach(p_topic => { + Object.values(p_topic.groups).forEach(group => { + group.msgs.forEach(msgInfo => { + if (msgInfo.msg.id === msg.id) { + // the message that caused the overflow + msgInfo.done(RED._("batch.too-many")); + } else { + msgInfo.done(); + } + }) + }) + }); node.pending = {}; node.pending_count = 0; - node.error(RED._("batch.too-many"), msg); } try_concat(node, pending); } @@ -178,29 +192,37 @@ module.exports = function(RED) { return; } node.pending = []; - this.on("input", function(msg) { + this.on("input", function(msg, send, done) { if (msg.hasOwnProperty("reset")) { + node.pending.forEach(e => e.done()); node.pending = []; node.pending_count = 0; + done(); return; } var queue = node.pending; - queue.push(msg); + queue.push({msg, send, done}); node.pending_count++; if (queue.length === count) { send_msgs(node, queue, is_overlap); + for (let i = 0; i < queue.length-overlap; i++) { + queue[i].done(); + } node.pending = (overlap === 0) ? [] : queue.slice(-overlap); node.pending_count = 0; } var max_msgs = max_kept_msgs_count(node); if ((max_msgs > 0) && (node.pending_count > max_msgs)) { + let lastMInfo = node.pending.pop(); + lastMInfo.done(RED._("batch.too-many")); + node.pending.forEach(e => e.done()); node.pending = []; node.pending_count = 0; - node.error(RED._("batch.too-many"), msg); } }); this.on("close", function() { + node.pending.forEach(e=> e.done()); node.pending_count = 0; node.pending = []; }); @@ -217,31 +239,36 @@ module.exports = function(RED) { if (interval > 0) { timer = setInterval(msgHandler, interval); } - this.on("input", function(msg) { + this.on("input", function(msg, send, done) { if (msg.hasOwnProperty("reset")) { if (timer !== undefined) { clearInterval(timer); } + node.pending.forEach(e => e.done()); node.pending = []; node.pending_count = 0; + done(); if (interval > 0) { timer = setInterval(msgHandler, interval); } return; } - node.pending.push(msg); + node.pending.push({msg, send, done}); node.pending_count++; var max_msgs = max_kept_msgs_count(node); if ((max_msgs > 0) && (node.pending_count > max_msgs)) { + let lastMInfo = node.pending.pop(); + lastMInfo.done(RED._("batch.too-many")); + node.pending.forEach(e => e.done()); node.pending = []; node.pending_count = 0; - node.error(RED._("batch.too-many"), msg); } }); this.on("close", function() { if (timer !== undefined) { clearInterval(timer); } + node.pending.forEach(e => e.done()); node.pending = []; node.pending_count = 0; }); @@ -251,15 +278,26 @@ module.exports = function(RED) { return x.topic; }); node.pending = {}; - this.on("input", function(msg) { + this.on("input", function(msg, send, done) { if (msg.hasOwnProperty("reset")) { + Object.values(node.pending).forEach(p_topic => { + Object.values(p_topic.groups).forEach(group => { + group.msgs.forEach(e => e.done()); + }); + }); node.pending = {}; node.pending_count = 0; + done(); return; } - concat_msg(node, msg); + concat_msg(node, msg, send, done); }); this.on("close", function() { + Object.values(node.pending).forEach(p_topic => { + Object.values(p_topic.groups).forEach(group => { + group.msgs.forEach(e => e.done()); + }); + }); node.pending = {}; node.pending_count = 0; }); diff --git a/test/nodes/core/sequence/19-batch_spec.js b/test/nodes/core/sequence/19-batch_spec.js index b2e1e6de2..2ebcb8d4d 100644 --- a/test/nodes/core/sequence/19-batch_spec.js +++ b/test/nodes/core/sequence/19-batch_spec.js @@ -451,4 +451,92 @@ describe('BATCH node', function() { }); }); + describe('messaging API', function() { + function mapiDoneTestHelper(done, mode, count, overlap, interval, allowEmptySequence, msgAndTimings) { + const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js"); + const catchNode = require("nr-test-utils").require("@node-red/nodes/core/common/25-catch.js"); + const flow = [{id:"batchNode1", type:"batch", name: "BatchNode", mode, count, overlap, interval, + allowEmptySequence, topics: [{topic: "TA"}], wires:[[]]}, + {id:"completeNode1",type:"complete",scope: ["batchNode1"],uncaught:false,wires:[["helperNode1"]]}, + {id:"catchNode1", type:"catch",scope: ["batchNode1"],uncaught:false,wires:[["helperNode1"]]}, + {id:"helperNode1",type:"helper", wires:[[]]}]; + const numMsgs = msgAndTimings.length; + helper.load([batchNode, completeNode, catchNode], flow, function () { + const batchNode1 = helper.getNode("batchNode1"); + const helperNode1 = helper.getNode("helperNode1"); + RED.settings.nodeMessageBufferMaxLength = 2; + const t = Date.now(); + let c = 0; + helperNode1.on("input", function (msg) { + msg.should.have.a.property('payload'); + (Date.now() - t).should.be.approximately(msgAndTimings[msg.payload].avr, msgAndTimings[msg.payload].var); + c += 1; + if ( c === numMsgs) { + done(); + } + }); + for (let i = 0; i < numMsgs; i++) { + setTimeout( function() { batchNode1.receive(msgAndTimings[i].msg); }, msgAndTimings[i].delay); + } + }); + } + + it('should call done() when message is sent (mode: count)', function(done) { + mapiDoneTestHelper(done, "count", 2, 0, 2, false, [ + { msg: {payload: 0}, delay: 0, avr: 0, var: 100}, + { msg: {payload: 1}, delay: 0, avr: 0, var: 100} + ]); + }); + it('should call done() when reset (mode: count)', function(done) { + mapiDoneTestHelper(done, "count", 2, 0, 2, false, [ + { msg: {payload: 0}, delay: 0, avr: 200, var: 100}, + { msg: {payload: 1, reset:true}, delay: 200, avr: 200, var: 100} + ]); + }); + it('should call done() regardless of buffer overflow (mode: count)', function(done) { + mapiDoneTestHelper(done, "count", 10, 0, 2, false, [ + { msg: {payload: 0}, delay: 0, avr: 500, var: 100}, + { msg: {payload: 1}, delay: 100, avr: 500, var: 100}, + { msg: {payload: 2}, delay: 500, avr: 500, var: 100} + ]); + }); + it('should call done() when message is sent (mode: interval)', function(done) { + mapiDoneTestHelper(done, "interval", 2, 0, 2, false, [ + { msg: {payload: 0}, delay: 0, avr: 2000, var: 100}, + { msg: {payload: 1}, delay: 500, avr: 2000, var: 100} + ]); + }); + it('should call done() when reset (mode: interval)', function(done) { + mapiDoneTestHelper(done, "interval", 2, 0, 2, false, [ + { msg: {payload: 0}, delay: 0, avr: 200, var: 100}, + { msg: {payload: 1, reset:true}, delay: 200, avr: 200, var: 100} + ]); + }); + it('should call done() regardless of buffer overflow (mode: interval)', function(done) { + mapiDoneTestHelper(done, "interval", 2, 0, 2, false, [ + { msg: {payload: 0}, delay: 0, avr: 500, var: 100}, + { msg: {payload: 1}, delay: 100, avr: 500, var: 100}, + { msg: {payload: 2}, delay: 500, avr: 500, var: 100} + ]); + }); + it('should call done() when message is sent (mode: concat)', function(done) { + mapiDoneTestHelper(done, "concat", 2, 0, 2, false, [ + { msg: {topic:"TA", payload: 0, parts: {id: "TA", index: 0, count: 2}}, delay: 0, avr: 1000, var: 100}, + { msg: {topic:"TA", payload: 1, parts: {id: "TA", index: 1, count: 2}}, delay: 1000, avr: 1000, var: 100}, + ]); + }); + it('should call done() when reset (mode: concat)', function(done) { + mapiDoneTestHelper(done, "concat", 2, 0, 2, false, [ + { msg: {topic:"TA", payload: 0, parts: {id: "TA", index: 0, count: 2}}, delay: 0, avr: 1000, var: 100}, + { msg: {payload: 1, reset:true}, delay: 1000, avr: 1000, var: 100}, + ]); + }); + it('should call done() regardless of buffer overflow (mode: concat)', function(done) { + mapiDoneTestHelper(done, "concat", 2, 0, 2, false, [ + { msg: {topic:"TA", payload: 0, parts: {id: "TA", index: 0, count: 3}}, delay: 0, avr: 1000, var: 100}, + { msg: {topic:"TA", payload: 0, parts: {id: "TA", index: 1, count: 3}}, delay: 500, avr: 1000, var: 100}, + { msg: {topic:"TA", payload: 0, parts: {id: "TA", index: 2, count: 3}}, delay: 1000, avr: 1000, var: 100} + ]); + }); + }); });