mirror of
https://github.com/node-red/node-red.git
synced 2023-10-10 13:36:53 +02:00
Initial support of new BATCH node (#1548)
* initial support of BATCH node * add concat mode & fix for docs and js code * add tests for BATCH node * minor correction of typo * allow interval in float * fixed message catalog * add test for too many pending messages & related fixes * update info document on batchMaxKeptMsgsCount * fixed close callback * fixed info document * add initial topics entry of concat mode
This commit is contained in:
parent
9bc72c1a06
commit
af71ae649b
BIN
editor/icons/batch.png
Normal file
BIN
editor/icons/batch.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 712 B |
@ -875,5 +875,32 @@
|
||||
"invalid-exp" : "invalid JSONata expression in sort node",
|
||||
"too-many" : "too many pending messages in sort node",
|
||||
"clear" : "clear pending message in sort node"
|
||||
},
|
||||
"batch" : {
|
||||
"mode": {
|
||||
"label" : "Mode",
|
||||
"num-msgs" : "number of messages",
|
||||
"interval" : "interval in seconds",
|
||||
"concat" : "concatenate sequences"
|
||||
},
|
||||
"count": {
|
||||
"label" : "Number of msgs",
|
||||
"overwrap" : "Overwrap",
|
||||
"count" : "count",
|
||||
"invalid" : "Invalid count and overwrap"
|
||||
},
|
||||
"interval": {
|
||||
"label" : "Interval (sec)",
|
||||
"seconds" : "seconds",
|
||||
"sec" : "sec",
|
||||
"empty" : "send empty message when no message arrives"
|
||||
},
|
||||
"concat": {
|
||||
"topics-label": "Topics",
|
||||
"topic" : "topic"
|
||||
},
|
||||
"too-many" : "too many pending messages in batch node",
|
||||
"unexpected" : "unexpected mode",
|
||||
"no-parts" : "no parts property in message"
|
||||
}
|
||||
}
|
||||
|
192
nodes/core/logic/19-batch.html
Normal file
192
nodes/core/logic/19-batch.html
Normal file
@ -0,0 +1,192 @@
|
||||
<!--
|
||||
Copyright JS Foundation and other contributors, http://js.foundation
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!DOCTYPE html>
|
||||
|
||||
<script type="text/x-red" data-template-name="batch">
|
||||
<div class="form-row">
|
||||
<label for="node-input-mode"><i class="fa fa-dot-circle-o"></i> <span data-i18n="batch.mode.label"></span></label>
|
||||
<select type="text" id="node-input-mode" style="width: 300px;">
|
||||
<option value="count" data-i18n="batch.mode.num-msgs"></option>
|
||||
<option value="interval" data-i18n="batch.mode.interval"></option>
|
||||
<option value="concat" data-i18n="batch.mode.concat"></option>
|
||||
</select>
|
||||
</div>
|
||||
|
||||
<div class="node-row-msg-count">
|
||||
<div class="form-row node-row-count">
|
||||
<label for="node-input-count" data-i18n="batch.count.label"></label>
|
||||
<input type="text" id="node-input-count" data-i18n="[placeholder]batch.count.count" style="width: 50px;">
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="node-row-msg-overwrap">
|
||||
<div class="form-row node-row-overwrap">
|
||||
<label for="node-input-count" data-i18n="batch.count.overwrap"></label>
|
||||
<input type="text" id="node-input-overwrap" data-i18n="[placeholder]batch.count.count" style="width: 50px;">
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="node-row-msg-interval">
|
||||
<div class="form-row node-row-interval">
|
||||
<label for="node-input-interval"> <span data-i18n="batch.interval.label"></span></label>
|
||||
<input type="text" id="node-input-interval" data-i18n="[placeholder]batch.interval.seconds" style="width: 50px;">
|
||||
<span data-i18n="batch.interval.sec"></span>
|
||||
</div>
|
||||
<div class="form-row">
|
||||
<input type="checkbox" id="node-input-allowEmptySequence" style="margin-left:10px; vertical-align:top; width:auto;">
|
||||
<label for="node-input-allowEmptySequence" style="width:auto;" data-i18n="batch.interval.empty"></label>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="node-row-msg-concat">
|
||||
<div class="form-row">
|
||||
<label data-i18n="batch.concat.topics-label"></label>
|
||||
<div class="form-row node-input-topics-container-row">
|
||||
<ol id="node-input-topics-container"></ol>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="form-row">
|
||||
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="node-red:common.label.name"></span></label>
|
||||
<input type="text" id="node-input-name" data-i18n="[placeholder]node-red:common.label.name">
|
||||
</div>
|
||||
|
||||
</script>
|
||||
|
||||
<script type="text/x-red" data-help-name="batch">
|
||||
<p>A function that divides input messages into multiple sequences of messages or concatenates multiple sequences of messages into a single messages sequence.</p>
|
||||
<h3>Details</h3>
|
||||
<h4>group by number of messages</h4>
|
||||
<p>groups incoming messages into sequences of messages with specified counts. The output message group can be overwrapped.</p>
|
||||
<h4>group by interval in seconds</h4>
|
||||
<p>groups incoming messages received withn specified interval into sequences of messages. </p>
|
||||
<h4>concatenate message groups</h4>
|
||||
<p>creates a message sequence based on <code>topic</code> value of incoming message sequences.</p>
|
||||
<p>Target and order of concatenated sequences are specified by <code>Topics</code> value. Selection of concatenated message groups is based on arrival of first message of the group.</p>
|
||||
<p><b>Note:</b> This node internally keeps messages for its operation. In order to prevent unexpected memory usage, maximum number of messages kept can be specified. Default is no limit on number of messages.
|
||||
<ul>
|
||||
<li><code>batchMaxKeptMsgsCount</code> property set in <b>settings.js</b>.</li>
|
||||
</ul>
|
||||
</p>
|
||||
</script>
|
||||
|
||||
<script type="text/javascript">
|
||||
RED.nodes.registerType("batch",{
|
||||
category: "function",
|
||||
color:"#E2D96E",
|
||||
defaults: {
|
||||
name: {value:""},
|
||||
mode: {value:"count"},
|
||||
count: {value:10},
|
||||
overwrap: {value:0},
|
||||
interval: {value:10},
|
||||
allowEmptySequence: {value:false},
|
||||
topics: {value:[{topic:""}]}
|
||||
},
|
||||
inputs:1,
|
||||
outputs:1,
|
||||
icon: "batch.png",
|
||||
label: function() {
|
||||
return this.name || "batch";
|
||||
},
|
||||
labelStyle: function() {
|
||||
return this.name ? "node_label_italic" : "";
|
||||
},
|
||||
oneditprepare: function() {
|
||||
var node = this;
|
||||
var topic_str = node._("batch.concat.topic");
|
||||
|
||||
function resizeTopics(topic) {
|
||||
var newWidth = topic.width();
|
||||
topic.find('.red-ui-typedInput')
|
||||
.typedInput("width",newWidth-15);
|
||||
}
|
||||
|
||||
$("#node-input-topics-container")
|
||||
.css('min-height','200px').css('min-width','430px')
|
||||
.editableList({
|
||||
addItem: function(container, i, opt) {
|
||||
if (!opt.hasOwnProperty('topic')) {
|
||||
opt.topic = "";
|
||||
}
|
||||
var row = $('<div/>').appendTo(container);
|
||||
var valueField = $('<input/>',{
|
||||
class:"node-input-topic-value",
|
||||
type:"text",
|
||||
style:"margin-left: 5px;"
|
||||
}).appendTo(row)
|
||||
.typedInput({default:'str', types:['str']});
|
||||
valueField.typedInput('value', opt.topic);
|
||||
valueField.typedInput('type', 'str');
|
||||
valueField.attr('placeholder', topic_str);
|
||||
resizeTopics(container);
|
||||
},
|
||||
resizeItem: resizeTopics,
|
||||
sortable: true,
|
||||
removable: true
|
||||
});
|
||||
|
||||
$("#node-input-count").spinner({
|
||||
});
|
||||
$("#node-input-overwrap").spinner({
|
||||
});
|
||||
$("#node-input-interval").spinner({
|
||||
});
|
||||
$("#node-input-mode").change(function(e) {
|
||||
var val = $(this).val();
|
||||
$(".node-row-msg-count").toggle(val==="count");
|
||||
$(".node-row-msg-overwrap").toggle(val==="count");
|
||||
$(".node-row-msg-interval").toggle(val==="interval");
|
||||
$(".node-row-msg-concat").toggle(val==="concat");
|
||||
if (val==="concat") {
|
||||
var topics = node.topics;
|
||||
var container = $("#node-input-topics-container");
|
||||
container.editableList('empty');
|
||||
for (var i = 0; i < topics.length; i++) {
|
||||
var topic = topics[i];
|
||||
container.editableList('addItem', topic);
|
||||
}
|
||||
}
|
||||
});
|
||||
},
|
||||
oneditsave: function() {
|
||||
var topics = $("#node-input-topics-container").editableList('items');
|
||||
var node = this;
|
||||
node.topics = [];
|
||||
topics.each(function(i) {
|
||||
var topicData = $(this).data('data');
|
||||
var topic = $(this);
|
||||
var vf = topic.find(".node-input-topic-value");
|
||||
var value = vf.typedInput('value');
|
||||
var type = vf.typedInput('type');
|
||||
var r = {topic:value};
|
||||
node.topics.push(r);
|
||||
});
|
||||
},
|
||||
oneditresize: function(size) {
|
||||
var rows = $("#dialog-form>div:not(.node-input-topics-container-row)");
|
||||
var height = size.height;
|
||||
for (var i = 0; i < rows.size(); i++) {
|
||||
height -= $(rows[i]).outerHeight(true);
|
||||
}
|
||||
var editorRow = $("#dialog-form>div.node-input-topics-container-row");
|
||||
height -= (parseInt(editorRow.css("marginTop"))+parseInt(editorRow.css("marginBottom")));
|
||||
$("#node-input-topics-container").editableList('height',height);
|
||||
}
|
||||
});
|
||||
</script>
|
246
nodes/core/logic/19-batch.js
Normal file
246
nodes/core/logic/19-batch.js
Normal file
@ -0,0 +1,246 @@
|
||||
/**
|
||||
* Copyright JS Foundation and other contributors, http://js.foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
module.exports = function(RED) {
|
||||
"use strict";
|
||||
|
||||
var _max_kept_msgs_count = undefined;
|
||||
|
||||
function max_kept_msgs_count(node) {
|
||||
if (_max_kept_msgs_count === undefined) {
|
||||
var name = "batchMaxKeptMsgsCount";
|
||||
if (RED.settings.hasOwnProperty(name)) {
|
||||
_max_kept_msgs_count = RED.settings[name];
|
||||
}
|
||||
else {
|
||||
_max_kept_msgs_count = 0;
|
||||
}
|
||||
}
|
||||
return _max_kept_msgs_count;
|
||||
}
|
||||
|
||||
function send_msgs(node, msgs, clone_msg) {
|
||||
var count = msgs.length;
|
||||
var msg_id = msgs[0]._msgid;
|
||||
for (var i = 0; i < count; i++) {
|
||||
var msg = clone_msg ? RED.util.cloneMessage(msgs[i]) : msgs[i];
|
||||
if (!msg.hasOwnProperty("parts")) {
|
||||
msg.parts = {};
|
||||
}
|
||||
var parts = msg.parts;
|
||||
parts.id = msg_id;
|
||||
parts.index = i;
|
||||
parts.count = count;
|
||||
node.send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
function send_interval(node, allow_empty_seq) {
|
||||
let msgs = node.pending;
|
||||
if (msgs.length > 0) {
|
||||
send_msgs(node, msgs, false);
|
||||
node.pending = [];
|
||||
}
|
||||
else {
|
||||
if (allow_empty_seq) {
|
||||
let mid = RED.util.generateId();
|
||||
let msg = {
|
||||
payload: null,
|
||||
parts: {
|
||||
id: mid,
|
||||
index: 0,
|
||||
count: 1
|
||||
}
|
||||
};
|
||||
node.send(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function is_complete(pending, topic) {
|
||||
if (pending.hasOwnProperty(topic)) {
|
||||
var p_topic = pending[topic];
|
||||
var gids = p_topic.gids;
|
||||
if (gids.length > 0) {
|
||||
var gid = gids[0];
|
||||
var groups = p_topic.groups;
|
||||
var group = groups[gid];
|
||||
return (group.count === group.msgs.length);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function get_msgs_of_topic(pending, topic) {
|
||||
var p_topic = pending[topic];
|
||||
var groups = p_topic.groups;
|
||||
var gids = p_topic.gids;
|
||||
var gid = gids[0];
|
||||
var group = groups[gid];
|
||||
return group.msgs;
|
||||
}
|
||||
|
||||
function remove_topic(pending, topic) {
|
||||
var p_topic = pending[topic];
|
||||
var groups = p_topic.groups;
|
||||
var gids = p_topic.gids;
|
||||
var gid = gids.shift();
|
||||
delete groups[gid];
|
||||
}
|
||||
|
||||
function try_concat(node, pending) {
|
||||
var topics = node.topics;
|
||||
for (var topic of topics) {
|
||||
if (!is_complete(pending, topic)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
var msgs = [];
|
||||
for (var topic of topics) {
|
||||
var t_msgs = get_msgs_of_topic(pending, topic);
|
||||
msgs = msgs.concat(t_msgs);
|
||||
}
|
||||
for (var topic of topics) {
|
||||
remove_topic(pending, topic);
|
||||
}
|
||||
send_msgs(node, msgs, false);
|
||||
node.pending_count -= msgs.length;
|
||||
}
|
||||
|
||||
function add_to_topic_group(pending, topic, gid, msg) {
|
||||
if (!pending.hasOwnProperty(topic)) {
|
||||
pending[topic] = { groups: {}, gids: [] };
|
||||
}
|
||||
var p_topic = pending[topic];
|
||||
var groups = p_topic.groups;
|
||||
var gids = p_topic.gids;
|
||||
if (!groups.hasOwnProperty(gid)) {
|
||||
groups[gid] = { msgs: [], count: undefined };
|
||||
gids.push(gid);
|
||||
}
|
||||
var group = groups[gid];
|
||||
group.msgs.push(msg);
|
||||
if ((group.count === undefined) &&
|
||||
msg.parts.hasOwnProperty('count')) {
|
||||
group.count = msg.parts.count;
|
||||
}
|
||||
}
|
||||
|
||||
function concat_msg(node, msg) {
|
||||
var topic = msg.topic;
|
||||
if(node.topics.indexOf(topic) >= 0) {
|
||||
if (!msg.hasOwnProperty("parts") ||
|
||||
!msg.parts.hasOwnProperty("id") ||
|
||||
!msg.parts.hasOwnProperty("index") ||
|
||||
!msg.parts.hasOwnProperty("count")) {
|
||||
node.error(RED._("batch.no-parts"), msg);
|
||||
return;
|
||||
}
|
||||
var gid = msg.parts.id;
|
||||
var pending = node.pending;
|
||||
add_to_topic_group(pending, topic, gid, msg);
|
||||
node.pending_count++;
|
||||
var max_msgs = max_kept_msgs_count(node);
|
||||
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
|
||||
node.pending = {};
|
||||
node.pending_count = 0;
|
||||
node.error(RED._("batch.too-many"), msg);
|
||||
}
|
||||
try_concat(node, pending);
|
||||
}
|
||||
}
|
||||
|
||||
function BatchNode(n) {
|
||||
RED.nodes.createNode(this,n);
|
||||
var node = this;
|
||||
var mode = n.mode || "count";
|
||||
|
||||
node.pending_count = 0;
|
||||
if (mode === "count") {
|
||||
var count = Number(n.count || 1);
|
||||
var overwrap = Number(n.overwrap || 0);
|
||||
var is_overwrap = (overwrap > 0);
|
||||
if (count <= overwrap) {
|
||||
node.error(RED._("batch.count.invalid"));
|
||||
return;
|
||||
}
|
||||
node.pending = [];
|
||||
this.on("input", function(msg) {
|
||||
var queue = node.pending;
|
||||
queue.push(msg);
|
||||
node.pending_count++;
|
||||
if (queue.length === count) {
|
||||
send_msgs(node, queue, is_overwrap);
|
||||
node.pending =
|
||||
(overwrap === 0) ? [] : queue.slice(-overwrap);
|
||||
node.pending_count = 0;
|
||||
}
|
||||
var max_msgs = max_kept_msgs_count(node);
|
||||
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
|
||||
node.pending = [];
|
||||
node.pending_count = 0;
|
||||
node.error(RED._("batch.too-many"), msg);
|
||||
}
|
||||
});
|
||||
this.on("close", function() {
|
||||
node.pending_count = 0;
|
||||
node.pending = [];
|
||||
});
|
||||
}
|
||||
else if (mode === "interval") {
|
||||
var interval = Number(n.interval || "0") *1000;
|
||||
var allow_empty_seq = n.allowEmptySequence;
|
||||
node.pending = []
|
||||
var timer = setInterval(function() {
|
||||
send_interval(node, allow_empty_seq);
|
||||
node.pending_count = 0;
|
||||
}, interval);
|
||||
this.on("input", function(msg) {
|
||||
node.pending.push(msg);
|
||||
node.pending_count++;
|
||||
var max_msgs = max_kept_msgs_count(node);
|
||||
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
|
||||
node.pending = [];
|
||||
node.pending_count = 0;
|
||||
node.error(RED._("batch.too-many"), msg);
|
||||
}
|
||||
});
|
||||
this.on("close", function() {
|
||||
clearInterval(timer);
|
||||
node.pending = [];
|
||||
node.pending_count = 0;
|
||||
});
|
||||
}
|
||||
else if(mode === "concat") {
|
||||
node.topics = (n.topics || []).map(function(x) {
|
||||
return x.topic;
|
||||
});
|
||||
node.pending = {};
|
||||
this.on("input", function(msg) {
|
||||
concat_msg(node, msg);
|
||||
});
|
||||
this.on("close", function() {
|
||||
node.pending = {};
|
||||
node.pending_count = 0;
|
||||
});
|
||||
}
|
||||
else {
|
||||
node.error(RED._("batch.unexpected"));
|
||||
}
|
||||
}
|
||||
|
||||
RED.nodes.registerType("batch", BatchNode);
|
||||
}
|
@ -47,9 +47,10 @@ module.exports = {
|
||||
// The maximum length, in characters, of any message sent to the debug sidebar tab
|
||||
debugMaxLength: 1000,
|
||||
|
||||
// The maximum number of messages kept internally in sort node.
|
||||
// The maximum number of messages kept internally in nodes.
|
||||
// Zero or undefined value means not restricting number of messages.
|
||||
//sortMaxKeptMsgsCount: 0,
|
||||
//batchMaxKeptMsgsCount: 0,
|
||||
|
||||
// To disable the option for using local files for storing keys and certificates in the TLS configuration
|
||||
// node, set this to true
|
||||
|
338
test/nodes/core/logic/19-batch_spec.js
Normal file
338
test/nodes/core/logic/19-batch_spec.js
Normal file
@ -0,0 +1,338 @@
|
||||
/**
|
||||
* Copyright JS Foundation and other contributors, http://js.foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
var should = require("should");
|
||||
var batchNode = require("../../../../nodes/core/logic/19-batch.js");
|
||||
var helper = require("../../helper.js");
|
||||
var RED = require("../../../../red/red.js");
|
||||
|
||||
describe('BATCH node', function() {
|
||||
this.timeout(8000);
|
||||
|
||||
before(function(done) {
|
||||
helper.startServer(done);
|
||||
});
|
||||
|
||||
afterEach(function() {
|
||||
helper.unload();
|
||||
RED.settings.batchMaxKeptMsgsCount = 0;
|
||||
});
|
||||
|
||||
it('should be loaded with defaults', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
helper.load(batchNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
n1.should.have.property('name', 'BatchNode');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
function check_parts(msg, id, idx, count) {
|
||||
msg.should.have.property("parts");
|
||||
var parts = msg.parts;
|
||||
parts.should.have.property("id", id);
|
||||
parts.should.have.property("index", idx);
|
||||
parts.should.have.property("count", count);
|
||||
}
|
||||
|
||||
function check_data(n1, n2, results, done) {
|
||||
var id = undefined;
|
||||
var ix0 = 0; // seq no
|
||||
var ix1 = 0; // loc. in seq
|
||||
var seq = undefined;
|
||||
n2.on("input", function(msg) {
|
||||
try {
|
||||
if (seq === undefined) {
|
||||
seq = results[ix0];
|
||||
}
|
||||
var val = seq[ix1];
|
||||
msg.should.have.property("payload", val);
|
||||
if (id === undefined) {
|
||||
id = msg.parts.id;
|
||||
}
|
||||
check_parts(msg, id, ix1, seq.length);
|
||||
ix1++;
|
||||
if (ix1 === seq.length) {
|
||||
ix0++;
|
||||
ix1 = 0;
|
||||
seq = undefined;
|
||||
id = undefined;
|
||||
if (ix0 === results.length) {
|
||||
done();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function check_count(flow, results, done) {
|
||||
try {
|
||||
helper.load(batchNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
check_data(n1, n2, results, done);
|
||||
for(var i = 0; i < 6; i++) {
|
||||
n1.receive({payload: i});
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (e) {
|
||||
done(e);
|
||||
}
|
||||
}
|
||||
|
||||
function delayed_send(receiver, index, count, delay) {
|
||||
if (index < count) {
|
||||
setTimeout(function() {
|
||||
receiver.receive({payload: index});
|
||||
delayed_send(receiver, index+1, count, delay);
|
||||
}, delay);
|
||||
}
|
||||
}
|
||||
|
||||
function check_interval(flow, results, delay, done) {
|
||||
helper.load(batchNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
check_data(n1, n2, results, done);
|
||||
delayed_send(n1, 0, 4, delay);
|
||||
});
|
||||
}
|
||||
|
||||
function check_concat(flow, results, inputs, done) {
|
||||
try {
|
||||
helper.load(batchNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
check_data(n1, n2, results, done);
|
||||
for(var data of inputs) {
|
||||
var msg = {
|
||||
topic: data[0],
|
||||
payload: data[1],
|
||||
parts: {
|
||||
id: data[0],
|
||||
index: data[2],
|
||||
count: data[3]
|
||||
}
|
||||
};
|
||||
n1.receive(msg);
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (e) {
|
||||
done(e);
|
||||
}
|
||||
}
|
||||
|
||||
describe('mode: count', function() {
|
||||
|
||||
it('should create seq. with count', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 2, overwrap: 0, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
var results = [
|
||||
[0, 1],
|
||||
[2, 3],
|
||||
[4, 5]
|
||||
];
|
||||
check_count(flow, results, done);
|
||||
});
|
||||
|
||||
it('should create seq. with count and overwrap', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 3, overwrap: 2, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
var results = [
|
||||
[0, 1, 2],
|
||||
[1, 2, 3],
|
||||
[2, 3, 4],
|
||||
[3, 4, 5]
|
||||
];
|
||||
check_count(flow, results, done);
|
||||
});
|
||||
|
||||
it('should handle too many pending messages', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 5, overwrap: 0, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
helper.load(batchNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
RED.settings.batchMaxKeptMsgsCount = 2;
|
||||
setTimeout(function() {
|
||||
var logEvents = helper.log().args.filter(function (evt) {
|
||||
return evt[0].type == "batch";
|
||||
});
|
||||
var evt = logEvents[0][0];
|
||||
evt.should.have.property('id', "n1");
|
||||
evt.should.have.property('type', "batch");
|
||||
evt.should.have.property('msg', "batch.too-many");
|
||||
done();
|
||||
}, 150);
|
||||
for(var i = 0; i < 3; i++) {
|
||||
n1.receive({payload: i});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
describe('mode: interval', function() {
|
||||
|
||||
it('should create seq. with interval', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
var results = [
|
||||
[0, 1],
|
||||
[2, 3]
|
||||
];
|
||||
check_interval(flow, results, 450, done);
|
||||
});
|
||||
|
||||
it('should create seq. with interval (in float)', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 0.5, allowEmptySequence: false, topics: [], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
var results = [
|
||||
[0, 1],
|
||||
[2, 3]
|
||||
];
|
||||
check_interval(flow, results, 225, done);
|
||||
});
|
||||
|
||||
it('should create seq. with interval & not send empty seq', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
var results = [
|
||||
// 1300, 2600, 3900, 5200,
|
||||
[0], [1], [2], [3]
|
||||
];
|
||||
check_interval(flow, results, 1300, done);
|
||||
});
|
||||
|
||||
it('should create seq. with interval & send empty seq', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: true, topics: [], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
var results = [
|
||||
// 1300, 2600, 3900, 5200,
|
||||
[null], [0], [1], [2], [null], [3]
|
||||
];
|
||||
check_interval(flow, results, 1300, done);
|
||||
});
|
||||
|
||||
it('should handle too many pending messages', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
helper.load(batchNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
RED.settings.batchMaxKeptMsgsCount = 2;
|
||||
setTimeout(function() {
|
||||
var logEvents = helper.log().args.filter(function (evt) {
|
||||
return evt[0].type == "batch";
|
||||
});
|
||||
var evt = logEvents[0][0];
|
||||
evt.should.have.property('id', "n1");
|
||||
evt.should.have.property('type', "batch");
|
||||
evt.should.have.property('msg', "batch.too-many");
|
||||
done();
|
||||
}, 150);
|
||||
for(var i = 0; i < 3; i++) {
|
||||
n1.receive({payload: i});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
describe('mode: concat', function() {
|
||||
|
||||
it('should concat two seq. (series)', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
var results = [
|
||||
[2, 3, 0, 1]
|
||||
];
|
||||
var inputs = [
|
||||
["TB", 0, 0, 2],
|
||||
["TB", 1, 1, 2],
|
||||
["TA", 2, 0, 2],
|
||||
["TA", 3, 1, 2]
|
||||
];
|
||||
check_concat(flow, results, inputs, done);
|
||||
});
|
||||
|
||||
it('should concat two seq. (mixed)', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
var results = [
|
||||
[2, 3, 0, 1]
|
||||
];
|
||||
var inputs = [
|
||||
["TA", 2, 0, 2],
|
||||
["TB", 0, 0, 2],
|
||||
["TA", 3, 1, 2],
|
||||
["TB", 1, 1, 2]
|
||||
];
|
||||
check_concat(flow, results, inputs, done);
|
||||
});
|
||||
|
||||
it('should concat three seq.', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}, {topic: "TC"}], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
var results = [
|
||||
[2, 3, 0, 1, 4]
|
||||
];
|
||||
var inputs = [
|
||||
["TC", 4, 0, 1],
|
||||
["TB", 0, 0, 2],
|
||||
["TB", 1, 1, 2],
|
||||
["TA", 2, 0, 2],
|
||||
["TA", 3, 1, 2]
|
||||
];
|
||||
check_concat(flow, results, inputs, done);
|
||||
});
|
||||
|
||||
it('should handle too many pending messages', function(done) {
|
||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
|
||||
{id:"n2", type:"helper"}];
|
||||
helper.load(batchNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
RED.settings.batchMaxKeptMsgsCount = 2;
|
||||
setTimeout(function() {
|
||||
var logEvents = helper.log().args.filter(function (evt) {
|
||||
return evt[0].type == "batch";
|
||||
});
|
||||
var evt = logEvents[0][0];
|
||||
evt.should.have.property('id', "n1");
|
||||
evt.should.have.property('type', "batch");
|
||||
evt.should.have.property('msg', "batch.too-many");
|
||||
done();
|
||||
}, 150);
|
||||
var C = 3;
|
||||
for(var i = 0; i < C; i++) {
|
||||
var parts_a = {index:i, count:C, id:"A"};
|
||||
var parts_b = {index:i, count:C, id:"B"};
|
||||
n1.receive({payload: i, topic: "TA", parts:parts_a});
|
||||
n1.receive({payload: i, topic: "TB", parts:parts_b});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
});
|
Loading…
x
Reference in New Issue
Block a user