1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Rewording some of the message sequence nodes (#1564)

* Rewording some of the message sequence nodes

* Fix batch test for overlap renaming

* Finish msg-sequence node help rewording

* Rename maxKeptMsgsCount to nodeMessageBufferMaxLength

* Rename nodeMessageBufferMaxLength in tests

* Remove Join-merge mode for later rework
This commit is contained in:
Nick O'Leary 2018-01-24 22:01:07 +00:00 committed by GitHub
parent a62a1012fa
commit e7960d1d44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 181 additions and 645 deletions

View File

@ -543,7 +543,7 @@
"label": {
"property": "Property",
"rule": "rule",
"repair" : "repair sequence (reconstruct parts property of outgoing messages)"
"repair" : "recreate message sequences"
},
"and": "and",
"checkall": "checking all rules",
@ -559,7 +559,7 @@
"nnull":"is not null",
"head":"head",
"tail":"tail",
"index":"is between",
"index":"index between",
"exp":"JSONata exp",
"else":"otherwise"
},
@ -847,7 +847,7 @@
"mode":{
"mode":"Mode",
"auto":"automatic",
"merge":"merge sequence",
"merge":"merge sequences",
"reduce":"reduce sequence",
"custom":"manual"
},
@ -882,8 +882,8 @@
"exp": "Reduce exp",
"exp-value": "exp",
"init": "Initial value",
"right": "Evaluate in reverse order (right to left)",
"fixup": "Fixup exp"
"right": "Evaluate in reverse order (last to first)",
"fixup": "Fix-up exp"
},
"errors": {
"invalid-expr": "Invalid JSONata expression: __error__"
@ -905,20 +905,19 @@
"batch" : {
"mode": {
"label" : "Mode",
"num-msgs" : "number of messages",
"interval" : "interval in seconds",
"concat" : "concatenate sequences"
"num-msgs" : "Group by number of messages",
"interval" : "Group by time interval",
"concat" : "Concatenate sequences"
},
"count": {
"label" : "Number of msgs",
"overwrap" : "Overwrap",
"label" : "Number of messages",
"overlap" : "Overlap",
"count" : "count",
"invalid" : "Invalid count and overwrap"
"invalid" : "Invalid count and overlap"
},
"interval": {
"label" : "Interval (sec)",
"label" : "Interval",
"seconds" : "seconds",
"sec" : "sec",
"empty" : "send empty message when no message arrives"
},
"concat": {

View File

@ -40,7 +40,7 @@
</script>
<script type="text/x-red" data-help-name="switch">
<p>Route messages based on their property values.</p>
<p>Route messages based on their property values or sequence position.</p>
<h3>Details</h3>
<p>When a message arrives, the node will evaluate each of the defined rules
and forward the message to the corresponding outputs of any matching rules.</p>
@ -48,115 +48,25 @@
that matches.</p>
<p>The rules can be evaluated against an individual message property, a flow or global
context property or the result of a JSONata expression.</p>
<h3>Rules</h3>
<p>Routing rules are categorized into three:</p>
<dl>
<dt>value rules</dt>
<dd>
<table>
<tr>
<th>operator</th>
<th>description</th>
</tr>
<tr>
<td><b>==</b></td>
<td>property value is equals to specified value</td>
</tr>
<tr>
<td><b>!=</b></td>
<td>property value is not equals to specified value</td>
</tr>
<tr>
<td><b>&lt;</b></td>
<td>property value is less than specified value</td>
</tr>
<tr>
<td><b>&lt;=</b></td>
<td>property value is less than or equals to specified value</td>
</tr>
<tr>
<td><b>&gt;</b></td>
<td>property value is greater than specified value</td>
</tr>
<tr>
<td><b>&gt;=</b></td>
<td>property value is greater than or equals to specified value</td>
</tr>
<tr>
<td><b>is between</b></td>
<td>property value is between specified values</td>
</tr>
<tr>
<td><b>contains</b></td>
<td>property string value contains specified string value</td>
</tr>
<tr>
<td><b>matches regex</b></td>
<td>property string value matches specified regex</td>
</tr>
<tr>
<td><b>is true</b></td>
<td>property value is true</td>
</tr>
<tr>
<td><b>is false</b></td>
<td>property value is false</td>
</tr>
<tr>
<td><b>is null</b></td>
<td>property value is null</td>
</tr>
<tr>
<td><b>is not null</b></td>
<td>property value is not null</td>
</tr>
</table>
</dd>
<dt>sequence rules</dt>
<dd>
<table>
<tr>
<th>operator</th>
<th>description</th>
</tr>
<tr>
<td><b>head</b></td>
<td>message is included in the first specified number of messages in a sequence</td>
</tr>
<tr>
<td><b>tail</b></td>
<td>message is included in the last specified number of messages in a sequence</td>
</tr>
<tr>
<td><b>pos. between</b></td>
<td>message is included between specified positions in a sequence</td>
</tr>
</table>
</dd>
<dt>other rules</dt>
<dd>
<table>
<tr>
<th>operator</th>
<th>description</th>
</tr>
<tr>
<td><b>JSONata exp</b></td>
<td>specified JSONata expression evaluate to true. In JSONata expression, $I represents <code>msg.parts.index</code> and $N represents <code>msg.parts.count</code> respectively if exists.</td>
</tr>
<tr>
<td><b>otherwise</b></td>
<td>no matching rule found</td>
</tr>
</table>
</dd>
</dl>
<h3>Repair Sequence</h3>
<p>If <b>repair sequence (reconstruct parts property of outgoing messages)</b> checkbox is selected, <code>msg.parts</code> property are reconstructed for each port to make forks of valid sequences. Otherwise, <code>msg.parts</code> property of incoming messages are passed through.</p>
<h3>Note:</h3>
<p>This node internally keeps messages for its operation if <b>repair sequence</b> checkbox is ON. In order to prevent unexpected memory usage, maximum number of messages kept can be specified by <code>maxKeptMsgsCount</code> property in <b>settings.js</b>.</p>
<h4>Rules</h4>
<p>There are four types of rule:</p>
<ol>
<li><b>Value</b> rules are evaluated against the configured property</li>
<li><b>Sequence</b> rules can be used on message sequences, such as those
generated by the Split node</li>
<li>A JSONata <b>Expression</b> can be provided that will be evaluated
against the whole message and will match if the expression returns
a true value.</li>
<li>An <b>Otherwise</b> rule can be used to match if none of the preceeding
rules have matched.</li>
</ol>
<h4>Handling message sequences</h4>
<p>By default, the node does not modify the <code>msg.parts</code> property of messages
that are part of a sequence.</p>
<p>The <b>recreate message sequences</b> option can be enabled to generate new message sequences
for each rule that matches. In this mode, the node will buffer the entire incoming
sequence before sending the new sequences on. The runtime setting `nodeMessageBufferMaxLength`
can be used to limit how many messages nodes will buffer.</p>
</script>
<script type="text/javascript">

View File

@ -53,7 +53,7 @@ module.exports = function(RED) {
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
var name = "maxKeptMsgsCount";
var name = "nodeMessageBufferMaxLength";
if (RED.settings.hasOwnProperty(name)) {
_max_kept_msgs_count = RED.settings[name];
}

View File

@ -90,11 +90,9 @@
a message and send each complete segment. If there is a partial segment at the end,
the node will hold on to it and prepend it to the next message that is received.
</p>
<p>When operating in this mode, the node will not set the `msg.parts.count`
<p>When operating in this mode, the node will not set the <code>msg.parts.count</code>
property as it does not know how many messages to expect in the stream. This
means it cannot be used with the <b>join</b> node in its automatic mode</p>
</script>
<script type="text/javascript">
@ -170,7 +168,6 @@
<select id="node-input-mode" style="width:200px;">
<option value="auto" data-i18n="join.mode.auto"></option>
<option value="custom" data-i18n="join.mode.custom"></option>
<option value="merge" data-i18n="join.mode.merge"></option>
<option value="reduce" data-i18n="join.mode.reduce"></option>
</select>
</div>
@ -219,18 +216,6 @@
</ul>
</div>
</div>
<div class="node-row-merge">
<div class="form-row">
<label data-i18n="join.merge.topics-label"></label>
<div class="form-row node-input-topics-container-row">
<ol id="node-input-topics-container"></ol>
</div>
</div>
<div class="form-row">
<input type="checkbox" id="node-input-mergeOnChange" style="margin-left:10px; vertical-align:top; width:auto;">
<label for="node-input-mergeOnChange" style="width:auto;" data-i18n="join.merge.on-change"></label>
</div>
</div>
<div class="node-row-reduce">
<div class="form-row">
<label for="node-input-reduceExp" data-i18n="join.reduce.exp" style="margin-left:10px;"></label>
@ -259,16 +244,15 @@
</script>
<script type="text/x-red" data-help-name="join">
<p>Joins sequences of messages into a single message. This node provides four mode for message combination:</p>
<p>Joins sequences of messages into a single message.</p>
<p>There are three modes available:</p>
<dl>
<dt>automatic</dt>
<dd>When paired with the <b>split</b> node, it will automatically join the messages to reverse the split that was performed.</dd>
<dt>manual</dt>
<dd>It will join sequences of messages in a variety of ways.</dd>
<dt>merge sequence</dt>
<dd>It will merge incoming messages into single message using <code>topic</code> property.</dd>
<dd>Join sequences of messages in a variety of ways.</dd>
<dt>reduce sequence</dt>
<dd>When paired with the <b>split</b> node, it will reduce the message sequence into single message.</dd>
<dd>Apply an expression against all messages in a sequence to reduce it to a single message.</dd>
</dl>
<h3>Inputs</h3>
<dl class="message-properties">
@ -292,11 +276,13 @@
<h3>Details</h3>
<h4>Automatic mode</h4>
<p>When configured to join in manual mode, the node is able to join sequences of messages using <code>parts</code> property of incoming messages.</p>
<p>Automatic mode uses the <code>parts</code> property of incoming messages to
determine how the sequence should be joined. This allows it to automatically
reverse the action of a <b>split</b> node.
<h4>Manual mode</h4>
<p>When configured to join in manual mode, the node is able to join sequences
of messages in a variety of ways.</p>
of messages into a number of different results:</p>
<ul>
<li>a <b>string</b> or <b>buffer</b> - created by joining the selected property of each message with the specified join characters or buffer.</li>
<li>an <b>array</b> - created by adding each selected property, or entire message, to the output array.</li>
@ -311,49 +297,48 @@
<p>A <i>timeout</i> can be set to trigger sending the new message using whatever has been received so far.</p>
<p>If a message is received with the <b>msg.complete</b> property set, the output message is sent.</p>
<h4>Merge Sequence mode</h4>
<p>When configured to join in merge mode, the join node can create a message based on <code>topic</code> value.</p>
<p>Input messages are merged in order specified by <b>Topics</b> value.
<p>For example, if value of <b>Topics</b> is <b>x,x,y</b>, two input messages with topic <b>x</b> and one input message with topic <b>y</b> are merged into a new message in order of arrival.</p>
<p>If "Send merged message on arrival of a new topic" check box is selected, the last messages with each topic is kept internally and output message is sent when a message with new topics arrives.</p>
<p>The merged message contains <code>payload</code> property and properties for each topic. The <code>payload</code> property represents ordered array of payload value of input messages for each topic. The property for each topic represents a payload value for single occurrence of topic or array of payload values for multiple occurrences of the topic.</p>
<h4>Reduce Sequence mode</h4>
<p>When configured to join in reduce sequence mode, following values can be specified:</p>
<p>When configured to join in reduce mode, an expression is applied to each
message in a sequence and the result accumulated to produce a single message.</p>
<dl class="message-properties">
<dt>Reduce exp</dt>
<dd>JSONata expression for reducing message group. This expression represents the accumulated result. In the expression, following special variables can be used:
<ul>
<li><code>$A</code> accumulated value, </li>
<li><code>$I</code> index of the message in a group, </li>
<li><code>$N</code> number of messages of a group.</li>
</ul>
</dd>
<dt>Initial value</dt>
<dd>
initial value of reduction.
</dd>
<dt>Fixup exp</dt>
<dd>
JSONata expression applied after reduction of a message group completed. In the expression, following special variables can be used:
<dd>The initial value of the accumulated value (<code>$A</code>).</dd>
<dt>Reduce expression</dt>
<dd>A JSONata expression that is called for each message in the sequence.
The result is passed to the next call of the expression as the accumulated value.
In the expression, the following special variables can be used:
<ul>
<li><code>$A</code> accumulated value, </li>
<li><code>$N</code> number of messages of a group.</li>
<li><code>$A</code>: the accumulated value, </li>
<li><code>$I</code>: index of the message in the sequence, </li>
<li><code>$N</code>: number of messages in the sequence.</li>
</ul>
</dd>
<p>Order of reduction on a message group can be specified by checkbox (<b>Evaluate in reverse order (right to left)</b>).</p>
<dt>Fix-up expression</dt>
<dd>An optional JSONata expression that is applied after the reduce expression
has been applied to all messages in the sequence.
In the expression, following special variables can be used:
<ul>
<li><code>$A</code>: the accumulated value, </li>
<li><code>$N</code>: number of messages in the sequence.</li>
</ul>
</dd>
<p>By default, the reduce expression is applied in order, from the first
to the last message of the sequence. It can optionally be applied in
reverse order.</p>
</dl>
<p><b>Example:</b> Join node outputs an average for each input message group with the following setting:
<p><b>Example:</b> the following settings, given a sequence of numeric values,
calculates the average value:
<ul>
<li><b>Reduce exp</b>: <code>$A+payload</code></li>
<li><b>Reduce expression</b>: <code>$A+payload</code></li>
<li><b>Initial value</b>: <code>0</code></li>
<li><b>Fixup exp</b>: <code>$A/$N</code></li>
<li><b>Fix-up expression</b>: <code>$A/$N</code></li>
</ul>
</p>
<h4>Note:</h4>
<p>This node internally keeps messages for its operation. In order to prevent unexpected memory usage, maximum number of messages kept can be specified by <code>maxKeptMsgsCount</code> property in <b>settings.js</b>.</p>
<h4>Storing messages</h4>
<p>This node will buffer messages internally in order to work across sequences. The
runtime setting `nodeMessageBufferMaxLength` can be used to limit how many messages nodes
will buffer.</p>
</script>
<script type="text/javascript">
@ -372,8 +357,6 @@
accumulate: { value:"false" },
timeout: {value:""},
count: {value:""},
topics: {value:[{topic:""}]},
mergeOnChange: {value:false},
reduceRight: {value:false},
reduceExp: {value:undefined},
reduceInit: {value:undefined},
@ -391,41 +374,10 @@
},
oneditprepare: function() {
var node = this;
var topic_str = node._("join.merge.topic");
function resizeTopics(topic) {
var newWidth = topic.width();
topic.find('.red-ui-typedInput')
.typedInput("width",newWidth-15);
}
$("#node-input-topics-container")
.css('min-height','250px').css('min-width','420px')
.editableList({
addItem: function(container,i,opt) {
if (!opt.hasOwnProperty('topic')) {
opt.topic = "";
}
var row = $('<div/>').appendTo(container);
var valueField = $('<input/>',{
class:"node-input-topic-value",
type:"text",
style:"margin-left: 5px;"
}).appendTo(row)
.typedInput({default:'str', types:['str']});
valueField.typedInput('value', opt.topic);
valueField.typedInput('type', 'str');
valueField.attr('placeholder', topic_str);
resizeTopics(container);
},
resizeItem: resizeTopics,
sortable: true,
removable: true
});
$("#node-input-mode").change(function(e) {
var val = $(this).val();
$(".node-row-custom").toggle(val==='custom');
$(".node-row-merge").toggle(val==='merge');
$(".node-row-reduce").toggle(val==='reduce');
$(".form-tips-auto").toggle((val==='auto') || (val==='reduce'));
if (val === "auto") {
@ -434,15 +386,6 @@
else if (val === "custom") {
$("#node-input-build").change();
}
else if (val === "merge") {
var topics = node.topics;
var container = $("#node-input-topics-container");
container.editableList('empty');
for (var i=0;i<topics.length;i++) {
var topic = topics[i];
container.editableList('addItem', topic);
}
}
else if (val === "reduce") {
var jsonata_or_empty = {
value: "jsonata",
@ -519,27 +462,6 @@
if (build !== 'object' && build !== 'merged') {
$("#node-input-accumulate").prop("checked",false);
}
var topics = $("#node-input-topics-container").editableList('items');
var node = this;
node.topics = [];
topics.each(function(i) {
var topicData = $(this).data('data');
var topic = $(this);
var vf = topic.find(".node-input-topic-value");
var value = vf.typedInput('value');
var r = {topic:value};
node.topics.push(r);
});
},
oneditresize: function(size) {
var rows = $("#dialog-form>div:not(.node-input-topics-container-row)");
var height = size.height;
for (var i=0;i<rows.size();i++) {
height -= $(rows[i]).outerHeight(true);
}
var editorRow = $("#dialog-form>div.node-input-topics-container-row");
height -= (parseInt(editorRow.css("marginTop"))+parseInt(editorRow.css("marginBottom")));
$("#node-input-topics-container").editableList('height',height);
}
});
</script>

View File

@ -236,7 +236,7 @@ module.exports = function(RED) {
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
var name = "maxKeptMsgsCount";
var name = "nodeMessageBufferMaxLength";
if (RED.settings.hasOwnProperty(name)) {
_max_kept_msgs_count = RED.settings[name];
}
@ -247,89 +247,6 @@ module.exports = function(RED) {
return _max_kept_msgs_count;
}
function add_to_topic(node, pending, topic, msg) {
var merge_on_change = node.merge_on_change;
if (!pending.hasOwnProperty(topic)) {
pending[topic] = [];
}
var topics = pending[topic];
topics.push(msg);
if (merge_on_change) {
var counts = node.topic_counts;
if (topics.length > counts[topic]) {
topics.shift();
node.pending_count--;
}
}
}
function compute_topic_counts(topics) {
var counts = {};
for (var topic of topics) {
counts[topic] = (counts.hasOwnProperty(topic) ? counts[topic] : 0) +1;
}
return counts;
}
function try_merge(node, pending, merge_on_change) {
var topics = node.topics;
var counts = node.topic_counts;
for(var topic of topics) {
if(!pending.hasOwnProperty(topic) ||
(pending[topic].length < counts[topic])) {
return;
}
}
var merge_on_change = node.merge_on_change;
var msgs = [];
var vals = [];
var new_msg = {payload: vals};
for (var topic of topics) {
var pmsgs = pending[topic];
var msg = pmsgs.shift();
if (merge_on_change) {
pmsgs.push(msg);
}
var pval = msg.payload;
var val = new_msg[topic];
msgs.push(msg);
vals.push(pval);
if (val instanceof Array) {
new_msg[topic].push(pval);
}
else if (val === undefined) {
new_msg[topic] = pval;
}
else {
new_msg[topic] = [val, pval]
}
}
node.send(new_msg);
if (!merge_on_change) {
node.pending_count -= topics.length;
}
}
function merge_msg(node, msg) {
var topics = node.topics;
var topic = msg.topic;
if(node.topics.indexOf(topic) >= 0) {
var pending = node.pending;
if(node.topic_counts == undefined) {
node.topic_counts = compute_topic_counts(topics)
}
add_to_topic(node, pending, topic, msg);
node.pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
node.error(RED._("join.too-many"), msg);
}
try_merge(node, pending);
}
}
function apply_r(exp, accum, msg, index, count) {
exp.assign("I", index);
exp.assign("N", count);
@ -381,8 +298,8 @@ module.exports = function(RED) {
var pending = node.pending;
var pending_count = node.pending_count;
var gid = msg.parts.id;
var count;
if(!pending.hasOwnProperty(gid)) {
var count = undefined;
if(parts.hasOwnProperty('count')) {
count = msg.parts.count;
}
@ -568,10 +485,6 @@ module.exports = function(RED) {
node.warn("Message missing msg.parts property - cannot join in 'auto' mode")
return;
}
if (node.mode === 'merge' && !msg.hasOwnProperty("topic")) {
node.warn("Message missing msg.topic property - cannot join in 'merge' mode");
return;
}
if (node.propertyType == "full") {
property = msg;
@ -602,10 +515,6 @@ module.exports = function(RED) {
arrayLen = msg.parts.len;
propertyIndex = msg.parts.index;
}
else if (node.mode === 'merge') {
merge_msg(node, msg);
return;
}
else if (node.mode === 'reduce') {
reduce_msg(node, msg);
return;

View File

@ -81,7 +81,7 @@
</p>
<p><b>Note:</b> This node internally keeps messages for its operation. In order to prevent unexpected memory usage, maximum number of messages kept can be specified. Default is no limit on number of messages.
<ul>
<li><code>maxKeptMsgsCount</code> property set in <b>settings.js</b>.</li>
<li><code>nodeMessageBufferMaxLength</code> property set in <b>settings.js</b>.</li>
</ul>
</p>
</script>

View File

@ -21,7 +21,7 @@ module.exports = function(RED) {
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
var name = "maxKeptMsgsCount";
var name = "nodeMessageBufferMaxLength";
if (RED.settings.hasOwnProperty(name)) {
_max_kept_msgs_count = RED.settings[name];
}

View File

@ -18,7 +18,7 @@
<script type="text/x-red" data-template-name="batch">
<div class="form-row">
<label for="node-input-mode"><i class="fa fa-dot-circle-o"></i> <span data-i18n="batch.mode.label"></span></label>
<label for="node-input-mode"><span data-i18n="batch.mode.label"></span></label>
<select type="text" id="node-input-mode" style="width: 300px;">
<option value="count" data-i18n="batch.mode.num-msgs"></option>
<option value="interval" data-i18n="batch.mode.interval"></option>
@ -28,26 +28,26 @@
<div class="node-row-msg-count">
<div class="form-row node-row-count">
<label for="node-input-count" data-i18n="batch.count.label"></label>
<input type="text" id="node-input-count" data-i18n="[placeholder]batch.count.count" style="width: 50px;">
<label style="margin-left: 10px; width: 175px;" for="node-input-count" data-i18n="batch.count.label"></label>
<input type="text" id="node-input-count" style="width: 50px;">
</div>
</div>
<div class="node-row-msg-overwrap">
<div class="form-row node-row-overwrap">
<label for="node-input-count" data-i18n="batch.count.overwrap"></label>
<input type="text" id="node-input-overwrap" data-i18n="[placeholder]batch.count.count" style="width: 50px;">
<div class="node-row-msg-overlap">
<div class="form-row node-row-overlap">
<label style="margin-left: 10px; width: 175px;" for="node-input-overlap" data-i18n="batch.count.overlap"></label>
<input type="text" id="node-input-overlap" style="width: 50px;">
</div>
</div>
<div class="node-row-msg-interval">
<div class="form-row node-row-interval">
<label for="node-input-interval"> <span data-i18n="batch.interval.label"></span></label>
<input type="text" id="node-input-interval" data-i18n="[placeholder]batch.interval.seconds" style="width: 50px;">
<span data-i18n="batch.interval.sec"></span>
<label style="margin-left: 10px; width: 175px;" for="node-input-interval"> <span data-i18n="batch.interval.label"></span></label>
<input type="text" id="node-input-interval" style="width: 50px;">
<span data-i18n="batch.interval.seconds"></span>
</div>
<div class="form-row">
<input type="checkbox" id="node-input-allowEmptySequence" style="margin-left:10px; vertical-align:top; width:auto;">
<input type="checkbox" id="node-input-allowEmptySequence" style="margin-left:20px; margin-right: 10px; vertical-align:top; width:auto;">
<label for="node-input-allowEmptySequence" style="width:auto;" data-i18n="batch.interval.empty"></label>
</div>
</div>
@ -69,20 +69,31 @@
</script>
<script type="text/x-red" data-help-name="batch">
<p>A function that divides input messages into multiple sequences of messages or concatenates multiple sequences of messages into a single messages sequence.</p>
<p>Creates sequences of messages based on various rules.</p>
<h3>Details</h3>
<h4>group by number of messages</h4>
<p>groups incoming messages into sequences of messages with specified counts. The output message group can be overwrapped.</p>
<h4>group by interval in seconds</h4>
<p>groups incoming messages received withn specified interval into sequences of messages. </p>
<h4>concatenate message groups</h4>
<p>creates a message sequence based on <code>topic</code> value of incoming message sequences.</p>
<p>Target and order of concatenated sequences are specified by <code>Topics</code> value. Selection of concatenated message groups is based on arrival of first message of the group.</p>
<p><b>Note:</b> This node internally keeps messages for its operation. In order to prevent unexpected memory usage, maximum number of messages kept can be specified. Default is no limit on number of messages.
<ul>
<li><code>maxKeptMsgsCount</code> property set in <b>settings.js</b>.</li>
</ul>
</p>
<p>There are three modes for creating message sequences:</p>
<dl>
<dt>Number of messages</dt>
<dd>groups messages into sequences of a given length. The <b>overlap</b>
option specifies how many messages and the end of one sequence should be
repeated at the start of the next sequence.</dd>
<dt>Time interval</dt>
<dd>groups messages that arrive within the specified interval. If no messages
arrive within the interval, the node can optionally send on an empty message.</dd>
<dt>Concatenate Sequences</dt>
<dd>creates a message sequence by concatenating incoming sequences. Each sequence
must have a <code>msg.topic</code> property to identify it. The node is
configured with a list of <code>topic</code> values to identify the order
sequences are concatenated.
</dd>
</dd>
</dl>
<h4>Storing messages</h4>
<p>This node will buffer messages internally in order to work across sequences. The
runtime setting `nodeMessageBufferMaxLength` can be used to limit how many messages nodes
will buffer.</p>
</script>
<script type="text/javascript">
@ -93,9 +104,9 @@
name: {value:""},
mode: {value:"count"},
count: {value:10},
overwrap: {value:0},
overlap: {value:0},
interval: {value:10},
allowEmptySequence: {value:false},
allowEmptySequence: {value:false},
topics: {value:[{topic:""}]}
},
inputs:1,
@ -130,27 +141,27 @@
type:"text",
style:"margin-left: 5px;"
}).appendTo(row)
.typedInput({default:'str', types:['str']});
valueField.typedInput('value', opt.topic);
valueField.typedInput('type', 'str');
.typedInput({default:'str', types:['str']});
valueField.typedInput('value', opt.topic);
valueField.typedInput('type', 'str');
valueField.attr('placeholder', topic_str);
resizeTopics(container);
resizeTopics(container);
},
resizeItem: resizeTopics,
resizeItem: resizeTopics,
sortable: true,
removable: true
});
$("#node-input-count").spinner({
});
$("#node-input-overwrap").spinner({
$("#node-input-overlap").spinner({
});
$("#node-input-interval").spinner({
});
$("#node-input-mode").change(function(e) {
var val = $(this).val();
$(".node-row-msg-count").toggle(val==="count");
$(".node-row-msg-overwrap").toggle(val==="count");
$(".node-row-msg-overlap").toggle(val==="count");
$(".node-row-msg-interval").toggle(val==="interval");
$(".node-row-msg-concat").toggle(val==="concat");
if (val==="concat") {
@ -171,9 +182,9 @@
topics.each(function(i) {
var topicData = $(this).data('data');
var topic = $(this);
var vf = topic.find(".node-input-topic-value");
var value = vf.typedInput('value');
var type = vf.typedInput('type');
var vf = topic.find(".node-input-topic-value");
var value = vf.typedInput('value');
var type = vf.typedInput('type');
var r = {topic:value};
node.topics.push(r);
});

View File

@ -21,7 +21,7 @@ module.exports = function(RED) {
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
var name = "maxKeptMsgsCount";
var name = "nodeMessageBufferMaxLength";
if (RED.settings.hasOwnProperty(name)) {
_max_kept_msgs_count = RED.settings[name];
}
@ -171,9 +171,9 @@ module.exports = function(RED) {
node.pending_count = 0;
if (mode === "count") {
var count = Number(n.count || 1);
var overwrap = Number(n.overwrap || 0);
var is_overwrap = (overwrap > 0);
if (count <= overwrap) {
var overlap = Number(n.overlap || 0);
var is_overlap = (overlap > 0);
if (count <= overlap) {
node.error(RED._("batch.count.invalid"));
return;
}
@ -183,9 +183,9 @@ module.exports = function(RED) {
queue.push(msg);
node.pending_count++;
if (queue.length === count) {
send_msgs(node, queue, is_overwrap);
send_msgs(node, queue, is_overlap);
node.pending =
(overwrap === 0) ? [] : queue.slice(-overwrap);
(overlap === 0) ? [] : queue.slice(-overlap);
node.pending_count = 0;
}
var max_msgs = max_kept_msgs_count(node);

View File

@ -47,9 +47,10 @@ module.exports = {
// The maximum length, in characters, of any message sent to the debug sidebar tab
debugMaxLength: 1000,
// The maximum number of messages kept internally in nodes.
// Zero or undefined value means not restricting number of messages.
//maxKeptMsgsCount: 0,
// The maximum number of messages nodes will buffer internally as part of their
// operation. This applies across a range of nodes that operate on message sequences.
// defaults to no limit. A value of 0 also means no limit is applied.
//nodeMaxMessageBufferLength: 0,
// To disable the option for using local files for storing keys and certificates in the TLS configuration
// node, set this to true

View File

@ -29,7 +29,7 @@ describe('switch Node', function() {
afterEach(function(done) {
helper.unload();
helper.stopServer(done);
RED.settings.maxKeptMsgsCount = 0;
RED.settings.nodeMessageBufferMaxLength = 0;
});
it('should be loaded with some defaults', function(done) {
@ -692,7 +692,7 @@ describe('switch Node', function() {
];
helper.load(switchNode, flow, function() {
var n1 = helper.getNode("n1");
RED.settings.maxKeptMsgsCount = 2;
RED.settings.nodeMessageBufferMaxLength = 2;
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "switch";

View File

@ -270,7 +270,7 @@ describe('JOIN node', function() {
afterEach(function() {
helper.unload();
RED.settings.maxKeptMsgsCount = 0;
RED.settings.nodeMessageBufferMaxLength = 0;
});
it('should be loaded', function(done) {
@ -731,197 +731,6 @@ describe('JOIN node', function() {
});
});
it('should merge messages with topics (single)', function(done) {
var flow = [{id:"n1", type:"join", mode:"merge",
topics:[{topic:"TA"}, {topic:"TB"}],
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("TA");
msg.should.have.property("TB");
msg.should.have.property("payload");
msg.payload.should.be.an.Array();
msg.payload.length.should.equal(2);
count++;
if (count === 1) {
msg.TA.should.equal("a");
msg.TB.should.equal("b");
msg.payload[0].should.equal("a");
msg.payload[1].should.equal("b");
}
if (count === 2) {
msg.TA.should.equal("d");
msg.TB.should.equal("c");
msg.payload[0].should.equal("d");
msg.payload[1].should.equal("c");
done();
}
}
catch(e) { done(e); }
});
n1.receive({payload:"a", topic:"TA"});
n1.receive({payload:"b", topic:"TB"});
n1.receive({payload:"c", topic:"TB"});
n1.receive({payload:"d", topic:"TA"});
});
});
it('should merge messages with topics (multiple)', function(done) {
var flow = [{id:"n1", type:"join", mode:"merge",
topics:[{topic:"TA"}, {topic:"TB"}, {topic:"TA"}],
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("TA");
msg.TA.should.be.an.Array();
msg.TA.length.should.equal(2);
msg.should.have.property("TB");
msg.should.have.property("payload");
msg.payload.should.be.an.Array();
msg.payload.length.should.equal(3);
count++;
if (count === 1) {
msg.TA[0].should.equal("a");
msg.TA[1].should.equal("d");
msg.TB.should.equal("b");
msg.payload[0].should.equal("a");
msg.payload[1].should.equal("b");
msg.payload[2].should.equal("d");
}
if (count === 2) {
msg.TA[0].should.equal("e");
msg.TA[1].should.equal("f");
msg.TB.should.equal("c");
msg.payload[0].should.equal("e");
msg.payload[1].should.equal("c");
msg.payload[2].should.equal("f");
done();
}
}
catch(e) { done(e); }
});
n1.receive({payload:"a", topic:"TA"});
n1.receive({payload:"b", topic:"TB"});
n1.receive({payload:"c", topic:"TB"});
n1.receive({payload:"d", topic:"TA"});
n1.receive({payload:"e", topic:"TA"});
n1.receive({payload:"f", topic:"TA"});
});
});
it('should merge messages with topics (single, send on new topic)', function(done) {
var flow = [{id:"n1", type:"join", mode:"merge",
topics:[{topic:"TA"}, {topic:"TB"}],
mergeOnChange:true,
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("TA");
msg.should.have.property("TB");
msg.should.have.property("payload");
msg.payload.should.be.an.Array();
msg.payload.length.should.equal(2);
count++;
if (count === 1) {
msg.TA.should.equal("a");
msg.TB.should.equal("b");
msg.payload[0].should.equal("a");
msg.payload[1].should.equal("b");
}
if (count === 2) {
msg.TA.should.equal("a");
msg.TB.should.equal("c");
msg.payload[0].should.equal("a");
msg.payload[1].should.equal("c");
}
if (count === 3) {
msg.TA.should.equal("d");
msg.TB.should.equal("c");
msg.payload[0].should.equal("d");
msg.payload[1].should.equal("c");
done();
}
}
catch(e) { done(e); }
});
n1.receive({payload:"a", topic:"TA"});
n1.receive({payload:"b", topic:"TB"});
n1.receive({payload:"c", topic:"TB"});
n1.receive({payload:"d", topic:"TA"});
});
});
it('should merge messages with topics (multiple, send on new topic)', function(done) {
var flow = [{id:"n1", type:"join", mode:"merge",
topics:[{topic:"TA"}, {topic:"TB"}, {topic:"TA"}],
mergeOnChange:true,
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("TA");
msg.TA.should.be.an.Array();
msg.TA.length.should.equal(2);
msg.should.have.property("TB");
msg.should.have.property("payload");
msg.payload.should.be.an.Array();
msg.payload.length.should.equal(3);
count++;
if (count === 1) {
msg.TA[0].should.equal("a");
msg.TA[1].should.equal("c");
msg.TB.should.equal("b");
msg.payload[0].should.equal("a");
msg.payload[1].should.equal("b");
msg.payload[2].should.equal("c");
}
if (count === 2) {
msg.TA[0].should.equal("c");
msg.TA[1].should.equal("d");
msg.TB.should.equal("b");
msg.payload[0].should.equal("c");
msg.payload[1].should.equal("b");
msg.payload[2].should.equal("d");
}
if (count === 3) {
msg.TA[0].should.equal("c");
msg.TA[1].should.equal("d");
msg.TB.should.equal("e");
msg.payload[0].should.equal("c");
msg.payload[1].should.equal("e");
msg.payload[2].should.equal("d");
done();
}
}
catch(e) { done(e); }
});
n1.receive({payload:"a", topic:"TA"});
n1.receive({payload:"b", topic:"TB"});
n1.receive({payload:"c", topic:"TA"});
n1.receive({payload:"d", topic:"TA"});
n1.receive({payload:"e", topic:"TB"});
});
});
it('should redece messages', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:false,
@ -1065,31 +874,6 @@ describe('JOIN node', function() {
});
});
it('should handle too many pending messages for merge mode', function(done) {
var flow = [{id:"n1", type:"join", mode:"merge",
topics:[{topic:"TA"}, {topic:"TA"}, {topic:"TB"}],
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
RED.settings.maxKeptMsgsCount = 2;
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "join";
});
var evt = logEvents[0][0];
evt.should.have.property('id', "n1");
evt.should.have.property('type', "join");
evt.should.have.property('msg', "join.too-many");
done();
}, 150);
n1.receive({payload:"a", topic:"TA"});
n1.receive({payload:"b", topic:"TB"});
n1.receive({payload:"c", topic:"TB"});
n1.receive({payload:"d", topic:"TA"});
});
});
it('should handle too many pending messages for reduce mode', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:false,
@ -1101,7 +885,7 @@ describe('JOIN node', function() {
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
RED.settings.maxKeptMsgsCount = 2;
RED.settings.nodeMessageBufferMaxLength = 2;
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "join";

View File

@ -27,7 +27,7 @@ describe('SORT node', function() {
afterEach(function() {
helper.unload();
RED.settings.maxKeptMsgsCount = 0;
RED.settings.nodeMessageBufferMaxLength = 0;
});
it('should be loaded', function(done) {
@ -254,7 +254,7 @@ describe('SORT node', function() {
{id:"n2", type:"helper"}];
helper.load(sortNode, flow, function() {
var n1 = helper.getNode("n1");
RED.settings.maxKeptMsgsCount = 2;
RED.settings.nodeMessageBufferMaxLength = 2;
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "sort";

View File

@ -28,7 +28,7 @@ describe('BATCH node', function() {
afterEach(function() {
helper.unload();
RED.settings.maxKeptMsgsCount = 0;
RED.settings.nodeMessageBufferMaxLength = 0;
});
it('should be loaded with defaults', function(done) {
@ -144,7 +144,7 @@ describe('BATCH node', function() {
describe('mode: count', function() {
it('should create seq. with count', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 2, overwrap: 0, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]},
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 2, overlap: 0, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]},
{id:"n2", type:"helper"}];
var results = [
[0, 1],
@ -154,8 +154,8 @@ describe('BATCH node', function() {
check_count(flow, results, done);
});
it('should create seq. with count and overwrap', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 3, overwrap: 2, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]},
it('should create seq. with count and overlap', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 3, overlap: 2, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]},
{id:"n2", type:"helper"}];
var results = [
[0, 1, 2],
@ -167,12 +167,12 @@ describe('BATCH node', function() {
});
it('should handle too many pending messages', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 5, overwrap: 0, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]},
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 5, overlap: 0, interval: 10, allowEmptySequence: false, topics: [], wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(batchNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
RED.settings.maxKeptMsgsCount = 2;
RED.settings.nodeMessageBufferMaxLength = 2;
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "batch";
@ -194,7 +194,7 @@ describe('BATCH node', function() {
describe('mode: interval', function() {
it('should create seq. with interval', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
{id:"n2", type:"helper"}];
var results = [
[0, 1],
@ -204,7 +204,7 @@ describe('BATCH node', function() {
});
it('should create seq. with interval (in float)', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 0.5, allowEmptySequence: false, topics: [], wires:[["n2"]]},
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overlap: 0, interval: 0.5, allowEmptySequence: false, topics: [], wires:[["n2"]]},
{id:"n2", type:"helper"}];
var results = [
[0, 1],
@ -214,7 +214,7 @@ describe('BATCH node', function() {
});
it('should create seq. with interval & not send empty seq', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
{id:"n2", type:"helper"}];
var results = [
// 1300, 2600, 3900, 5200,
@ -224,7 +224,7 @@ describe('BATCH node', function() {
});
it('should create seq. with interval & send empty seq', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: true, topics: [], wires:[["n2"]]},
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overlap: 0, interval: 1, allowEmptySequence: true, topics: [], wires:[["n2"]]},
{id:"n2", type:"helper"}];
var results = [
// 1300, 2600, 3900, 5200,
@ -234,12 +234,12 @@ describe('BATCH node', function() {
});
it('should handle too many pending messages', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(batchNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
RED.settings.maxKeptMsgsCount = 2;
RED.settings.nodeMessageBufferMaxLength = 2;
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "batch";
@ -261,7 +261,7 @@ describe('BATCH node', function() {
describe('mode: concat', function() {
it('should concat two seq. (series)', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
{id:"n2", type:"helper"}];
var results = [
[2, 3, 0, 1]
@ -276,7 +276,7 @@ describe('BATCH node', function() {
});
it('should concat two seq. (mixed)', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
{id:"n2", type:"helper"}];
var results = [
[2, 3, 0, 1]
@ -291,7 +291,7 @@ describe('BATCH node', function() {
});
it('should concat three seq.', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}, {topic: "TC"}], wires:[["n2"]]},
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}, {topic: "TC"}], wires:[["n2"]]},
{id:"n2", type:"helper"}];
var results = [
[2, 3, 0, 1, 4]
@ -307,12 +307,12 @@ describe('BATCH node', function() {
});
it('should handle too many pending messages', function(done) {
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overwrap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(batchNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
RED.settings.maxKeptMsgsCount = 2;
RED.settings.nodeMessageBufferMaxLength = 2;
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "batch";