1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00
node-red/nodes/core/logic/19-batch.js
Hiroyasu Nishiyama af71ae649b Initial support of new BATCH node (#1548)
* initial support of BATCH node

* add concat mode & fix for docs and js code

* add tests for BATCH node

* minor correction of typo

* allow interval in float

* fixed message catalog

* add test for too many pending messages & related fixes

* update info document on batchMaxKeptMsgsCount

* fixed close callback

* fixed info document

* add initial topics entry of concat mode
2018-01-17 10:05:01 +00:00

247 lines
7.9 KiB
JavaScript

/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
module.exports = function(RED) {
"use strict";
var _max_kept_msgs_count = undefined;
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
var name = "batchMaxKeptMsgsCount";
if (RED.settings.hasOwnProperty(name)) {
_max_kept_msgs_count = RED.settings[name];
}
else {
_max_kept_msgs_count = 0;
}
}
return _max_kept_msgs_count;
}
function send_msgs(node, msgs, clone_msg) {
var count = msgs.length;
var msg_id = msgs[0]._msgid;
for (var i = 0; i < count; i++) {
var msg = clone_msg ? RED.util.cloneMessage(msgs[i]) : msgs[i];
if (!msg.hasOwnProperty("parts")) {
msg.parts = {};
}
var parts = msg.parts;
parts.id = msg_id;
parts.index = i;
parts.count = count;
node.send(msg);
}
}
function send_interval(node, allow_empty_seq) {
let msgs = node.pending;
if (msgs.length > 0) {
send_msgs(node, msgs, false);
node.pending = [];
}
else {
if (allow_empty_seq) {
let mid = RED.util.generateId();
let msg = {
payload: null,
parts: {
id: mid,
index: 0,
count: 1
}
};
node.send(msg);
}
}
}
function is_complete(pending, topic) {
if (pending.hasOwnProperty(topic)) {
var p_topic = pending[topic];
var gids = p_topic.gids;
if (gids.length > 0) {
var gid = gids[0];
var groups = p_topic.groups;
var group = groups[gid];
return (group.count === group.msgs.length);
}
}
return false;
}
function get_msgs_of_topic(pending, topic) {
var p_topic = pending[topic];
var groups = p_topic.groups;
var gids = p_topic.gids;
var gid = gids[0];
var group = groups[gid];
return group.msgs;
}
function remove_topic(pending, topic) {
var p_topic = pending[topic];
var groups = p_topic.groups;
var gids = p_topic.gids;
var gid = gids.shift();
delete groups[gid];
}
function try_concat(node, pending) {
var topics = node.topics;
for (var topic of topics) {
if (!is_complete(pending, topic)) {
return;
}
}
var msgs = [];
for (var topic of topics) {
var t_msgs = get_msgs_of_topic(pending, topic);
msgs = msgs.concat(t_msgs);
}
for (var topic of topics) {
remove_topic(pending, topic);
}
send_msgs(node, msgs, false);
node.pending_count -= msgs.length;
}
function add_to_topic_group(pending, topic, gid, msg) {
if (!pending.hasOwnProperty(topic)) {
pending[topic] = { groups: {}, gids: [] };
}
var p_topic = pending[topic];
var groups = p_topic.groups;
var gids = p_topic.gids;
if (!groups.hasOwnProperty(gid)) {
groups[gid] = { msgs: [], count: undefined };
gids.push(gid);
}
var group = groups[gid];
group.msgs.push(msg);
if ((group.count === undefined) &&
msg.parts.hasOwnProperty('count')) {
group.count = msg.parts.count;
}
}
function concat_msg(node, msg) {
var topic = msg.topic;
if(node.topics.indexOf(topic) >= 0) {
if (!msg.hasOwnProperty("parts") ||
!msg.parts.hasOwnProperty("id") ||
!msg.parts.hasOwnProperty("index") ||
!msg.parts.hasOwnProperty("count")) {
node.error(RED._("batch.no-parts"), msg);
return;
}
var gid = msg.parts.id;
var pending = node.pending;
add_to_topic_group(pending, topic, gid, msg);
node.pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
try_concat(node, pending);
}
}
function BatchNode(n) {
RED.nodes.createNode(this,n);
var node = this;
var mode = n.mode || "count";
node.pending_count = 0;
if (mode === "count") {
var count = Number(n.count || 1);
var overwrap = Number(n.overwrap || 0);
var is_overwrap = (overwrap > 0);
if (count <= overwrap) {
node.error(RED._("batch.count.invalid"));
return;
}
node.pending = [];
this.on("input", function(msg) {
var queue = node.pending;
queue.push(msg);
node.pending_count++;
if (queue.length === count) {
send_msgs(node, queue, is_overwrap);
node.pending =
(overwrap === 0) ? [] : queue.slice(-overwrap);
node.pending_count = 0;
}
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
node.pending = [];
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
});
this.on("close", function() {
node.pending_count = 0;
node.pending = [];
});
}
else if (mode === "interval") {
var interval = Number(n.interval || "0") *1000;
var allow_empty_seq = n.allowEmptySequence;
node.pending = []
var timer = setInterval(function() {
send_interval(node, allow_empty_seq);
node.pending_count = 0;
}, interval);
this.on("input", function(msg) {
node.pending.push(msg);
node.pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
node.pending = [];
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
});
this.on("close", function() {
clearInterval(timer);
node.pending = [];
node.pending_count = 0;
});
}
else if(mode === "concat") {
node.topics = (n.topics || []).map(function(x) {
return x.topic;
});
node.pending = {};
this.on("input", function(msg) {
concat_msg(node, msg);
});
this.on("close", function() {
node.pending = {};
node.pending_count = 0;
});
}
else {
node.error(RED._("batch.unexpected"));
}
}
RED.nodes.registerType("batch", BatchNode);
}