1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Initial support of merge & reduce mode for JOIN node (#1546)

* initial support of merge mode of JOIN node

* initial support of reduce mode of JOIN node

* update info document of JOIN node

* add tests for merge & reduce mode of JOIN node

* tidy tabs & spaces

* add test for too many pending messages & related fixes

* add an test for reduce mode of JOIN node

* change order of modes of SWITCH node

* add initial topics entry of merge mode

* fixed descriptions on "reduce right" checkbox

* fixed update of typedInput field of reduce mode

* fixed a typo in info document of JOIN node

* allow empty string in JSONata input field of reduce mode

* fixed a typo

* fixed error in reduce mode description
This commit is contained in:
Hiroyasu Nishiyama 2018-01-17 19:08:23 +09:00 committed by Nick O'Leary
parent af71ae649b
commit 218794be77
5 changed files with 873 additions and 8 deletions

View File

@ -840,6 +840,8 @@
"mode":{
"mode":"Mode",
"auto":"automatic",
"merge":"merge sequence",
"reduce":"reduce sequence",
"custom":"manual"
},
"combine":"Combine each",
@ -861,7 +863,24 @@
"afterTimeout":"After a timeout following the first message",
"seconds":"seconds",
"complete":"After a message with the <code>msg.complete</code> property set",
"tip":"This mode assumes this node is either paired with a <i>split</i> node or the received messages will have a properly configured <code>msg.parts</code> property."
"tip":"This mode assumes this node is either paired with a <i>split</i> node or the received messages will have a properly configured <code>msg.parts</code> property.",
"too-many" : "too many pending messages in join node",
"merge": {
"topics-label":"Merged Topics",
"topics":"topics",
"topic" : "topic",
"on-change":"Send merged message on arrival of a new topic"
},
"reduce": {
"exp": "Reduce exp",
"exp-value": "exp",
"init": "Initial value",
"right": "Evaluate in reverse order (right to left)",
"fixup": "Fixup exp"
},
"errors": {
"invalid-expr": "Invalid JSONata expression: __error__"
}
},
"sort" : {
"key-type" : "Key type",

View File

@ -108,7 +108,7 @@
arraySplt: {value:1},
arraySpltType: {value:"len"},
stream: {value:false},
addname: {value:""},
addname: {value:""}
},
inputs:1,
outputs:1,
@ -170,6 +170,8 @@
<select id="node-input-mode" style="width:200px;">
<option value="auto" data-i18n="join.mode.auto"></option>
<option value="custom" data-i18n="join.mode.custom"></option>
<option value="merge" data-i18n="join.mode.merge"></option>
<option value="reduce" data-i18n="join.mode.reduce"></option>
</select>
</div>
<div class="node-row-custom">
@ -217,6 +219,38 @@
</ul>
</div>
</div>
<div class="node-row-merge">
<div class="form-row">
<label data-i18n="join.merge.topics-label"></label>
<div class="form-row node-input-topics-container-row">
<ol id="node-input-topics-container"></ol>
</div>
</div>
<div class="form-row">
<input type="checkbox" id="node-input-mergeOnChange" style="margin-left:10px; vertical-align:top; width:auto;">
<label for="node-input-mergeOnChange" style="width:auto;" data-i18n="join.merge.on-change"></label>
</div>
</div>
<div class="node-row-reduce">
<div class="form-row">
<label for="node-input-reduceExp" data-i18n="join.reduce.exp" style="margin-left:10px;"></label>
<input type="text" id="node-input-reduceExp" data-i18n="[placeholder]join.reduce.exp-value" style="width:65%">
</div>
<div class="form-row">
<label for="node-input-reduceInit" data-i18n="join.reduce.init" style="margin-left:10px;"></label>
<input type="text" id="node-input-reduceInit" data-i18n="[placeholder]join.reduce.init" style="width:65%">
<input type="hidden" id="node-input-reduceInitType">
</div>
<div class="form-row">
<label for="node-input-reduceFixup" data-i18n="join.reduce.fixup" style="margin-left:10px;"></label>
<input type="text" id="node-input-reduceFixup" data-i18n="[placeholder]join.reduce.exp-value" style="width:65%">
</div>
<div class="form-row">
<label>&nbsp;</label>
<input type="checkbox" id="node-input-reduceRight" style="display:inline-block; width:auto; vertical-align:top; margin-left:10px;">
<label for="node-input-reduceRight" style="width:70%;" data-i18n="join.reduce.right" style="margin-left:10px;"/>
</div>
</div>
<div class="form-row">
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label>
<input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name">
@ -225,9 +259,17 @@
</script>
<script type="text/x-red" data-help-name="join">
<p>Joins sequences of messages into a single message.</p>
<p>When paired with the <b>split</b> node, it will automatically join the messages
to reverse the split that was performed.</p>
<p>Joins sequences of messages into a single message. This node provides four mode for message combination:</p>
<dl>
<dt>automatic</dt>
<dd>When paired with the <b>split</b> node, it will automatically join the messages to reverse the split that was performed.</dd>
<dt>manual</dt>
<dd>It will join sequences of messages in a variety of ways.</dd>
<dt>merge sequence</dt>
<dd>It will merge incoming messages into single message using <code>topic</code> property.</dd>
<dt>reduce sequence</dt>
<dd>When paired with the <b>split</b> node, it will reduce the message sequence into single message.</dd>
</dl>
<h3>Inputs</h3>
<dl class="message-properties">
<dt class="optional">parts<span class="property-type">object</span></dt>
@ -248,6 +290,11 @@
<dd>If set, the node will send its output message in its current state.</dd>
</dl>
<h3>Details</h3>
<h4>Automatic mode</h4>
<p>When configured to join in manual mode, the node is able to join sequences of messages using <code>parts</code> property of incoming messages.</p>
<h4>Manual mode</h4>
<p>When configured to join in manual mode, the node is able to join sequences
of messages in a variety of ways.</p>
<ul>
@ -263,6 +310,50 @@
received.</p>
<p>A <i>timeout</i> can be set to trigger sending the new message using whatever has been received so far.</p>
<p>If a message is received with the <b>msg.complete</b> property set, the output message is sent.</p>
<h4>Merge Sequence mode</h4>
<p>When configured to join in merge mode, the join node can create a message based on <code>topic</code> value.</p>
<p>Input messages are merged in order specified by <b>Topics</b> value.
<p>For example, if value of <b>Topics</b> is <b>x,x,y</b>, two input messages with topic <b>x</b> and one input message with topic <b>y</b> are merged into a new message in order of arrival.</p>
<p>If "Send merged message on arrival of a new topic" check box is selected, the last messages with each topic is kept internally and output message is sent when a message with new topics arrives.</p>
<p>The merged message contains <code>payload</code> property and properties for each topic. The <code>payload</code> property represents ordered array of payload value of input messages for each topic. The property for each topic represents a payload value for single occurrence of topic or array of payload values for multiple occurrences of the topic.</p>
<h4>Reduce Sequence mode</h4>
<p>When configured to join in reduce sequence mode, following values can be specified:</p>
<dl class="message-properties">
<dt>Reduce exp</dt>
<dd>JSONata expression for reducing message group. This expression represents the accumulated result. In the expression, following special variables can be used:
<ul>
<li><code>$A</code> accumulated value, </li>
<li><code>$I</code> index of the message in a group, </li>
<li><code>$N</code> number of messages of a group.</li>
</ul>
</dd>
<dt>Initial value</dt>
<dd>
initial value of reduction.
</dd>
<dt>Fixup exp</dt>
<dd>
JSONata expression applied after reduction of a message group completed. In the expression, following special variables can be used:
<ul>
<li><code>$A</code> accumulated value, </li>
<li><code>$N</code> number of messages of a group.</li>
</ul>
</dd>
<p>Order of reduction on a message group can be specified by checkbox (<b>Evaluate in reverse order (right to left)</b>).</p>
</dl>
<p><b>Example:</b> Join node outputs an average for each input message group with the following setting:
<ul>
<li><b>Reduce exp</b>: <code>$A+payload</code></li>
<li><b>Initial value</b>: <code>0</code></li>
<li><b>Fixup exp</b>: <code>$A/$N</code></li>
</ul>
</p>
<h4>Note:</h4>
<p>This node internally keeps messages for its operation. In order to prevent unexpected memory usage, maximum number of messages kept can be specified by <code>joinMaxKeptMsgsCount</code> property in <b>settings.js</b>.</p>
</script>
<script type="text/javascript">
@ -280,7 +371,14 @@
joinerType: { value:"str"},
accumulate: { value:"false" },
timeout: {value:""},
count: {value:""}
count: {value:""},
topics: {value:[{topic:""}]},
mergeOnChange: {value:false},
reduceRight: {value:false},
reduceExp: {value:undefined},
reduceInit: {value:undefined},
reduceInitType: {value:undefined},
reduceFixup: {value:undefined}
},
inputs:1,
outputs:1,
@ -292,13 +390,90 @@
return this.name?"node_label_italic":"";
},
oneditprepare: function() {
var node = this;
var topic_str = node._("join.merge.topic");
function resizeTopics(topic) {
var newWidth = topic.width();
topic.find('.red-ui-typedInput')
.typedInput("width",newWidth-15);
}
$("#node-input-topics-container")
.css('min-height','250px').css('min-width','420px')
.editableList({
addItem: function(container,i,opt) {
if (!opt.hasOwnProperty('topic')) {
opt.topic = "";
}
var row = $('<div/>').appendTo(container);
var valueField = $('<input/>',{
class:"node-input-topic-value",
type:"text",
style:"margin-left: 5px;"
}).appendTo(row)
.typedInput({default:'str', types:['str']});
valueField.typedInput('value', opt.topic);
valueField.typedInput('type', 'str');
valueField.attr('placeholder', topic_str);
resizeTopics(container);
},
resizeItem: resizeTopics,
sortable: true,
removable: true
});
$("#node-input-mode").change(function(e) {
var val = $(this).val();
$(".node-row-custom").toggle(val==='custom');
$(".form-tips-auto").toggle(val==='auto');
$(".node-row-merge").toggle(val==='merge');
$(".node-row-reduce").toggle(val==='reduce');
$(".form-tips-auto").toggle((val==='auto') || (val==='reduce'));
if (val === "auto") {
$("#node-input-accumulate").attr('checked', false);
}
else if (val === "merge") {
var topics = node.topics;
var container = $("#node-input-topics-container");
container.editableList('empty');
for (var i=0;i<topics.length;i++) {
var topic = topics[i];
container.editableList('addItem', topic);
}
}
else if (val === "reduce") {
var jsonata_or_empty = {
value: "jsonata",
label: "expression",
icon: "red/images/typedInput/expr.png",
validate: function(v) {
try{
if(v !== "") {
jsonata(v);
}
return true;
}
catch(e){
return false;
}
},
expand:function() {
var that = this;
RED.editor.editExpression({
value: this.value().replace(/\t/g,"\n"),
complete: function(v) {
that.value(v.replace(/\n/g,"\t"));
}
})
}
};
$("#node-input-reduceExp").typedInput({types:[jsonata_or_empty]});
$("#node-input-reduceInit").typedInput({
default: 'num',
types:['flow','global','str','num','bool','json','bin','date','jsonata'],
typeField: $("#node-input-reduceInitType")
});
$("#node-input-reduceFixup").typedInput({types:[jsonata_or_empty]});
}
});
$("#node-input-build").change(function(e) {
@ -340,6 +515,27 @@
if (build !== 'object' && build !== 'merged') {
$("#node-input-accumulate").prop("checked",false);
}
var topics = $("#node-input-topics-container").editableList('items');
var node = this;
node.topics = [];
topics.each(function(i) {
var topicData = $(this).data('data');
var topic = $(this);
var vf = topic.find(".node-input-topic-value");
var value = vf.typedInput('value');
var r = {topic:value};
node.topics.push(r);
});
},
oneditresize: function(size) {
var rows = $("#dialog-form>div:not(.node-input-topics-container-row)");
var height = size.height;
for (var i=0;i<rows.size();i++) {
height -= $(rows[i]).outerHeight(true);
}
var editorRow = $("#dialog-form>div.node-input-topics-container-row");
height -= (parseInt(editorRow.css("marginTop"))+parseInt(editorRow.css("marginBottom")));
$("#node-input-topics-container").editableList('height',height);
}
});
</script>

View File

@ -232,6 +232,229 @@ module.exports = function(RED) {
RED.nodes.registerType("split",SplitNode);
var _max_kept_msgs_count = undefined;
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
var name = "joinMaxKeptMsgsCount";
if (RED.settings.hasOwnProperty(name)) {
_max_kept_msgs_count = RED.settings[name];
}
else {
_max_kept_msgs_count = 0;
}
}
return _max_kept_msgs_count;
}
function add_to_topic(node, pending, topic, msg) {
var merge_on_change = node.merge_on_change;
if (!pending.hasOwnProperty(topic)) {
pending[topic] = [];
}
var topics = pending[topic];
topics.push(msg);
if (merge_on_change) {
var counts = node.topic_counts;
if (topics.length > counts[topic]) {
topics.shift();
node.pending_count--;
}
}
}
function compute_topic_counts(topics) {
var counts = {};
for (var topic of topics) {
counts[topic] = (counts.hasOwnProperty(topic) ? counts[topic] : 0) +1;
}
return counts;
}
function try_merge(node, pending, merge_on_change) {
var topics = node.topics;
var counts = node.topic_counts;
for(var topic of topics) {
if(!pending.hasOwnProperty(topic) ||
(pending[topic].length < counts[topic])) {
return;
}
}
var merge_on_change = node.merge_on_change;
var msgs = [];
var vals = [];
var new_msg = {payload: vals};
for (var topic of topics) {
var pmsgs = pending[topic];
var msg = pmsgs.shift();
if (merge_on_change) {
pmsgs.push(msg);
}
var pval = msg.payload;
var val = new_msg[topic];
msgs.push(msg);
vals.push(pval);
if (val instanceof Array) {
new_msg[topic].push(pval);
}
else if (val === undefined) {
new_msg[topic] = pval;
}
else {
new_msg[topic] = [val, pval]
}
}
node.send(new_msg);
if (!merge_on_change) {
node.pending_count -= topics.length;
}
}
function merge_msg(node, msg) {
var topics = node.topics;
var topic = msg.topic;
if(node.topics.indexOf(topic) >= 0) {
var pending = node.pending;
if(node.topic_counts == undefined) {
node.topic_counts = compute_topic_counts(topics)
}
add_to_topic(node, pending, topic, msg);
node.pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
node.error(RED._("join.too-many"), msg);
}
try_merge(node, pending);
}
}
function apply_r(exp, accum, msg, index, count) {
exp.assign("I", index);
exp.assign("N", count);
exp.assign("A", accum);
return RED.util.evaluateJSONataExpression(exp, msg);
}
function apply_f(exp, accum, count) {
exp.assign("N", count);
exp.assign("A", accum);
return RED.util.evaluateJSONataExpression(exp, {});
}
function exp_or_undefined(exp) {
if((exp === "") ||
(exp === null)) {
return undefined;
}
return exp
}
function reduce_and_send_group(node, group) {
var is_right = node.reduce_right;
var flag = is_right ? -1 : 1;
var msgs = group.msgs;
var accum = node.reduce_init;
var reduce_exp = node.reduce_exp;
var reduce_fixup = node.reduce_fixup;
var count = group.count;
msgs.sort(function(x,y) {
var ix = x.parts.index;
var iy = y.parts.index;
if (ix < iy) return -flag;
if (ix > iy) return flag;
return 0;
});
for(var msg of msgs) {
accum = apply_r(reduce_exp, accum, msg, msg.parts.index, count);
}
if(reduce_fixup !== undefined) {
accum = apply_f(reduce_fixup, accum, count);
}
node.send({payload: accum});
}
function reduce_msg(node, msg) {
if(msg.hasOwnProperty('parts')) {
var parts = msg.parts;
var pending = node.pending;
var pending_count = node.pending_count;
var gid = msg.parts.id;
if(!pending.hasOwnProperty(gid)) {
var count = undefined;
if(parts.hasOwnProperty('count')) {
count = msg.parts.count;
}
pending[gid] = {
count: count,
msgs: []
};
}
var group = pending[gid];
var msgs = group.msgs;
if(parts.hasOwnProperty('count') &&
(group.count === undefined)) {
group.count = count;
}
msgs.push(msg);
pending_count++;
if(msgs.length === group.count) {
delete pending[gid];
try {
pending_count -= msgs.length;
reduce_and_send_group(node, group);
} catch(e) {
node.error(RED._("join.errors.invalid-expr",{error:e.message})); }
}
node.pending_count = pending_count;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
node.error(RED._("join.too-many"), msg);
}
}
else {
node.send(msg);
}
}
function eval_exp(node, exp, exp_type) {
if(exp_type === "flow") {
return node.context().flow.get(exp);
}
else if(exp_type === "global") {
return node.context().global.get(exp);
}
else if(exp_type === "str") {
return exp;
}
else if(exp_type === "num") {
return Number(exp);
}
else if(exp_type === "bool") {
if (exp === 'true') {
return true;
}
else if (exp === 'false') {
return false;
}
}
else if ((exp_type === "bin") ||
(exp_type === "json")) {
return JSON.parse(exp);
}
else if(exp_type === "date") {
return Date.now();
}
else if(exp_type === "jsonata") {
var jexp = RED.util.prepareJSONataExpression(exp, node);
return RED.util.evaluateJSONataExpression(jexp, {});
}
throw new Error("unexpected initial value type");
}
function JoinNode(n) {
RED.nodes.createNode(this,n);
this.mode = n.mode||"auto";
@ -246,6 +469,22 @@ module.exports = function(RED) {
this.joiner = n.joiner||"";
this.joinerType = n.joinerType||"str";
this.reduce = (this.mode === "reduce");
if (this.reduce) {
var exp_init = n.reduceInit;
var exp_init_type = n.reduceInitType;
var exp_reduce = n.reduceExp;
var exp_fixup = exp_or_undefined(n.reduceFixup);
this.reduce_right = n.reduceRight;
try {
this.reduce_init = eval_exp(this, exp_init, exp_init_type);
this.reduce_exp = RED.util.prepareJSONataExpression(exp_reduce, this);
this.reduce_fixup = (exp_fixup !== undefined) ? RED.util.prepareJSONataExpression(exp_fixup, this) : undefined;
} catch(e) {
this.error(RED._("join.errors.invalid-expr",{error:e.message}));
}
}
if (this.joinerType === "str") {
this.joiner = this.joiner.replace(/\\n/g,"\n").replace(/\\r/g,"\r").replace(/\\t/g,"\t").replace(/\\e/g,"\e").replace(/\\f/g,"\f").replace(/\\0/g,"\0");
} else if (this.joinerType === "bin") {
@ -259,6 +498,14 @@ module.exports = function(RED) {
this.build = n.build || "array";
this.accumulate = n.accumulate || "false";
this.topics = (n.topics || []).map(function(x) { return x.topic; });
this.merge_on_change = n.mergeOnChange || false;
this.topic_counts = undefined;
this.output = n.output || "stream";
this.pending = {};
this.pending_count = 0;
//this.topic = n.topic;
var node = this;
var inflight = {};
@ -321,6 +568,10 @@ module.exports = function(RED) {
node.warn("Message missing msg.parts property - cannot join in 'auto' mode")
return;
}
if (node.mode === 'merge' && !msg.hasOwnProperty("topic")) {
node.warn("Message missing msg.topic property - cannot join in 'merge' mode");
return;
}
if (node.propertyType == "full") {
property = msg;
@ -351,6 +602,14 @@ module.exports = function(RED) {
arrayLen = msg.parts.len;
propertyIndex = msg.parts.index;
}
else if (node.mode === 'merge') {
merge_msg(node, msg);
return;
}
else if (node.mode === 'reduce') {
reduce_msg(node, msg);
return;
}
else {
// Use the node configuration to identify all of the group information
partId = "_";

View File

@ -50,6 +50,7 @@ module.exports = {
// The maximum number of messages kept internally in nodes.
// Zero or undefined value means not restricting number of messages.
//sortMaxKeptMsgsCount: 0,
//joinMaxKeptMsgsCount: 0,
//batchMaxKeptMsgsCount: 0,
// To disable the option for using local files for storing keys and certificates in the TLS configuration

View File

@ -18,6 +18,7 @@ var should = require("should");
var splitNode = require("../../../../nodes/core/logic/17-split.js");
var joinNode = require("../../../../nodes/core/logic/17-split.js");
var helper = require("../../helper.js");
var RED = require("../../../../red/red.js");
describe('SPLIT node', function() {
@ -269,6 +270,7 @@ describe('JOIN node', function() {
afterEach(function() {
helper.unload();
RED.settings.joinMaxKeptMsgsCount = 0;
});
it('should be loaded', function(done) {
@ -727,6 +729,394 @@ describe('JOIN node', function() {
});
s1.receive({payload:[[1,2,3],"a\nb\nc",[7,8,9]]});
});
})
});
it('should merge messages with topics (single)', function(done) {
var flow = [{id:"n1", type:"join", mode:"merge",
topics:[{topic:"TA"}, {topic:"TB"}],
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("TA");
msg.should.have.property("TB");
msg.should.have.property("payload");
msg.payload.should.be.an.Array();
msg.payload.length.should.equal(2);
count++;
if (count === 1) {
msg.TA.should.equal("a");
msg.TB.should.equal("b");
msg.payload[0].should.equal("a");
msg.payload[1].should.equal("b");
}
if (count === 2) {
msg.TA.should.equal("d");
msg.TB.should.equal("c");
msg.payload[0].should.equal("d");
msg.payload[1].should.equal("c");
done();
}
}
catch(e) { done(e); }
});
n1.receive({payload:"a", topic:"TA"});
n1.receive({payload:"b", topic:"TB"});
n1.receive({payload:"c", topic:"TB"});
n1.receive({payload:"d", topic:"TA"});
});
});
it('should merge messages with topics (multiple)', function(done) {
var flow = [{id:"n1", type:"join", mode:"merge",
topics:[{topic:"TA"}, {topic:"TB"}, {topic:"TA"}],
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("TA");
msg.TA.should.be.an.Array();
msg.TA.length.should.equal(2);
msg.should.have.property("TB");
msg.should.have.property("payload");
msg.payload.should.be.an.Array();
msg.payload.length.should.equal(3);
count++;
if (count === 1) {
msg.TA[0].should.equal("a");
msg.TA[1].should.equal("d");
msg.TB.should.equal("b");
msg.payload[0].should.equal("a");
msg.payload[1].should.equal("b");
msg.payload[2].should.equal("d");
}
if (count === 2) {
msg.TA[0].should.equal("e");
msg.TA[1].should.equal("f");
msg.TB.should.equal("c");
msg.payload[0].should.equal("e");
msg.payload[1].should.equal("c");
msg.payload[2].should.equal("f");
done();
}
}
catch(e) { done(e); }
});
n1.receive({payload:"a", topic:"TA"});
n1.receive({payload:"b", topic:"TB"});
n1.receive({payload:"c", topic:"TB"});
n1.receive({payload:"d", topic:"TA"});
n1.receive({payload:"e", topic:"TA"});
n1.receive({payload:"f", topic:"TA"});
});
});
it('should merge messages with topics (single, send on new topic)', function(done) {
var flow = [{id:"n1", type:"join", mode:"merge",
topics:[{topic:"TA"}, {topic:"TB"}],
mergeOnChange:true,
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("TA");
msg.should.have.property("TB");
msg.should.have.property("payload");
msg.payload.should.be.an.Array();
msg.payload.length.should.equal(2);
count++;
if (count === 1) {
msg.TA.should.equal("a");
msg.TB.should.equal("b");
msg.payload[0].should.equal("a");
msg.payload[1].should.equal("b");
}
if (count === 2) {
msg.TA.should.equal("a");
msg.TB.should.equal("c");
msg.payload[0].should.equal("a");
msg.payload[1].should.equal("c");
}
if (count === 3) {
msg.TA.should.equal("d");
msg.TB.should.equal("c");
msg.payload[0].should.equal("d");
msg.payload[1].should.equal("c");
done();
}
}
catch(e) { done(e); }
});
n1.receive({payload:"a", topic:"TA"});
n1.receive({payload:"b", topic:"TB"});
n1.receive({payload:"c", topic:"TB"});
n1.receive({payload:"d", topic:"TA"});
});
});
it('should merge messages with topics (multiple, send on new topic)', function(done) {
var flow = [{id:"n1", type:"join", mode:"merge",
topics:[{topic:"TA"}, {topic:"TB"}, {topic:"TA"}],
mergeOnChange:true,
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("TA");
msg.TA.should.be.an.Array();
msg.TA.length.should.equal(2);
msg.should.have.property("TB");
msg.should.have.property("payload");
msg.payload.should.be.an.Array();
msg.payload.length.should.equal(3);
count++;
if (count === 1) {
msg.TA[0].should.equal("a");
msg.TA[1].should.equal("c");
msg.TB.should.equal("b");
msg.payload[0].should.equal("a");
msg.payload[1].should.equal("b");
msg.payload[2].should.equal("c");
}
if (count === 2) {
msg.TA[0].should.equal("c");
msg.TA[1].should.equal("d");
msg.TB.should.equal("b");
msg.payload[0].should.equal("c");
msg.payload[1].should.equal("b");
msg.payload[2].should.equal("d");
}
if (count === 3) {
msg.TA[0].should.equal("c");
msg.TA[1].should.equal("d");
msg.TB.should.equal("e");
msg.payload[0].should.equal("c");
msg.payload[1].should.equal("e");
msg.payload[2].should.equal("d");
done();
}
}
catch(e) { done(e); }
});
n1.receive({payload:"a", topic:"TA"});
n1.receive({payload:"b", topic:"TB"});
n1.receive({payload:"c", topic:"TA"});
n1.receive({payload:"d", topic:"TA"});
n1.receive({payload:"e", topic:"TB"});
});
});
it('should redece messages', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:false,
reduceExp:"$A+payload",
reduceInit:"0",
reduceInitType:"num",
reduceFixup:undefined,
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("payload");
msg.payload.should.equal(10);
done();
}
catch(e) { done(e); }
});
n1.receive({payload:3, parts:{index:2, count:4, id:222}});
n1.receive({payload:2, parts:{index:1, count:4, id:222}});
n1.receive({payload:4, parts:{index:3, count:4, id:222}});
n1.receive({payload:1, parts:{index:0, count:4, id:222}});
});
});
it('should redece messages using $I', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:false,
reduceExp:"$A+$I",
reduceInit:"0",
reduceInitType:"num",
reduceFixup:undefined,
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("payload");
msg.payload.should.equal(6);
done();
}
catch(e) { done(e); }
});
n1.receive({payload:3, parts:{index:2, count:4, id:222}});
n1.receive({payload:2, parts:{index:1, count:4, id:222}});
n1.receive({payload:4, parts:{index:3, count:4, id:222}});
n1.receive({payload:1, parts:{index:0, count:4, id:222}});
});
});
it('should redece messages with fixup', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:false,
reduceExp:"$A+payload",
reduceInit:"0",
reduceInitType:"num",
reduceFixup:"$A/$N",
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("payload");
msg.payload.should.equal(2);
done();
}
catch(e) { done(e); }
});
n1.receive({payload:3, parts:{index:2, count:5, id:222}});
n1.receive({payload:2, parts:{index:1, count:5, id:222}});
n1.receive({payload:4, parts:{index:3, count:5, id:222}});
n1.receive({payload:1, parts:{index:0, count:5, id:222}});
n1.receive({payload:0, parts:{index:4, count:5, id:222}});
});
});
it('should redece messages (left)', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:false,
reduceExp:"'(' & $A & '+' & payload & ')'",
reduceInit:"0",
reduceInitType:"str",
reduceFixup:undefined,
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("payload");
msg.payload.should.be.an.String();
msg.payload.should.equal("((((0+1)+2)+3)+4)");
done();
}
catch(e) { done(e); }
});
n1.receive({payload:'3', parts:{index:2, count:4, id:222}});
n1.receive({payload:'2', parts:{index:1, count:4, id:222}});
n1.receive({payload:'4', parts:{index:3, count:4, id:222}});
n1.receive({payload:'1', parts:{index:0, count:4, id:222}});
});
});
it('should redece messages (right)', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:true,
reduceExp:"'(' & $A & '+' & payload & ')'",
reduceInit:"0",
reduceInitType:"str",
reduceFixup:undefined,
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("payload");
msg.payload.should.be.an.String();
msg.payload.should.equal("((((0+4)+3)+2)+1)");
done();
}
catch(e) { done(e); }
});
n1.receive({payload:'3', parts:{index:2, count:4, id:222}});
n1.receive({payload:'2', parts:{index:1, count:4, id:222}});
n1.receive({payload:'4', parts:{index:3, count:4, id:222}});
n1.receive({payload:'1', parts:{index:0, count:4, id:222}});
});
});
it('should handle too many pending messages for merge mode', function(done) {
var flow = [{id:"n1", type:"join", mode:"merge",
topics:[{topic:"TA"}, {topic:"TA"}, {topic:"TB"}],
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
RED.settings.joinMaxKeptMsgsCount = 2;
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "join";
});
var evt = logEvents[0][0];
evt.should.have.property('id', "n1");
evt.should.have.property('type', "join");
evt.should.have.property('msg', "join.too-many");
done();
}, 150);
n1.receive({payload:"a", topic:"TA"});
n1.receive({payload:"b", topic:"TB"});
n1.receive({payload:"c", topic:"TB"});
n1.receive({payload:"d", topic:"TA"});
});
});
it('should handle too many pending messages for reduce mode', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:false,
reduceExp:"$A+payload",
reduceInit:"0",
reduceInitType:"num",
reduceFixup:undefined,
wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
RED.settings.joinMaxKeptMsgsCount = 2;
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "join";
});
var evt = logEvents[0][0];
evt.should.have.property('id', "n1");
evt.should.have.property('type', "join");
evt.should.have.property('msg', "join.too-many");
done();
}, 150);
n1.receive({payload:3, parts:{index:2, count:4, id:222}});
n1.receive({payload:2, parts:{index:1, count:4, id:222}});
n1.receive({payload:4, parts:{index:3, count:4, id:222}});
n1.receive({payload:1, parts:{index:0, count:4, id:222}});
});
});
});