diff --git a/editor/icons/batch.png b/editor/icons/batch.png new file mode 100644 index 000000000..44803d185 Binary files /dev/null and b/editor/icons/batch.png differ diff --git a/nodes/core/locales/en-US/messages.json b/nodes/core/locales/en-US/messages.json index 6665387ae..f77858f20 100644 --- a/nodes/core/locales/en-US/messages.json +++ b/nodes/core/locales/en-US/messages.json @@ -875,5 +875,32 @@ "invalid-exp" : "invalid JSONata expression in sort node", "too-many" : "too many pending messages in sort node", "clear" : "clear pending message in sort node" + }, + "batch" : { + "mode": { + "label" : "Mode", + "num-msgs" : "number of messages", + "interval" : "interval in seconds", + "concat" : "concatenate sequences" + }, + "count": { + "label" : "Number of msgs", + "overwrap" : "Overwrap", + "count" : "count", + "invalid" : "Invalid count and overwrap" + }, + "interval": { + "label" : "Interval (sec)", + "seconds" : "seconds", + "sec" : "sec", + "empty" : "send empty message when no message arrives" + }, + "concat": { + "topics-label": "Topics", + "topic" : "topic" + }, + "too-many" : "too many pending messages in batch node", + "unexpected" : "unexpected mode", + "no-parts" : "no parts property in message" } } diff --git a/nodes/core/logic/19-batch.html b/nodes/core/logic/19-batch.html new file mode 100644 index 000000000..07fda4915 --- /dev/null +++ b/nodes/core/logic/19-batch.html @@ -0,0 +1,192 @@ + + + + + + + + + diff --git a/nodes/core/logic/19-batch.js b/nodes/core/logic/19-batch.js new file mode 100644 index 000000000..bdd5b5408 --- /dev/null +++ b/nodes/core/logic/19-batch.js @@ -0,0 +1,246 @@ +/** + * Copyright JS Foundation and other contributors, http://js.foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +module.exports = function(RED) { + "use strict"; + + var _max_kept_msgs_count = undefined; + + function max_kept_msgs_count(node) { + if (_max_kept_msgs_count === undefined) { + var name = "batchMaxKeptMsgsCount"; + if (RED.settings.hasOwnProperty(name)) { + _max_kept_msgs_count = RED.settings[name]; + } + else { + _max_kept_msgs_count = 0; + } + } + return _max_kept_msgs_count; + } + + function send_msgs(node, msgs, clone_msg) { + var count = msgs.length; + var msg_id = msgs[0]._msgid; + for (var i = 0; i < count; i++) { + var msg = clone_msg ? RED.util.cloneMessage(msgs[i]) : msgs[i]; + if (!msg.hasOwnProperty("parts")) { + msg.parts = {}; + } + var parts = msg.parts; + parts.id = msg_id; + parts.index = i; + parts.count = count; + node.send(msg); + } + } + + function send_interval(node, allow_empty_seq) { + let msgs = node.pending; + if (msgs.length > 0) { + send_msgs(node, msgs, false); + node.pending = []; + } + else { + if (allow_empty_seq) { + let mid = RED.util.generateId(); + let msg = { + payload: null, + parts: { + id: mid, + index: 0, + count: 1 + } + }; + node.send(msg); + } + } + } + + function is_complete(pending, topic) { + if (pending.hasOwnProperty(topic)) { + var p_topic = pending[topic]; + var gids = p_topic.gids; + if (gids.length > 0) { + var gid = gids[0]; + var groups = p_topic.groups; + var group = groups[gid]; + return (group.count === group.msgs.length); + } + } + return false; + } + + function get_msgs_of_topic(pending, topic) { + var p_topic = pending[topic]; + var groups = p_topic.groups; + var gids = p_topic.gids; + var gid = gids[0]; + var group = groups[gid]; + return group.msgs; + } + + function remove_topic(pending, topic) { + var p_topic = pending[topic]; + var groups = p_topic.groups; + var gids = p_topic.gids; + var gid = gids.shift(); + delete groups[gid]; + } + + function try_concat(node, pending) { + var topics = node.topics; + for (var topic of topics) { + if (!is_complete(pending, topic)) { + return; + } + } + var msgs = []; + for (var topic of topics) { + var t_msgs = get_msgs_of_topic(pending, topic); + msgs = msgs.concat(t_msgs); + } + for (var topic of topics) { + remove_topic(pending, topic); + } + send_msgs(node, msgs, false); + node.pending_count -= msgs.length; + } + + function add_to_topic_group(pending, topic, gid, msg) { + if (!pending.hasOwnProperty(topic)) { + pending[topic] = { groups: {}, gids: [] }; + } + var p_topic = pending[topic]; + var groups = p_topic.groups; + var gids = p_topic.gids; + if (!groups.hasOwnProperty(gid)) { + groups[gid] = { msgs: [], count: undefined }; + gids.push(gid); + } + var group = groups[gid]; + group.msgs.push(msg); + if ((group.count === undefined) && + msg.parts.hasOwnProperty('count')) { + group.count = msg.parts.count; + } + } + + function concat_msg(node, msg) { + var topic = msg.topic; + if(node.topics.indexOf(topic) >= 0) { + if (!msg.hasOwnProperty("parts") || + !msg.parts.hasOwnProperty("id") || + !msg.parts.hasOwnProperty("index") || + !msg.parts.hasOwnProperty("count")) { + node.error(RED._("batch.no-parts"), msg); + return; + } + var gid = msg.parts.id; + var pending = node.pending; + add_to_topic_group(pending, topic, gid, msg); + node.pending_count++; + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (node.pending_count > max_msgs)) { + node.pending = {}; + node.pending_count = 0; + node.error(RED._("batch.too-many"), msg); + } + try_concat(node, pending); + } + } + + function BatchNode(n) { + RED.nodes.createNode(this,n); + var node = this; + var mode = n.mode || "count"; + + node.pending_count = 0; + if (mode === "count") { + var count = Number(n.count || 1); + var overwrap = Number(n.overwrap || 0); + var is_overwrap = (overwrap > 0); + if (count <= overwrap) { + node.error(RED._("batch.count.invalid")); + return; + } + node.pending = []; + this.on("input", function(msg) { + var queue = node.pending; + queue.push(msg); + node.pending_count++; + if (queue.length === count) { + send_msgs(node, queue, is_overwrap); + node.pending = + (overwrap === 0) ? [] : queue.slice(-overwrap); + node.pending_count = 0; + } + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (node.pending_count > max_msgs)) { + node.pending = []; + node.pending_count = 0; + node.error(RED._("batch.too-many"), msg); + } + }); + this.on("close", function() { + node.pending_count = 0; + node.pending = []; + }); + } + else if (mode === "interval") { + var interval = Number(n.interval || "0") *1000; + var allow_empty_seq = n.allowEmptySequence; + node.pending = [] + var timer = setInterval(function() { + send_interval(node, allow_empty_seq); + node.pending_count = 0; + }, interval); + this.on("input", function(msg) { + node.pending.push(msg); + node.pending_count++; + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (node.pending_count > max_msgs)) { + node.pending = []; + node.pending_count = 0; + node.error(RED._("batch.too-many"), msg); + } + }); + this.on("close", function() { + clearInterval(timer); + node.pending = []; + node.pending_count = 0; + }); + } + else if(mode === "concat") { + node.topics = (n.topics || []).map(function(x) { + return x.topic; + }); + node.pending = {}; + this.on("input", function(msg) { + concat_msg(node, msg); + }); + this.on("close", function() { + node.pending = {}; + node.pending_count = 0; + }); + } + else { + node.error(RED._("batch.unexpected")); + } + } + + RED.nodes.registerType("batch", BatchNode); +} diff --git a/settings.js b/settings.js index cb634d47d..9679c882a 100644 --- a/settings.js +++ b/settings.js @@ -47,9 +47,10 @@ module.exports = { // The maximum length, in characters, of any message sent to the debug sidebar tab debugMaxLength: 1000, - // The maximum number of messages kept internally in sort node. + // The maximum number of messages kept internally in nodes. // Zero or undefined value means not restricting number of messages. //sortMaxKeptMsgsCount: 0, + //batchMaxKeptMsgsCount: 0, // To disable the option for using local files for storing keys and certificates in the TLS configuration // node, set this to true diff --git a/test/nodes/core/logic/19-batch_spec.js b/test/nodes/core/logic/19-batch_spec.js new file mode 100644 index 000000000..aa806f5bf --- /dev/null +++ b/test/nodes/core/logic/19-batch_spec.js @@ -0,0 +1,338 @@ +/** + * Copyright JS Foundation and other contributors, http://js.foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +var should = require("should"); +var batchNode = require("../../../../nodes/core/logic/19-batch.js"); +var helper = require("../../helper.js"); +var RED = require("../../../../red/red.js"); + +describe('BATCH node', function() { + this.timeout(8000); + + before(function(done) { + helper.startServer(done); + }); + + afterEach(function() { + helper.unload(); + RED.settings.batchMaxKeptMsgsCount = 0; + }); + + it('should be loaded with defaults', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + n1.should.have.property('name', 'BatchNode'); + done(); + }); + }); + + function check_parts(msg, id, idx, count) { + msg.should.have.property("parts"); + var parts = msg.parts; + parts.should.have.property("id", id); + parts.should.have.property("index", idx); + parts.should.have.property("count", count); + } + + function check_data(n1, n2, results, done) { + var id = undefined; + var ix0 = 0; // seq no + var ix1 = 0; // loc. in seq + var seq = undefined; + n2.on("input", function(msg) { + try { + if (seq === undefined) { + seq = results[ix0]; + } + var val = seq[ix1]; + msg.should.have.property("payload", val); + if (id === undefined) { + id = msg.parts.id; + } + check_parts(msg, id, ix1, seq.length); + ix1++; + if (ix1 === seq.length) { + ix0++; + ix1 = 0; + seq = undefined; + id = undefined; + if (ix0 === results.length) { + done(); + } + } + } + catch (e) { + done(e); + } + }); + } + + function check_count(flow, results, done) { + try { + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + check_data(n1, n2, results, done); + for(var i = 0; i < 6; i++) { + n1.receive({payload: i}); + } + }); + } + catch (e) { + done(e); + } + } + + function delayed_send(receiver, index, count, delay) { + if (index < count) { + setTimeout(function() { + receiver.receive({payload: index}); + delayed_send(receiver, index+1, count, delay); + }, delay); + } + } + + function check_interval(flow, results, delay, done) { + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + check_data(n1, n2, results, done); + delayed_send(n1, 0, 4, delay); + }); + } + + function check_concat(flow, results, inputs, done) { + try { + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + check_data(n1, n2, results, done); + for(var data of inputs) { + var msg = { + topic: data[0], + payload: data[1], + parts: { + id: data[0], + index: data[2], + count: data[3] + } + }; + n1.receive(msg); + } + }); + } + catch (e) { + done(e); + } + } + + describe('mode: count', function() { + + it('should create seq. with count', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 2, overwrap: 0, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [0, 1], + [2, 3], + [4, 5] + ]; + check_count(flow, results, done); + }); + + it('should create seq. with count and overwrap', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 3, overwrap: 2, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [0, 1, 2], + [1, 2, 3], + [2, 3, 4], + [3, 4, 5] + ]; + check_count(flow, results, done); + }); + + it('should handle too many pending messages', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 5, overwrap: 0, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + RED.settings.batchMaxKeptMsgsCount = 2; + setTimeout(function() { + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "batch"; + }); + var evt = logEvents[0][0]; + evt.should.have.property('id', "n1"); + evt.should.have.property('type', "batch"); + evt.should.have.property('msg', "batch.too-many"); + done(); + }, 150); + for(var i = 0; i < 3; i++) { + n1.receive({payload: i}); + } + }); + }); + + }); + + describe('mode: interval', function() { + + it('should create seq. with interval', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [0, 1], + [2, 3] + ]; + check_interval(flow, results, 450, done); + }); + + it('should create seq. with interval (in float)', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 0.5, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [0, 1], + [2, 3] + ]; + check_interval(flow, results, 225, done); + }); + + it('should create seq. with interval & not send empty seq', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + // 1300, 2600, 3900, 5200, + [0], [1], [2], [3] + ]; + check_interval(flow, results, 1300, done); + }); + + it('should create seq. with interval & send empty seq', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: true, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + // 1300, 2600, 3900, 5200, + [null], [0], [1], [2], [null], [3] + ]; + check_interval(flow, results, 1300, done); + }); + + it('should handle too many pending messages', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + RED.settings.batchMaxKeptMsgsCount = 2; + setTimeout(function() { + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "batch"; + }); + var evt = logEvents[0][0]; + evt.should.have.property('id', "n1"); + evt.should.have.property('type', "batch"); + evt.should.have.property('msg', "batch.too-many"); + done(); + }, 150); + for(var i = 0; i < 3; i++) { + n1.receive({payload: i}); + } + }); + }); + + }); + + describe('mode: concat', function() { + + it('should concat two seq. (series)', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [2, 3, 0, 1] + ]; + var inputs = [ + ["TB", 0, 0, 2], + ["TB", 1, 1, 2], + ["TA", 2, 0, 2], + ["TA", 3, 1, 2] + ]; + check_concat(flow, results, inputs, done); + }); + + it('should concat two seq. (mixed)', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [2, 3, 0, 1] + ]; + var inputs = [ + ["TA", 2, 0, 2], + ["TB", 0, 0, 2], + ["TA", 3, 1, 2], + ["TB", 1, 1, 2] + ]; + check_concat(flow, results, inputs, done); + }); + + it('should concat three seq.', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}, {topic: "TC"}], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [2, 3, 0, 1, 4] + ]; + var inputs = [ + ["TC", 4, 0, 1], + ["TB", 0, 0, 2], + ["TB", 1, 1, 2], + ["TA", 2, 0, 2], + ["TA", 3, 1, 2] + ]; + check_concat(flow, results, inputs, done); + }); + + it('should handle too many pending messages', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + RED.settings.batchMaxKeptMsgsCount = 2; + setTimeout(function() { + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "batch"; + }); + var evt = logEvents[0][0]; + evt.should.have.property('id', "n1"); + evt.should.have.property('type', "batch"); + evt.should.have.property('msg', "batch.too-many"); + done(); + }, 150); + var C = 3; + for(var i = 0; i < C; i++) { + var parts_a = {index:i, count:C, id:"A"}; + var parts_b = {index:i, count:C, id:"B"}; + n1.receive({payload: i, topic: "TA", parts:parts_a}); + n1.receive({payload: i, topic: "TB", parts:parts_b}); + } + }); + }); + + }); + +});