From af71ae649b1dc056a4d92bb77ce5e7891155ac8d Mon Sep 17 00:00:00 2001 From: Hiroyasu Nishiyama Date: Wed, 17 Jan 2018 19:05:01 +0900 Subject: [PATCH] Initial support of new BATCH node (#1548) * initial support of BATCH node * add concat mode & fix for docs and js code * add tests for BATCH node * minor correction of typo * allow interval in float * fixed message catalog * add test for too many pending messages & related fixes * update info document on batchMaxKeptMsgsCount * fixed close callback * fixed info document * add initial topics entry of concat mode --- editor/icons/batch.png | Bin 0 -> 712 bytes nodes/core/locales/en-US/messages.json | 27 ++ nodes/core/logic/19-batch.html | 192 ++++++++++++++ nodes/core/logic/19-batch.js | 246 ++++++++++++++++++ settings.js | 3 +- test/nodes/core/logic/19-batch_spec.js | 338 +++++++++++++++++++++++++ 6 files changed, 805 insertions(+), 1 deletion(-) create mode 100644 editor/icons/batch.png create mode 100644 nodes/core/logic/19-batch.html create mode 100644 nodes/core/logic/19-batch.js create mode 100644 test/nodes/core/logic/19-batch_spec.js diff --git a/editor/icons/batch.png b/editor/icons/batch.png new file mode 100644 index 0000000000000000000000000000000000000000..44803d185861d525b735e754d226caa6c428ba33 GIT binary patch literal 712 zcmeAS@N?(olHy`uVBq!ia0vp^B0wz1!3HFCgzU0`6k~CayA#8@b22Z19L@rd$YKTt zZeb8+WSBKa0;u3>W=KRygs+cPa(=E}VoH8es$NBI0Z=sqgH44MkeQoWlBiITo0C^; zRbi_HR$&EXgM{^!6u?SKvTc^23 znW;cugLNB1bt8*G)!XQUJdP9)kWdDT0)yF(%SIm_Wp-Q}9vj^S#$u1Bi(`lfZ*qbJ z>taQR{Q^JuSrw!w-t_rW%(T=@(OSA!*Lm8_jJAfI3e62Yj~&iT1 zZ3Z$b;P6TB8zQaTrY4U+3I3hTYg%KRb*zA!kzv=V<9}x!lyL?nFHcuLmvv4FO#lYl B<5B + + + + + + + + diff --git a/nodes/core/logic/19-batch.js b/nodes/core/logic/19-batch.js new file mode 100644 index 000000000..bdd5b5408 --- /dev/null +++ b/nodes/core/logic/19-batch.js @@ -0,0 +1,246 @@ +/** + * Copyright JS Foundation and other contributors, http://js.foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +module.exports = function(RED) { + "use strict"; + + var _max_kept_msgs_count = undefined; + + function max_kept_msgs_count(node) { + if (_max_kept_msgs_count === undefined) { + var name = "batchMaxKeptMsgsCount"; + if (RED.settings.hasOwnProperty(name)) { + _max_kept_msgs_count = RED.settings[name]; + } + else { + _max_kept_msgs_count = 0; + } + } + return _max_kept_msgs_count; + } + + function send_msgs(node, msgs, clone_msg) { + var count = msgs.length; + var msg_id = msgs[0]._msgid; + for (var i = 0; i < count; i++) { + var msg = clone_msg ? RED.util.cloneMessage(msgs[i]) : msgs[i]; + if (!msg.hasOwnProperty("parts")) { + msg.parts = {}; + } + var parts = msg.parts; + parts.id = msg_id; + parts.index = i; + parts.count = count; + node.send(msg); + } + } + + function send_interval(node, allow_empty_seq) { + let msgs = node.pending; + if (msgs.length > 0) { + send_msgs(node, msgs, false); + node.pending = []; + } + else { + if (allow_empty_seq) { + let mid = RED.util.generateId(); + let msg = { + payload: null, + parts: { + id: mid, + index: 0, + count: 1 + } + }; + node.send(msg); + } + } + } + + function is_complete(pending, topic) { + if (pending.hasOwnProperty(topic)) { + var p_topic = pending[topic]; + var gids = p_topic.gids; + if (gids.length > 0) { + var gid = gids[0]; + var groups = p_topic.groups; + var group = groups[gid]; + return (group.count === group.msgs.length); + } + } + return false; + } + + function get_msgs_of_topic(pending, topic) { + var p_topic = pending[topic]; + var groups = p_topic.groups; + var gids = p_topic.gids; + var gid = gids[0]; + var group = groups[gid]; + return group.msgs; + } + + function remove_topic(pending, topic) { + var p_topic = pending[topic]; + var groups = p_topic.groups; + var gids = p_topic.gids; + var gid = gids.shift(); + delete groups[gid]; + } + + function try_concat(node, pending) { + var topics = node.topics; + for (var topic of topics) { + if (!is_complete(pending, topic)) { + return; + } + } + var msgs = []; + for (var topic of topics) { + var t_msgs = get_msgs_of_topic(pending, topic); + msgs = msgs.concat(t_msgs); + } + for (var topic of topics) { + remove_topic(pending, topic); + } + send_msgs(node, msgs, false); + node.pending_count -= msgs.length; + } + + function add_to_topic_group(pending, topic, gid, msg) { + if (!pending.hasOwnProperty(topic)) { + pending[topic] = { groups: {}, gids: [] }; + } + var p_topic = pending[topic]; + var groups = p_topic.groups; + var gids = p_topic.gids; + if (!groups.hasOwnProperty(gid)) { + groups[gid] = { msgs: [], count: undefined }; + gids.push(gid); + } + var group = groups[gid]; + group.msgs.push(msg); + if ((group.count === undefined) && + msg.parts.hasOwnProperty('count')) { + group.count = msg.parts.count; + } + } + + function concat_msg(node, msg) { + var topic = msg.topic; + if(node.topics.indexOf(topic) >= 0) { + if (!msg.hasOwnProperty("parts") || + !msg.parts.hasOwnProperty("id") || + !msg.parts.hasOwnProperty("index") || + !msg.parts.hasOwnProperty("count")) { + node.error(RED._("batch.no-parts"), msg); + return; + } + var gid = msg.parts.id; + var pending = node.pending; + add_to_topic_group(pending, topic, gid, msg); + node.pending_count++; + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (node.pending_count > max_msgs)) { + node.pending = {}; + node.pending_count = 0; + node.error(RED._("batch.too-many"), msg); + } + try_concat(node, pending); + } + } + + function BatchNode(n) { + RED.nodes.createNode(this,n); + var node = this; + var mode = n.mode || "count"; + + node.pending_count = 0; + if (mode === "count") { + var count = Number(n.count || 1); + var overwrap = Number(n.overwrap || 0); + var is_overwrap = (overwrap > 0); + if (count <= overwrap) { + node.error(RED._("batch.count.invalid")); + return; + } + node.pending = []; + this.on("input", function(msg) { + var queue = node.pending; + queue.push(msg); + node.pending_count++; + if (queue.length === count) { + send_msgs(node, queue, is_overwrap); + node.pending = + (overwrap === 0) ? [] : queue.slice(-overwrap); + node.pending_count = 0; + } + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (node.pending_count > max_msgs)) { + node.pending = []; + node.pending_count = 0; + node.error(RED._("batch.too-many"), msg); + } + }); + this.on("close", function() { + node.pending_count = 0; + node.pending = []; + }); + } + else if (mode === "interval") { + var interval = Number(n.interval || "0") *1000; + var allow_empty_seq = n.allowEmptySequence; + node.pending = [] + var timer = setInterval(function() { + send_interval(node, allow_empty_seq); + node.pending_count = 0; + }, interval); + this.on("input", function(msg) { + node.pending.push(msg); + node.pending_count++; + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (node.pending_count > max_msgs)) { + node.pending = []; + node.pending_count = 0; + node.error(RED._("batch.too-many"), msg); + } + }); + this.on("close", function() { + clearInterval(timer); + node.pending = []; + node.pending_count = 0; + }); + } + else if(mode === "concat") { + node.topics = (n.topics || []).map(function(x) { + return x.topic; + }); + node.pending = {}; + this.on("input", function(msg) { + concat_msg(node, msg); + }); + this.on("close", function() { + node.pending = {}; + node.pending_count = 0; + }); + } + else { + node.error(RED._("batch.unexpected")); + } + } + + RED.nodes.registerType("batch", BatchNode); +} diff --git a/settings.js b/settings.js index cb634d47d..9679c882a 100644 --- a/settings.js +++ b/settings.js @@ -47,9 +47,10 @@ module.exports = { // The maximum length, in characters, of any message sent to the debug sidebar tab debugMaxLength: 1000, - // The maximum number of messages kept internally in sort node. + // The maximum number of messages kept internally in nodes. // Zero or undefined value means not restricting number of messages. //sortMaxKeptMsgsCount: 0, + //batchMaxKeptMsgsCount: 0, // To disable the option for using local files for storing keys and certificates in the TLS configuration // node, set this to true diff --git a/test/nodes/core/logic/19-batch_spec.js b/test/nodes/core/logic/19-batch_spec.js new file mode 100644 index 000000000..aa806f5bf --- /dev/null +++ b/test/nodes/core/logic/19-batch_spec.js @@ -0,0 +1,338 @@ +/** + * Copyright JS Foundation and other contributors, http://js.foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +var should = require("should"); +var batchNode = require("../../../../nodes/core/logic/19-batch.js"); +var helper = require("../../helper.js"); +var RED = require("../../../../red/red.js"); + +describe('BATCH node', function() { + this.timeout(8000); + + before(function(done) { + helper.startServer(done); + }); + + afterEach(function() { + helper.unload(); + RED.settings.batchMaxKeptMsgsCount = 0; + }); + + it('should be loaded with defaults', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + n1.should.have.property('name', 'BatchNode'); + done(); + }); + }); + + function check_parts(msg, id, idx, count) { + msg.should.have.property("parts"); + var parts = msg.parts; + parts.should.have.property("id", id); + parts.should.have.property("index", idx); + parts.should.have.property("count", count); + } + + function check_data(n1, n2, results, done) { + var id = undefined; + var ix0 = 0; // seq no + var ix1 = 0; // loc. in seq + var seq = undefined; + n2.on("input", function(msg) { + try { + if (seq === undefined) { + seq = results[ix0]; + } + var val = seq[ix1]; + msg.should.have.property("payload", val); + if (id === undefined) { + id = msg.parts.id; + } + check_parts(msg, id, ix1, seq.length); + ix1++; + if (ix1 === seq.length) { + ix0++; + ix1 = 0; + seq = undefined; + id = undefined; + if (ix0 === results.length) { + done(); + } + } + } + catch (e) { + done(e); + } + }); + } + + function check_count(flow, results, done) { + try { + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + check_data(n1, n2, results, done); + for(var i = 0; i < 6; i++) { + n1.receive({payload: i}); + } + }); + } + catch (e) { + done(e); + } + } + + function delayed_send(receiver, index, count, delay) { + if (index < count) { + setTimeout(function() { + receiver.receive({payload: index}); + delayed_send(receiver, index+1, count, delay); + }, delay); + } + } + + function check_interval(flow, results, delay, done) { + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + check_data(n1, n2, results, done); + delayed_send(n1, 0, 4, delay); + }); + } + + function check_concat(flow, results, inputs, done) { + try { + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + check_data(n1, n2, results, done); + for(var data of inputs) { + var msg = { + topic: data[0], + payload: data[1], + parts: { + id: data[0], + index: data[2], + count: data[3] + } + }; + n1.receive(msg); + } + }); + } + catch (e) { + done(e); + } + } + + describe('mode: count', function() { + + it('should create seq. with count', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 2, overwrap: 0, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [0, 1], + [2, 3], + [4, 5] + ]; + check_count(flow, results, done); + }); + + it('should create seq. with count and overwrap', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 3, overwrap: 2, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [0, 1, 2], + [1, 2, 3], + [2, 3, 4], + [3, 4, 5] + ]; + check_count(flow, results, done); + }); + + it('should handle too many pending messages', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 5, overwrap: 0, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + RED.settings.batchMaxKeptMsgsCount = 2; + setTimeout(function() { + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "batch"; + }); + var evt = logEvents[0][0]; + evt.should.have.property('id', "n1"); + evt.should.have.property('type', "batch"); + evt.should.have.property('msg', "batch.too-many"); + done(); + }, 150); + for(var i = 0; i < 3; i++) { + n1.receive({payload: i}); + } + }); + }); + + }); + + describe('mode: interval', function() { + + it('should create seq. with interval', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [0, 1], + [2, 3] + ]; + check_interval(flow, results, 450, done); + }); + + it('should create seq. with interval (in float)', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 0.5, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [0, 1], + [2, 3] + ]; + check_interval(flow, results, 225, done); + }); + + it('should create seq. with interval & not send empty seq', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + // 1300, 2600, 3900, 5200, + [0], [1], [2], [3] + ]; + check_interval(flow, results, 1300, done); + }); + + it('should create seq. with interval & send empty seq', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: true, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + // 1300, 2600, 3900, 5200, + [null], [0], [1], [2], [null], [3] + ]; + check_interval(flow, results, 1300, done); + }); + + it('should handle too many pending messages', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + RED.settings.batchMaxKeptMsgsCount = 2; + setTimeout(function() { + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "batch"; + }); + var evt = logEvents[0][0]; + evt.should.have.property('id', "n1"); + evt.should.have.property('type', "batch"); + evt.should.have.property('msg', "batch.too-many"); + done(); + }, 150); + for(var i = 0; i < 3; i++) { + n1.receive({payload: i}); + } + }); + }); + + }); + + describe('mode: concat', function() { + + it('should concat two seq. (series)', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [2, 3, 0, 1] + ]; + var inputs = [ + ["TB", 0, 0, 2], + ["TB", 1, 1, 2], + ["TA", 2, 0, 2], + ["TA", 3, 1, 2] + ]; + check_concat(flow, results, inputs, done); + }); + + it('should concat two seq. (mixed)', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [2, 3, 0, 1] + ]; + var inputs = [ + ["TA", 2, 0, 2], + ["TB", 0, 0, 2], + ["TA", 3, 1, 2], + ["TB", 1, 1, 2] + ]; + check_concat(flow, results, inputs, done); + }); + + it('should concat three seq.', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}, {topic: "TC"}], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + var results = [ + [2, 3, 0, 1, 4] + ]; + var inputs = [ + ["TC", 4, 0, 1], + ["TB", 0, 0, 2], + ["TB", 1, 1, 2], + ["TA", 2, 0, 2], + ["TA", 3, 1, 2] + ]; + check_concat(flow, results, inputs, done); + }); + + it('should handle too many pending messages', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + RED.settings.batchMaxKeptMsgsCount = 2; + setTimeout(function() { + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "batch"; + }); + var evt = logEvents[0][0]; + evt.should.have.property('id', "n1"); + evt.should.have.property('type', "batch"); + evt.should.have.property('msg', "batch.too-many"); + done(); + }, 150); + var C = 3; + for(var i = 0; i < C; i++) { + var parts_a = {index:i, count:C, id:"A"}; + var parts_b = {index:i, count:C, id:"B"}; + n1.receive({payload: i, topic: "TA", parts:parts_a}); + n1.receive({payload: i, topic: "TB", parts:parts_b}); + } + }); + }); + + }); + +});