diff --git a/nodes/core/locales/en-US/messages.json b/nodes/core/locales/en-US/messages.json
index f77858f20..787571a56 100644
--- a/nodes/core/locales/en-US/messages.json
+++ b/nodes/core/locales/en-US/messages.json
@@ -840,6 +840,8 @@
"mode":{
"mode":"Mode",
"auto":"automatic",
+ "merge":"merge sequence",
+ "reduce":"reduce sequence",
"custom":"manual"
},
"combine":"Combine each",
@@ -861,7 +863,24 @@
"afterTimeout":"After a timeout following the first message",
"seconds":"seconds",
"complete":"After a message with the msg.complete
property set",
- "tip":"This mode assumes this node is either paired with a split node or the received messages will have a properly configured msg.parts
property."
+ "tip":"This mode assumes this node is either paired with a split node or the received messages will have a properly configured msg.parts
property.",
+ "too-many" : "too many pending messages in join node",
+ "merge": {
+ "topics-label":"Merged Topics",
+ "topics":"topics",
+ "topic" : "topic",
+ "on-change":"Send merged message on arrival of a new topic"
+ },
+ "reduce": {
+ "exp": "Reduce exp",
+ "exp-value": "exp",
+ "init": "Initial value",
+ "right": "Evaluate in reverse order (right to left)",
+ "fixup": "Fixup exp"
+ },
+ "errors": {
+ "invalid-expr": "Invalid JSONata expression: __error__"
+ }
},
"sort" : {
"key-type" : "Key type",
diff --git a/nodes/core/logic/17-split.html b/nodes/core/logic/17-split.html
index 6cc7589f0..651a4702e 100644
--- a/nodes/core/logic/17-split.html
+++ b/nodes/core/logic/17-split.html
@@ -108,7 +108,7 @@
arraySplt: {value:1},
arraySpltType: {value:"len"},
stream: {value:false},
- addname: {value:""},
+ addname: {value:""}
},
inputs:1,
outputs:1,
@@ -170,6 +170,8 @@
@@ -217,6 +219,38 @@
+
+
@@ -225,9 +259,17 @@
diff --git a/nodes/core/logic/17-split.js b/nodes/core/logic/17-split.js
index d4fdfc55b..803131671 100644
--- a/nodes/core/logic/17-split.js
+++ b/nodes/core/logic/17-split.js
@@ -232,6 +232,229 @@ module.exports = function(RED) {
RED.nodes.registerType("split",SplitNode);
+ var _max_kept_msgs_count = undefined;
+
+ function max_kept_msgs_count(node) {
+ if (_max_kept_msgs_count === undefined) {
+ var name = "joinMaxKeptMsgsCount";
+ if (RED.settings.hasOwnProperty(name)) {
+ _max_kept_msgs_count = RED.settings[name];
+ }
+ else {
+ _max_kept_msgs_count = 0;
+ }
+ }
+ return _max_kept_msgs_count;
+ }
+
+ function add_to_topic(node, pending, topic, msg) {
+ var merge_on_change = node.merge_on_change;
+ if (!pending.hasOwnProperty(topic)) {
+ pending[topic] = [];
+ }
+ var topics = pending[topic];
+ topics.push(msg);
+ if (merge_on_change) {
+ var counts = node.topic_counts;
+ if (topics.length > counts[topic]) {
+ topics.shift();
+ node.pending_count--;
+ }
+ }
+ }
+
+ function compute_topic_counts(topics) {
+ var counts = {};
+ for (var topic of topics) {
+ counts[topic] = (counts.hasOwnProperty(topic) ? counts[topic] : 0) +1;
+ }
+ return counts;
+ }
+
+ function try_merge(node, pending, merge_on_change) {
+ var topics = node.topics;
+ var counts = node.topic_counts;
+ for(var topic of topics) {
+ if(!pending.hasOwnProperty(topic) ||
+ (pending[topic].length < counts[topic])) {
+ return;
+ }
+ }
+ var merge_on_change = node.merge_on_change;
+ var msgs = [];
+ var vals = [];
+ var new_msg = {payload: vals};
+ for (var topic of topics) {
+ var pmsgs = pending[topic];
+ var msg = pmsgs.shift();
+ if (merge_on_change) {
+ pmsgs.push(msg);
+ }
+ var pval = msg.payload;
+ var val = new_msg[topic];
+ msgs.push(msg);
+ vals.push(pval);
+ if (val instanceof Array) {
+ new_msg[topic].push(pval);
+ }
+ else if (val === undefined) {
+ new_msg[topic] = pval;
+ }
+ else {
+ new_msg[topic] = [val, pval]
+ }
+ }
+ node.send(new_msg);
+ if (!merge_on_change) {
+ node.pending_count -= topics.length;
+ }
+ }
+
+ function merge_msg(node, msg) {
+ var topics = node.topics;
+ var topic = msg.topic;
+ if(node.topics.indexOf(topic) >= 0) {
+ var pending = node.pending;
+ if(node.topic_counts == undefined) {
+ node.topic_counts = compute_topic_counts(topics)
+ }
+ add_to_topic(node, pending, topic, msg);
+ node.pending_count++;
+ var max_msgs = max_kept_msgs_count(node);
+ if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
+ node.pending = {};
+ node.pending_count = 0;
+ node.error(RED._("join.too-many"), msg);
+ }
+ try_merge(node, pending);
+ }
+ }
+
+ function apply_r(exp, accum, msg, index, count) {
+ exp.assign("I", index);
+ exp.assign("N", count);
+ exp.assign("A", accum);
+ return RED.util.evaluateJSONataExpression(exp, msg);
+ }
+
+ function apply_f(exp, accum, count) {
+ exp.assign("N", count);
+ exp.assign("A", accum);
+ return RED.util.evaluateJSONataExpression(exp, {});
+ }
+
+ function exp_or_undefined(exp) {
+ if((exp === "") ||
+ (exp === null)) {
+ return undefined;
+ }
+ return exp
+ }
+
+ function reduce_and_send_group(node, group) {
+ var is_right = node.reduce_right;
+ var flag = is_right ? -1 : 1;
+ var msgs = group.msgs;
+ var accum = node.reduce_init;
+ var reduce_exp = node.reduce_exp;
+ var reduce_fixup = node.reduce_fixup;
+ var count = group.count;
+ msgs.sort(function(x,y) {
+ var ix = x.parts.index;
+ var iy = y.parts.index;
+ if (ix < iy) return -flag;
+ if (ix > iy) return flag;
+ return 0;
+ });
+ for(var msg of msgs) {
+ accum = apply_r(reduce_exp, accum, msg, msg.parts.index, count);
+ }
+ if(reduce_fixup !== undefined) {
+ accum = apply_f(reduce_fixup, accum, count);
+ }
+ node.send({payload: accum});
+ }
+
+ function reduce_msg(node, msg) {
+ if(msg.hasOwnProperty('parts')) {
+ var parts = msg.parts;
+ var pending = node.pending;
+ var pending_count = node.pending_count;
+ var gid = msg.parts.id;
+ if(!pending.hasOwnProperty(gid)) {
+ var count = undefined;
+ if(parts.hasOwnProperty('count')) {
+ count = msg.parts.count;
+ }
+ pending[gid] = {
+ count: count,
+ msgs: []
+ };
+ }
+ var group = pending[gid];
+ var msgs = group.msgs;
+ if(parts.hasOwnProperty('count') &&
+ (group.count === undefined)) {
+ group.count = count;
+ }
+ msgs.push(msg);
+ pending_count++;
+ if(msgs.length === group.count) {
+ delete pending[gid];
+ try {
+ pending_count -= msgs.length;
+ reduce_and_send_group(node, group);
+ } catch(e) {
+ node.error(RED._("join.errors.invalid-expr",{error:e.message})); }
+ }
+ node.pending_count = pending_count;
+ var max_msgs = max_kept_msgs_count(node);
+ if ((max_msgs > 0) && (pending_count > max_msgs)) {
+ node.pending = {};
+ node.pending_count = 0;
+ node.error(RED._("join.too-many"), msg);
+ }
+ }
+ else {
+ node.send(msg);
+ }
+ }
+
+ function eval_exp(node, exp, exp_type) {
+ if(exp_type === "flow") {
+ return node.context().flow.get(exp);
+ }
+ else if(exp_type === "global") {
+ return node.context().global.get(exp);
+ }
+ else if(exp_type === "str") {
+ return exp;
+ }
+ else if(exp_type === "num") {
+ return Number(exp);
+ }
+ else if(exp_type === "bool") {
+ if (exp === 'true') {
+ return true;
+ }
+ else if (exp === 'false') {
+ return false;
+ }
+ }
+ else if ((exp_type === "bin") ||
+ (exp_type === "json")) {
+ return JSON.parse(exp);
+ }
+ else if(exp_type === "date") {
+ return Date.now();
+ }
+ else if(exp_type === "jsonata") {
+ var jexp = RED.util.prepareJSONataExpression(exp, node);
+ return RED.util.evaluateJSONataExpression(jexp, {});
+ }
+ throw new Error("unexpected initial value type");
+ }
+
function JoinNode(n) {
RED.nodes.createNode(this,n);
this.mode = n.mode||"auto";
@@ -246,6 +469,22 @@ module.exports = function(RED) {
this.joiner = n.joiner||"";
this.joinerType = n.joinerType||"str";
+ this.reduce = (this.mode === "reduce");
+ if (this.reduce) {
+ var exp_init = n.reduceInit;
+ var exp_init_type = n.reduceInitType;
+ var exp_reduce = n.reduceExp;
+ var exp_fixup = exp_or_undefined(n.reduceFixup);
+ this.reduce_right = n.reduceRight;
+ try {
+ this.reduce_init = eval_exp(this, exp_init, exp_init_type);
+ this.reduce_exp = RED.util.prepareJSONataExpression(exp_reduce, this);
+ this.reduce_fixup = (exp_fixup !== undefined) ? RED.util.prepareJSONataExpression(exp_fixup, this) : undefined;
+ } catch(e) {
+ this.error(RED._("join.errors.invalid-expr",{error:e.message}));
+ }
+ }
+
if (this.joinerType === "str") {
this.joiner = this.joiner.replace(/\\n/g,"\n").replace(/\\r/g,"\r").replace(/\\t/g,"\t").replace(/\\e/g,"\e").replace(/\\f/g,"\f").replace(/\\0/g,"\0");
} else if (this.joinerType === "bin") {
@@ -259,6 +498,14 @@ module.exports = function(RED) {
this.build = n.build || "array";
this.accumulate = n.accumulate || "false";
+
+ this.topics = (n.topics || []).map(function(x) { return x.topic; });
+ this.merge_on_change = n.mergeOnChange || false;
+ this.topic_counts = undefined;
+ this.output = n.output || "stream";
+ this.pending = {};
+ this.pending_count = 0;
+
//this.topic = n.topic;
var node = this;
var inflight = {};
@@ -321,6 +568,10 @@ module.exports = function(RED) {
node.warn("Message missing msg.parts property - cannot join in 'auto' mode")
return;
}
+ if (node.mode === 'merge' && !msg.hasOwnProperty("topic")) {
+ node.warn("Message missing msg.topic property - cannot join in 'merge' mode");
+ return;
+ }
if (node.propertyType == "full") {
property = msg;
@@ -351,6 +602,14 @@ module.exports = function(RED) {
arrayLen = msg.parts.len;
propertyIndex = msg.parts.index;
}
+ else if (node.mode === 'merge') {
+ merge_msg(node, msg);
+ return;
+ }
+ else if (node.mode === 'reduce') {
+ reduce_msg(node, msg);
+ return;
+ }
else {
// Use the node configuration to identify all of the group information
partId = "_";
diff --git a/settings.js b/settings.js
index 9679c882a..ac6d0afdd 100644
--- a/settings.js
+++ b/settings.js
@@ -50,6 +50,7 @@ module.exports = {
// The maximum number of messages kept internally in nodes.
// Zero or undefined value means not restricting number of messages.
//sortMaxKeptMsgsCount: 0,
+ //joinMaxKeptMsgsCount: 0,
//batchMaxKeptMsgsCount: 0,
// To disable the option for using local files for storing keys and certificates in the TLS configuration
diff --git a/test/nodes/core/logic/17-split_spec.js b/test/nodes/core/logic/17-split_spec.js
index db092658e..e2c8aaa97 100644
--- a/test/nodes/core/logic/17-split_spec.js
+++ b/test/nodes/core/logic/17-split_spec.js
@@ -18,6 +18,7 @@ var should = require("should");
var splitNode = require("../../../../nodes/core/logic/17-split.js");
var joinNode = require("../../../../nodes/core/logic/17-split.js");
var helper = require("../../helper.js");
+var RED = require("../../../../red/red.js");
describe('SPLIT node', function() {
@@ -269,6 +270,7 @@ describe('JOIN node', function() {
afterEach(function() {
helper.unload();
+ RED.settings.joinMaxKeptMsgsCount = 0;
});
it('should be loaded', function(done) {
@@ -727,6 +729,394 @@ describe('JOIN node', function() {
});
s1.receive({payload:[[1,2,3],"a\nb\nc",[7,8,9]]});
});
- })
+ });
+
+ it('should merge messages with topics (single)', function(done) {
+ var flow = [{id:"n1", type:"join", mode:"merge",
+ topics:[{topic:"TA"}, {topic:"TB"}],
+ wires:[["n2"]]},
+ {id:"n2", type:"helper"}];
+ helper.load(joinNode, flow, function() {
+ var n1 = helper.getNode("n1");
+ var n2 = helper.getNode("n2");
+ var count = 0;
+ n2.on("input", function(msg) {
+ try {
+ msg.should.have.property("TA");
+ msg.should.have.property("TB");
+ msg.should.have.property("payload");
+ msg.payload.should.be.an.Array();
+ msg.payload.length.should.equal(2);
+ count++;
+ if (count === 1) {
+ msg.TA.should.equal("a");
+ msg.TB.should.equal("b");
+ msg.payload[0].should.equal("a");
+ msg.payload[1].should.equal("b");
+ }
+ if (count === 2) {
+ msg.TA.should.equal("d");
+ msg.TB.should.equal("c");
+ msg.payload[0].should.equal("d");
+ msg.payload[1].should.equal("c");
+ done();
+ }
+ }
+ catch(e) { done(e); }
+ });
+ n1.receive({payload:"a", topic:"TA"});
+ n1.receive({payload:"b", topic:"TB"});
+ n1.receive({payload:"c", topic:"TB"});
+ n1.receive({payload:"d", topic:"TA"});
+ });
+ });
+
+ it('should merge messages with topics (multiple)', function(done) {
+ var flow = [{id:"n1", type:"join", mode:"merge",
+ topics:[{topic:"TA"}, {topic:"TB"}, {topic:"TA"}],
+ wires:[["n2"]]},
+ {id:"n2", type:"helper"}];
+ helper.load(joinNode, flow, function() {
+ var n1 = helper.getNode("n1");
+ var n2 = helper.getNode("n2");
+ var count = 0;
+ n2.on("input", function(msg) {
+ try {
+ msg.should.have.property("TA");
+ msg.TA.should.be.an.Array();
+ msg.TA.length.should.equal(2);
+ msg.should.have.property("TB");
+ msg.should.have.property("payload");
+ msg.payload.should.be.an.Array();
+ msg.payload.length.should.equal(3);
+ count++;
+ if (count === 1) {
+ msg.TA[0].should.equal("a");
+ msg.TA[1].should.equal("d");
+ msg.TB.should.equal("b");
+ msg.payload[0].should.equal("a");
+ msg.payload[1].should.equal("b");
+ msg.payload[2].should.equal("d");
+ }
+ if (count === 2) {
+ msg.TA[0].should.equal("e");
+ msg.TA[1].should.equal("f");
+ msg.TB.should.equal("c");
+ msg.payload[0].should.equal("e");
+ msg.payload[1].should.equal("c");
+ msg.payload[2].should.equal("f");
+ done();
+ }
+ }
+ catch(e) { done(e); }
+ });
+ n1.receive({payload:"a", topic:"TA"});
+ n1.receive({payload:"b", topic:"TB"});
+ n1.receive({payload:"c", topic:"TB"});
+ n1.receive({payload:"d", topic:"TA"});
+ n1.receive({payload:"e", topic:"TA"});
+ n1.receive({payload:"f", topic:"TA"});
+ });
+ });
+
+ it('should merge messages with topics (single, send on new topic)', function(done) {
+ var flow = [{id:"n1", type:"join", mode:"merge",
+ topics:[{topic:"TA"}, {topic:"TB"}],
+ mergeOnChange:true,
+ wires:[["n2"]]},
+ {id:"n2", type:"helper"}];
+ helper.load(joinNode, flow, function() {
+ var n1 = helper.getNode("n1");
+ var n2 = helper.getNode("n2");
+ var count = 0;
+ n2.on("input", function(msg) {
+ try {
+ msg.should.have.property("TA");
+ msg.should.have.property("TB");
+ msg.should.have.property("payload");
+ msg.payload.should.be.an.Array();
+ msg.payload.length.should.equal(2);
+ count++;
+ if (count === 1) {
+ msg.TA.should.equal("a");
+ msg.TB.should.equal("b");
+ msg.payload[0].should.equal("a");
+ msg.payload[1].should.equal("b");
+ }
+ if (count === 2) {
+ msg.TA.should.equal("a");
+ msg.TB.should.equal("c");
+ msg.payload[0].should.equal("a");
+ msg.payload[1].should.equal("c");
+ }
+ if (count === 3) {
+ msg.TA.should.equal("d");
+ msg.TB.should.equal("c");
+ msg.payload[0].should.equal("d");
+ msg.payload[1].should.equal("c");
+ done();
+ }
+ }
+ catch(e) { done(e); }
+ });
+ n1.receive({payload:"a", topic:"TA"});
+ n1.receive({payload:"b", topic:"TB"});
+ n1.receive({payload:"c", topic:"TB"});
+ n1.receive({payload:"d", topic:"TA"});
+ });
+ });
+
+ it('should merge messages with topics (multiple, send on new topic)', function(done) {
+ var flow = [{id:"n1", type:"join", mode:"merge",
+ topics:[{topic:"TA"}, {topic:"TB"}, {topic:"TA"}],
+ mergeOnChange:true,
+ wires:[["n2"]]},
+ {id:"n2", type:"helper"}];
+ helper.load(joinNode, flow, function() {
+ var n1 = helper.getNode("n1");
+ var n2 = helper.getNode("n2");
+ var count = 0;
+ n2.on("input", function(msg) {
+ try {
+ msg.should.have.property("TA");
+ msg.TA.should.be.an.Array();
+ msg.TA.length.should.equal(2);
+ msg.should.have.property("TB");
+ msg.should.have.property("payload");
+ msg.payload.should.be.an.Array();
+ msg.payload.length.should.equal(3);
+ count++;
+ if (count === 1) {
+ msg.TA[0].should.equal("a");
+ msg.TA[1].should.equal("c");
+ msg.TB.should.equal("b");
+ msg.payload[0].should.equal("a");
+ msg.payload[1].should.equal("b");
+ msg.payload[2].should.equal("c");
+ }
+ if (count === 2) {
+ msg.TA[0].should.equal("c");
+ msg.TA[1].should.equal("d");
+ msg.TB.should.equal("b");
+ msg.payload[0].should.equal("c");
+ msg.payload[1].should.equal("b");
+ msg.payload[2].should.equal("d");
+ }
+ if (count === 3) {
+ msg.TA[0].should.equal("c");
+ msg.TA[1].should.equal("d");
+ msg.TB.should.equal("e");
+ msg.payload[0].should.equal("c");
+ msg.payload[1].should.equal("e");
+ msg.payload[2].should.equal("d");
+ done();
+ }
+ }
+ catch(e) { done(e); }
+ });
+ n1.receive({payload:"a", topic:"TA"});
+ n1.receive({payload:"b", topic:"TB"});
+ n1.receive({payload:"c", topic:"TA"});
+ n1.receive({payload:"d", topic:"TA"});
+ n1.receive({payload:"e", topic:"TB"});
+ });
+ });
+
+ it('should redece messages', function(done) {
+ var flow = [{id:"n1", type:"join", mode:"reduce",
+ reduceRight:false,
+ reduceExp:"$A+payload",
+ reduceInit:"0",
+ reduceInitType:"num",
+ reduceFixup:undefined,
+ wires:[["n2"]]},
+ {id:"n2", type:"helper"}];
+ helper.load(joinNode, flow, function() {
+ var n1 = helper.getNode("n1");
+ var n2 = helper.getNode("n2");
+ var count = 0;
+ n2.on("input", function(msg) {
+ try {
+ msg.should.have.property("payload");
+ msg.payload.should.equal(10);
+ done();
+ }
+ catch(e) { done(e); }
+ });
+ n1.receive({payload:3, parts:{index:2, count:4, id:222}});
+ n1.receive({payload:2, parts:{index:1, count:4, id:222}});
+ n1.receive({payload:4, parts:{index:3, count:4, id:222}});
+ n1.receive({payload:1, parts:{index:0, count:4, id:222}});
+ });
+ });
+
+ it('should redece messages using $I', function(done) {
+ var flow = [{id:"n1", type:"join", mode:"reduce",
+ reduceRight:false,
+ reduceExp:"$A+$I",
+ reduceInit:"0",
+ reduceInitType:"num",
+ reduceFixup:undefined,
+ wires:[["n2"]]},
+ {id:"n2", type:"helper"}];
+ helper.load(joinNode, flow, function() {
+ var n1 = helper.getNode("n1");
+ var n2 = helper.getNode("n2");
+ var count = 0;
+ n2.on("input", function(msg) {
+ try {
+ msg.should.have.property("payload");
+ msg.payload.should.equal(6);
+ done();
+ }
+ catch(e) { done(e); }
+ });
+ n1.receive({payload:3, parts:{index:2, count:4, id:222}});
+ n1.receive({payload:2, parts:{index:1, count:4, id:222}});
+ n1.receive({payload:4, parts:{index:3, count:4, id:222}});
+ n1.receive({payload:1, parts:{index:0, count:4, id:222}});
+ });
+ });
+
+ it('should redece messages with fixup', function(done) {
+ var flow = [{id:"n1", type:"join", mode:"reduce",
+ reduceRight:false,
+ reduceExp:"$A+payload",
+ reduceInit:"0",
+ reduceInitType:"num",
+ reduceFixup:"$A/$N",
+ wires:[["n2"]]},
+ {id:"n2", type:"helper"}];
+ helper.load(joinNode, flow, function() {
+ var n1 = helper.getNode("n1");
+ var n2 = helper.getNode("n2");
+ var count = 0;
+ n2.on("input", function(msg) {
+ try {
+ msg.should.have.property("payload");
+ msg.payload.should.equal(2);
+ done();
+ }
+ catch(e) { done(e); }
+ });
+ n1.receive({payload:3, parts:{index:2, count:5, id:222}});
+ n1.receive({payload:2, parts:{index:1, count:5, id:222}});
+ n1.receive({payload:4, parts:{index:3, count:5, id:222}});
+ n1.receive({payload:1, parts:{index:0, count:5, id:222}});
+ n1.receive({payload:0, parts:{index:4, count:5, id:222}});
+ });
+ });
+
+ it('should redece messages (left)', function(done) {
+ var flow = [{id:"n1", type:"join", mode:"reduce",
+ reduceRight:false,
+ reduceExp:"'(' & $A & '+' & payload & ')'",
+ reduceInit:"0",
+ reduceInitType:"str",
+ reduceFixup:undefined,
+ wires:[["n2"]]},
+ {id:"n2", type:"helper"}];
+ helper.load(joinNode, flow, function() {
+ var n1 = helper.getNode("n1");
+ var n2 = helper.getNode("n2");
+ var count = 0;
+ n2.on("input", function(msg) {
+ try {
+ msg.should.have.property("payload");
+ msg.payload.should.be.an.String();
+ msg.payload.should.equal("((((0+1)+2)+3)+4)");
+ done();
+ }
+ catch(e) { done(e); }
+ });
+ n1.receive({payload:'3', parts:{index:2, count:4, id:222}});
+ n1.receive({payload:'2', parts:{index:1, count:4, id:222}});
+ n1.receive({payload:'4', parts:{index:3, count:4, id:222}});
+ n1.receive({payload:'1', parts:{index:0, count:4, id:222}});
+ });
+ });
+
+ it('should redece messages (right)', function(done) {
+ var flow = [{id:"n1", type:"join", mode:"reduce",
+ reduceRight:true,
+ reduceExp:"'(' & $A & '+' & payload & ')'",
+ reduceInit:"0",
+ reduceInitType:"str",
+ reduceFixup:undefined,
+ wires:[["n2"]]},
+ {id:"n2", type:"helper"}];
+ helper.load(joinNode, flow, function() {
+ var n1 = helper.getNode("n1");
+ var n2 = helper.getNode("n2");
+ var count = 0;
+ n2.on("input", function(msg) {
+ try {
+ msg.should.have.property("payload");
+ msg.payload.should.be.an.String();
+ msg.payload.should.equal("((((0+4)+3)+2)+1)");
+ done();
+ }
+ catch(e) { done(e); }
+ });
+ n1.receive({payload:'3', parts:{index:2, count:4, id:222}});
+ n1.receive({payload:'2', parts:{index:1, count:4, id:222}});
+ n1.receive({payload:'4', parts:{index:3, count:4, id:222}});
+ n1.receive({payload:'1', parts:{index:0, count:4, id:222}});
+ });
+ });
+
+ it('should handle too many pending messages for merge mode', function(done) {
+ var flow = [{id:"n1", type:"join", mode:"merge",
+ topics:[{topic:"TA"}, {topic:"TA"}, {topic:"TB"}],
+ wires:[["n2"]]},
+ {id:"n2", type:"helper"}];
+ helper.load(joinNode, flow, function() {
+ var n1 = helper.getNode("n1");
+ RED.settings.joinMaxKeptMsgsCount = 2;
+ setTimeout(function() {
+ var logEvents = helper.log().args.filter(function (evt) {
+ return evt[0].type == "join";
+ });
+ var evt = logEvents[0][0];
+ evt.should.have.property('id', "n1");
+ evt.should.have.property('type', "join");
+ evt.should.have.property('msg', "join.too-many");
+ done();
+ }, 150);
+ n1.receive({payload:"a", topic:"TA"});
+ n1.receive({payload:"b", topic:"TB"});
+ n1.receive({payload:"c", topic:"TB"});
+ n1.receive({payload:"d", topic:"TA"});
+ });
+ });
+
+ it('should handle too many pending messages for reduce mode', function(done) {
+ var flow = [{id:"n1", type:"join", mode:"reduce",
+ reduceRight:false,
+ reduceExp:"$A+payload",
+ reduceInit:"0",
+ reduceInitType:"num",
+ reduceFixup:undefined,
+ wires:[["n2"]]},
+ {id:"n2", type:"helper"}];
+ helper.load(joinNode, flow, function() {
+ var n1 = helper.getNode("n1");
+ RED.settings.joinMaxKeptMsgsCount = 2;
+ setTimeout(function() {
+ var logEvents = helper.log().args.filter(function (evt) {
+ return evt[0].type == "join";
+ });
+ var evt = logEvents[0][0];
+ evt.should.have.property('id', "n1");
+ evt.should.have.property('type', "join");
+ evt.should.have.property('msg', "join.too-many");
+ done();
+ }, 150);
+ n1.receive({payload:3, parts:{index:2, count:4, id:222}});
+ n1.receive({payload:2, parts:{index:1, count:4, id:222}});
+ n1.receive({payload:4, parts:{index:3, count:4, id:222}});
+ n1.receive({payload:1, parts:{index:0, count:4, id:222}});
+ });
+ });
});