Messaging API support in Batch node

This commit is contained in:
Kunihiko Toumura
2020-10-29 16:16:03 +09:00
parent 15a600c763
commit dbfbd54e1f
2 changed files with 155 additions and 29 deletions

View File

@@ -32,11 +32,11 @@ module.exports = function(RED) {
return _max_kept_msgs_count;
}
function send_msgs(node, msgs, clone_msg) {
var count = msgs.length;
var msg_id = msgs[0]._msgid;
function send_msgs(node, msgInfos, clone_msg) {
var count = msgInfos.length;
var msg_id = msgInfos[0].msg._msgid;
for (var i = 0; i < count; i++) {
var msg = clone_msg ? RED.util.cloneMessage(msgs[i]) : msgs[i];
var msg = clone_msg ? RED.util.cloneMessage(msgInfos[i].msg) : msgInfos[i].msg;
if (!msg.hasOwnProperty("parts")) {
msg.parts = {};
}
@@ -44,14 +44,16 @@ module.exports = function(RED) {
parts.id = msg_id;
parts.index = i;
parts.count = count;
node.send(msg);
msgInfos[i].send(msg);
//msgInfos[i].done();
}
}
function send_interval(node, allow_empty_seq) {
let msgs = node.pending;
if (msgs.length > 0) {
send_msgs(node, msgs, false);
let msgInfos = node.pending;
if (msgInfos.length > 0) {
send_msgs(node, msgInfos, false);
msgInfos.forEach(e => e.done());
node.pending = [];
}
else {
@@ -108,19 +110,20 @@ module.exports = function(RED) {
return;
}
}
var msgs = [];
var msgInfos = [];
for (var topic of topics) {
var t_msgs = get_msgs_of_topic(pending, topic);
msgs = msgs.concat(t_msgs);
var t_msgInfos = get_msgs_of_topic(pending, topic);
msgInfos = msgInfos.concat(t_msgInfos);
}
for (var topic of topics) {
remove_topic(pending, topic);
}
send_msgs(node, msgs, true);
node.pending_count -= msgs.length;
send_msgs(node, msgInfos, true);
msgInfos.forEach(e => e.done() );
node.pending_count -= msgInfos.length;
}
function add_to_topic_group(pending, topic, gid, msg) {
function add_to_topic_group(pending, topic, gid, msgInfo) {
if (!pending.hasOwnProperty(topic)) {
pending[topic] = { groups: {}, gids: [] };
}
@@ -132,32 +135,43 @@ module.exports = function(RED) {
gids.push(gid);
}
var group = groups[gid];
group.msgs.push(msg);
group.msgs.push(msgInfo);
if ((group.count === undefined) &&
msg.parts.hasOwnProperty('count')) {
group.count = msg.parts.count;
msgInfo.msg.parts.hasOwnProperty('count')) {
group.count = msgInfo.msg.parts.count;
}
}
function concat_msg(node, msg) {
function concat_msg(node, msg, send, done) {
var topic = msg.topic;
if(node.topics.indexOf(topic) >= 0) {
if (!msg.hasOwnProperty("parts") ||
!msg.parts.hasOwnProperty("id") ||
!msg.parts.hasOwnProperty("index") ||
!msg.parts.hasOwnProperty("count")) {
node.error(RED._("batch.no-parts"), msg);
done(RED._("batch.no-parts"));
return;
}
var gid = msg.parts.id;
var pending = node.pending;
add_to_topic_group(pending, topic, gid, msg);
add_to_topic_group(pending, topic, gid, {msg, send, done});
node.pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
Object.values(node.pending).forEach(p_topic => {
Object.values(p_topic.groups).forEach(group => {
group.msgs.forEach(msgInfo => {
if (msgInfo.msg.id === msg.id) {
// the message that caused the overflow
msgInfo.done(RED._("batch.too-many"));
} else {
msgInfo.done();
}
})
})
});
node.pending = {};
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
try_concat(node, pending);
}
@@ -178,29 +192,37 @@ module.exports = function(RED) {
return;
}
node.pending = [];
this.on("input", function(msg) {
this.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("reset")) {
node.pending.forEach(e => e.done());
node.pending = [];
node.pending_count = 0;
done();
return;
}
var queue = node.pending;
queue.push(msg);
queue.push({msg, send, done});
node.pending_count++;
if (queue.length === count) {
send_msgs(node, queue, is_overlap);
for (let i = 0; i < queue.length-overlap; i++) {
queue[i].done();
}
node.pending =
(overlap === 0) ? [] : queue.slice(-overlap);
node.pending_count = 0;
}
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
let lastMInfo = node.pending.pop();
lastMInfo.done(RED._("batch.too-many"));
node.pending.forEach(e => e.done());
node.pending = [];
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
});
this.on("close", function() {
node.pending.forEach(e=> e.done());
node.pending_count = 0;
node.pending = [];
});
@@ -217,31 +239,36 @@ module.exports = function(RED) {
if (interval > 0) {
timer = setInterval(msgHandler, interval);
}
this.on("input", function(msg) {
this.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("reset")) {
if (timer !== undefined) {
clearInterval(timer);
}
node.pending.forEach(e => e.done());
node.pending = [];
node.pending_count = 0;
done();
if (interval > 0) {
timer = setInterval(msgHandler, interval);
}
return;
}
node.pending.push(msg);
node.pending.push({msg, send, done});
node.pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
let lastMInfo = node.pending.pop();
lastMInfo.done(RED._("batch.too-many"));
node.pending.forEach(e => e.done());
node.pending = [];
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
});
this.on("close", function() {
if (timer !== undefined) {
clearInterval(timer);
}
node.pending.forEach(e => e.done());
node.pending = [];
node.pending_count = 0;
});
@@ -251,15 +278,26 @@ module.exports = function(RED) {
return x.topic;
});
node.pending = {};
this.on("input", function(msg) {
this.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("reset")) {
Object.values(node.pending).forEach(p_topic => {
Object.values(p_topic.groups).forEach(group => {
group.msgs.forEach(e => e.done());
});
});
node.pending = {};
node.pending_count = 0;
done();
return;
}
concat_msg(node, msg);
concat_msg(node, msg, send, done);
});
this.on("close", function() {
Object.values(node.pending).forEach(p_topic => {
Object.values(p_topic.groups).forEach(group => {
group.msgs.forEach(e => e.done());
});
});
node.pending = {};
node.pending_count = 0;
});