Delay node enhancements (#2294)

* Remove unused messages in message catalog

* Support msg.rate in delay node

* Support nodeMessageBufferMaxLength in delay node

* Add logging function for queue size

* Support msg.nodeMessageBufferMaxLength

* Revert "Support msg.nodeMessageBufferMaxLength"

This reverts commit cc72f892f7.

* Improve logging function for delay node

* Add support for Messaging API to delay node

* Add documentation about msg.rate in delay node

* Add test cases for msg.rate in delay node

Co-authored-by: Dave Conway-Jones <dceejay@users.noreply.github.com>
This commit is contained in:
Kazuhito Yokoi
2021-04-22 17:01:28 +09:00
committed by GitHub
parent 719aea2a58
commit a20049c82a
9 changed files with 686 additions and 52 deletions

View File

@@ -20,6 +20,20 @@ module.exports = function(RED) {
var MILLIS_TO_NANOS = 1000000;
var SECONDS_TO_NANOS = 1000000000;
var _maxKeptMsgsCount;
function maxKeptMsgsCount(node) {
if (_maxKeptMsgsCount === undefined) {
var name = "nodeMessageBufferMaxLength";
if (RED.settings.hasOwnProperty(name)) {
_maxKeptMsgsCount = RED.settings[name];
}
else {
_maxKeptMsgsCount = 0;
}
}
return _maxKeptMsgsCount;
}
function DelayNode(n) {
RED.nodes.createNode(this,n);
@@ -78,6 +92,7 @@ module.exports = function(RED) {
this.randomID = -1;
this.lastSent = null;
this.drop = n.drop;
this.droppedMsgs = 0;
var node = this;
function ourTimeout(handler, delay, clearHandler) {
@@ -88,6 +103,19 @@ module.exports = function(RED) {
};
}
var sendMsgFromBuffer = function() {
if (node.buffer.length === 0) {
clearInterval(node.intervalID);
node.intervalID = -1;
}
if (node.buffer.length > 0) {
const msgInfo = node.buffer.shift();
msgInfo.send(msgInfo.msg);
msgInfo.done();
}
node.reportDepth();
}
var clearDelayList = function(s) {
for (var i=0; i<node.idList.length; i++ ) { node.idList[i].clear(); }
node.idList = [];
@@ -112,6 +140,14 @@ module.exports = function(RED) {
}
}
var loggerId = setInterval(function () {
if (node.droppedMsgs !== 0) {
node.debug("node.droppedMsgs = " + node.droppedMsgs);
node.droppedMsgs = 0;
}
}, 15 * 1000);
node.on("close", function() { clearInterval(loggerId); });
if (node.pauseType === "delay") {
node.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("flush")) { flushDelayList(); done(); }
@@ -166,28 +202,32 @@ module.exports = function(RED) {
done();
return;
}
if (!node.drop) {
var m = RED.util.cloneMessage(msg);
delete m.flush;
if (node.intervalID !== -1) {
node.buffer.push({msg: m, send: send, done: done});
node.reportDepth();
if (msg.hasOwnProperty("rate") && !isNaN(parseFloat(msg.rate)) && node.rate !== msg.rate) {
node.rate = msg.rate;
clearInterval(node.intervalID);
node.intervalID = setInterval(sendMsgFromBuffer, node.rate);
}
var max_msgs = maxKeptMsgsCount(node);
if ((max_msgs > 0) && (node.buffer.length >= max_msgs)) {
node.buffer = [];
node.error(RED._("delay.errors.too-many"), msg);
} else {
node.buffer.push({msg: m, send: send, done: done});
node.reportDepth();
}
}
else {
if (msg.hasOwnProperty("rate") && !isNaN(parseFloat(msg.rate))) {
node.rate = msg.rate;
}
send(m);
node.reportDepth();
node.intervalID = setInterval(function() {
if (node.buffer.length === 0) {
clearInterval(node.intervalID);
node.intervalID = -1;
}
if (node.buffer.length > 0) {
const msgInfo = node.buffer.shift();
msgInfo.send(msgInfo.msg);
msgInfo.done();
}
node.reportDepth();
}, node.rate);
node.intervalID = setInterval(sendMsgFromBuffer, node.rate);
done();
}
if (msg.hasOwnProperty("flush")) {
@@ -201,17 +241,40 @@ module.exports = function(RED) {
}
}
else {
var timeSinceLast;
if (node.lastSent) {
timeSinceLast = process.hrtime(node.lastSent);
}
if (!node.lastSent) { // ensuring that we always send the first message
node.lastSent = process.hrtime();
send(msg);
}
else if ( ( (timeSinceLast[0] * SECONDS_TO_NANOS) + timeSinceLast[1] ) > (node.rate * MILLIS_TO_NANOS) ) {
node.lastSent = process.hrtime();
send(msg);
if (maxKeptMsgsCount(node) > 0) {
if (node.intervalID === -1) {
node.send(msg);
node.intervalID = setInterval(sendMsgFromBuffer, node.rate);
} else {
if (msg.hasOwnProperty("rate") && node.rate !== msg.rate) {
node.rate = msg.rate;
clearInterval(node.intervalID);
node.intervalID = setInterval(sendMsgFromBuffer, node.rate);
}
if (node.buffer.length < _maxKeptMsgsCount) {
var m = RED.util.cloneMessage(msg);
node.buffer.push({msg: m, send: send, done: done});
} else {
node.trace("dropped due to buffer overflow. msg._msgid = " + msg._msgid);
node.droppedMsgs++;
}
}
} else {
if (msg.hasOwnProperty("rate")) {
node.rate = msg.rate;
}
var timeSinceLast;
if (node.lastSent) {
timeSinceLast = process.hrtime(node.lastSent);
}
if (!node.lastSent) { // ensuring that we always send the first message
node.lastSent = process.hrtime();
send(msg);
}
else if ( ( (timeSinceLast[0] * SECONDS_TO_NANOS) + timeSinceLast[1] ) > (node.rate * MILLIS_TO_NANOS) ) {
node.lastSent = process.hrtime();
send(msg);
}
}
done();
}