From 218794be77d389930b93852d2b471062d939b4a8 Mon Sep 17 00:00:00 2001 From: Hiroyasu Nishiyama Date: Wed, 17 Jan 2018 19:08:23 +0900 Subject: [PATCH] Initial support of merge & reduce mode for JOIN node (#1546) * initial support of merge mode of JOIN node * initial support of reduce mode of JOIN node * update info document of JOIN node * add tests for merge & reduce mode of JOIN node * tidy tabs & spaces * add test for too many pending messages & related fixes * add an test for reduce mode of JOIN node * change order of modes of SWITCH node * add initial topics entry of merge mode * fixed descriptions on "reduce right" checkbox * fixed update of typedInput field of reduce mode * fixed a typo in info document of JOIN node * allow empty string in JSONata input field of reduce mode * fixed a typo * fixed error in reduce mode description --- nodes/core/locales/en-US/messages.json | 21 +- nodes/core/logic/17-split.html | 208 ++++++++++++- nodes/core/logic/17-split.js | 259 ++++++++++++++++ settings.js | 1 + test/nodes/core/logic/17-split_spec.js | 392 ++++++++++++++++++++++++- 5 files changed, 873 insertions(+), 8 deletions(-) diff --git a/nodes/core/locales/en-US/messages.json b/nodes/core/locales/en-US/messages.json index f77858f20..787571a56 100644 --- a/nodes/core/locales/en-US/messages.json +++ b/nodes/core/locales/en-US/messages.json @@ -840,6 +840,8 @@ "mode":{ "mode":"Mode", "auto":"automatic", + "merge":"merge sequence", + "reduce":"reduce sequence", "custom":"manual" }, "combine":"Combine each", @@ -861,7 +863,24 @@ "afterTimeout":"After a timeout following the first message", "seconds":"seconds", "complete":"After a message with the msg.complete property set", - "tip":"This mode assumes this node is either paired with a split node or the received messages will have a properly configured msg.parts property." + "tip":"This mode assumes this node is either paired with a split node or the received messages will have a properly configured msg.parts property.", + "too-many" : "too many pending messages in join node", + "merge": { + "topics-label":"Merged Topics", + "topics":"topics", + "topic" : "topic", + "on-change":"Send merged message on arrival of a new topic" + }, + "reduce": { + "exp": "Reduce exp", + "exp-value": "exp", + "init": "Initial value", + "right": "Evaluate in reverse order (right to left)", + "fixup": "Fixup exp" + }, + "errors": { + "invalid-expr": "Invalid JSONata expression: __error__" + } }, "sort" : { "key-type" : "Key type", diff --git a/nodes/core/logic/17-split.html b/nodes/core/logic/17-split.html index 6cc7589f0..651a4702e 100644 --- a/nodes/core/logic/17-split.html +++ b/nodes/core/logic/17-split.html @@ -108,7 +108,7 @@ arraySplt: {value:1}, arraySpltType: {value:"len"}, stream: {value:false}, - addname: {value:""}, + addname: {value:""} }, inputs:1, outputs:1, @@ -170,6 +170,8 @@
@@ -217,6 +219,38 @@
+
+
+ +
+
    +
    +
    +
    + + +
    +
    +
    +
    + + +
    +
    + + + +
    +
    + + +
    +
    + + +
    +
    @@ -225,9 +259,17 @@ diff --git a/nodes/core/logic/17-split.js b/nodes/core/logic/17-split.js index d4fdfc55b..803131671 100644 --- a/nodes/core/logic/17-split.js +++ b/nodes/core/logic/17-split.js @@ -232,6 +232,229 @@ module.exports = function(RED) { RED.nodes.registerType("split",SplitNode); + var _max_kept_msgs_count = undefined; + + function max_kept_msgs_count(node) { + if (_max_kept_msgs_count === undefined) { + var name = "joinMaxKeptMsgsCount"; + if (RED.settings.hasOwnProperty(name)) { + _max_kept_msgs_count = RED.settings[name]; + } + else { + _max_kept_msgs_count = 0; + } + } + return _max_kept_msgs_count; + } + + function add_to_topic(node, pending, topic, msg) { + var merge_on_change = node.merge_on_change; + if (!pending.hasOwnProperty(topic)) { + pending[topic] = []; + } + var topics = pending[topic]; + topics.push(msg); + if (merge_on_change) { + var counts = node.topic_counts; + if (topics.length > counts[topic]) { + topics.shift(); + node.pending_count--; + } + } + } + + function compute_topic_counts(topics) { + var counts = {}; + for (var topic of topics) { + counts[topic] = (counts.hasOwnProperty(topic) ? counts[topic] : 0) +1; + } + return counts; + } + + function try_merge(node, pending, merge_on_change) { + var topics = node.topics; + var counts = node.topic_counts; + for(var topic of topics) { + if(!pending.hasOwnProperty(topic) || + (pending[topic].length < counts[topic])) { + return; + } + } + var merge_on_change = node.merge_on_change; + var msgs = []; + var vals = []; + var new_msg = {payload: vals}; + for (var topic of topics) { + var pmsgs = pending[topic]; + var msg = pmsgs.shift(); + if (merge_on_change) { + pmsgs.push(msg); + } + var pval = msg.payload; + var val = new_msg[topic]; + msgs.push(msg); + vals.push(pval); + if (val instanceof Array) { + new_msg[topic].push(pval); + } + else if (val === undefined) { + new_msg[topic] = pval; + } + else { + new_msg[topic] = [val, pval] + } + } + node.send(new_msg); + if (!merge_on_change) { + node.pending_count -= topics.length; + } + } + + function merge_msg(node, msg) { + var topics = node.topics; + var topic = msg.topic; + if(node.topics.indexOf(topic) >= 0) { + var pending = node.pending; + if(node.topic_counts == undefined) { + node.topic_counts = compute_topic_counts(topics) + } + add_to_topic(node, pending, topic, msg); + node.pending_count++; + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (node.pending_count > max_msgs)) { + node.pending = {}; + node.pending_count = 0; + node.error(RED._("join.too-many"), msg); + } + try_merge(node, pending); + } + } + + function apply_r(exp, accum, msg, index, count) { + exp.assign("I", index); + exp.assign("N", count); + exp.assign("A", accum); + return RED.util.evaluateJSONataExpression(exp, msg); + } + + function apply_f(exp, accum, count) { + exp.assign("N", count); + exp.assign("A", accum); + return RED.util.evaluateJSONataExpression(exp, {}); + } + + function exp_or_undefined(exp) { + if((exp === "") || + (exp === null)) { + return undefined; + } + return exp + } + + function reduce_and_send_group(node, group) { + var is_right = node.reduce_right; + var flag = is_right ? -1 : 1; + var msgs = group.msgs; + var accum = node.reduce_init; + var reduce_exp = node.reduce_exp; + var reduce_fixup = node.reduce_fixup; + var count = group.count; + msgs.sort(function(x,y) { + var ix = x.parts.index; + var iy = y.parts.index; + if (ix < iy) return -flag; + if (ix > iy) return flag; + return 0; + }); + for(var msg of msgs) { + accum = apply_r(reduce_exp, accum, msg, msg.parts.index, count); + } + if(reduce_fixup !== undefined) { + accum = apply_f(reduce_fixup, accum, count); + } + node.send({payload: accum}); + } + + function reduce_msg(node, msg) { + if(msg.hasOwnProperty('parts')) { + var parts = msg.parts; + var pending = node.pending; + var pending_count = node.pending_count; + var gid = msg.parts.id; + if(!pending.hasOwnProperty(gid)) { + var count = undefined; + if(parts.hasOwnProperty('count')) { + count = msg.parts.count; + } + pending[gid] = { + count: count, + msgs: [] + }; + } + var group = pending[gid]; + var msgs = group.msgs; + if(parts.hasOwnProperty('count') && + (group.count === undefined)) { + group.count = count; + } + msgs.push(msg); + pending_count++; + if(msgs.length === group.count) { + delete pending[gid]; + try { + pending_count -= msgs.length; + reduce_and_send_group(node, group); + } catch(e) { + node.error(RED._("join.errors.invalid-expr",{error:e.message})); } + } + node.pending_count = pending_count; + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (pending_count > max_msgs)) { + node.pending = {}; + node.pending_count = 0; + node.error(RED._("join.too-many"), msg); + } + } + else { + node.send(msg); + } + } + + function eval_exp(node, exp, exp_type) { + if(exp_type === "flow") { + return node.context().flow.get(exp); + } + else if(exp_type === "global") { + return node.context().global.get(exp); + } + else if(exp_type === "str") { + return exp; + } + else if(exp_type === "num") { + return Number(exp); + } + else if(exp_type === "bool") { + if (exp === 'true') { + return true; + } + else if (exp === 'false') { + return false; + } + } + else if ((exp_type === "bin") || + (exp_type === "json")) { + return JSON.parse(exp); + } + else if(exp_type === "date") { + return Date.now(); + } + else if(exp_type === "jsonata") { + var jexp = RED.util.prepareJSONataExpression(exp, node); + return RED.util.evaluateJSONataExpression(jexp, {}); + } + throw new Error("unexpected initial value type"); + } + function JoinNode(n) { RED.nodes.createNode(this,n); this.mode = n.mode||"auto"; @@ -246,6 +469,22 @@ module.exports = function(RED) { this.joiner = n.joiner||""; this.joinerType = n.joinerType||"str"; + this.reduce = (this.mode === "reduce"); + if (this.reduce) { + var exp_init = n.reduceInit; + var exp_init_type = n.reduceInitType; + var exp_reduce = n.reduceExp; + var exp_fixup = exp_or_undefined(n.reduceFixup); + this.reduce_right = n.reduceRight; + try { + this.reduce_init = eval_exp(this, exp_init, exp_init_type); + this.reduce_exp = RED.util.prepareJSONataExpression(exp_reduce, this); + this.reduce_fixup = (exp_fixup !== undefined) ? RED.util.prepareJSONataExpression(exp_fixup, this) : undefined; + } catch(e) { + this.error(RED._("join.errors.invalid-expr",{error:e.message})); + } + } + if (this.joinerType === "str") { this.joiner = this.joiner.replace(/\\n/g,"\n").replace(/\\r/g,"\r").replace(/\\t/g,"\t").replace(/\\e/g,"\e").replace(/\\f/g,"\f").replace(/\\0/g,"\0"); } else if (this.joinerType === "bin") { @@ -259,6 +498,14 @@ module.exports = function(RED) { this.build = n.build || "array"; this.accumulate = n.accumulate || "false"; + + this.topics = (n.topics || []).map(function(x) { return x.topic; }); + this.merge_on_change = n.mergeOnChange || false; + this.topic_counts = undefined; + this.output = n.output || "stream"; + this.pending = {}; + this.pending_count = 0; + //this.topic = n.topic; var node = this; var inflight = {}; @@ -321,6 +568,10 @@ module.exports = function(RED) { node.warn("Message missing msg.parts property - cannot join in 'auto' mode") return; } + if (node.mode === 'merge' && !msg.hasOwnProperty("topic")) { + node.warn("Message missing msg.topic property - cannot join in 'merge' mode"); + return; + } if (node.propertyType == "full") { property = msg; @@ -351,6 +602,14 @@ module.exports = function(RED) { arrayLen = msg.parts.len; propertyIndex = msg.parts.index; } + else if (node.mode === 'merge') { + merge_msg(node, msg); + return; + } + else if (node.mode === 'reduce') { + reduce_msg(node, msg); + return; + } else { // Use the node configuration to identify all of the group information partId = "_"; diff --git a/settings.js b/settings.js index 9679c882a..ac6d0afdd 100644 --- a/settings.js +++ b/settings.js @@ -50,6 +50,7 @@ module.exports = { // The maximum number of messages kept internally in nodes. // Zero or undefined value means not restricting number of messages. //sortMaxKeptMsgsCount: 0, + //joinMaxKeptMsgsCount: 0, //batchMaxKeptMsgsCount: 0, // To disable the option for using local files for storing keys and certificates in the TLS configuration diff --git a/test/nodes/core/logic/17-split_spec.js b/test/nodes/core/logic/17-split_spec.js index db092658e..e2c8aaa97 100644 --- a/test/nodes/core/logic/17-split_spec.js +++ b/test/nodes/core/logic/17-split_spec.js @@ -18,6 +18,7 @@ var should = require("should"); var splitNode = require("../../../../nodes/core/logic/17-split.js"); var joinNode = require("../../../../nodes/core/logic/17-split.js"); var helper = require("../../helper.js"); +var RED = require("../../../../red/red.js"); describe('SPLIT node', function() { @@ -269,6 +270,7 @@ describe('JOIN node', function() { afterEach(function() { helper.unload(); + RED.settings.joinMaxKeptMsgsCount = 0; }); it('should be loaded', function(done) { @@ -727,6 +729,394 @@ describe('JOIN node', function() { }); s1.receive({payload:[[1,2,3],"a\nb\nc",[7,8,9]]}); }); - }) + }); + + it('should merge messages with topics (single)', function(done) { + var flow = [{id:"n1", type:"join", mode:"merge", + topics:[{topic:"TA"}, {topic:"TB"}], + wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(joinNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var count = 0; + n2.on("input", function(msg) { + try { + msg.should.have.property("TA"); + msg.should.have.property("TB"); + msg.should.have.property("payload"); + msg.payload.should.be.an.Array(); + msg.payload.length.should.equal(2); + count++; + if (count === 1) { + msg.TA.should.equal("a"); + msg.TB.should.equal("b"); + msg.payload[0].should.equal("a"); + msg.payload[1].should.equal("b"); + } + if (count === 2) { + msg.TA.should.equal("d"); + msg.TB.should.equal("c"); + msg.payload[0].should.equal("d"); + msg.payload[1].should.equal("c"); + done(); + } + } + catch(e) { done(e); } + }); + n1.receive({payload:"a", topic:"TA"}); + n1.receive({payload:"b", topic:"TB"}); + n1.receive({payload:"c", topic:"TB"}); + n1.receive({payload:"d", topic:"TA"}); + }); + }); + + it('should merge messages with topics (multiple)', function(done) { + var flow = [{id:"n1", type:"join", mode:"merge", + topics:[{topic:"TA"}, {topic:"TB"}, {topic:"TA"}], + wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(joinNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var count = 0; + n2.on("input", function(msg) { + try { + msg.should.have.property("TA"); + msg.TA.should.be.an.Array(); + msg.TA.length.should.equal(2); + msg.should.have.property("TB"); + msg.should.have.property("payload"); + msg.payload.should.be.an.Array(); + msg.payload.length.should.equal(3); + count++; + if (count === 1) { + msg.TA[0].should.equal("a"); + msg.TA[1].should.equal("d"); + msg.TB.should.equal("b"); + msg.payload[0].should.equal("a"); + msg.payload[1].should.equal("b"); + msg.payload[2].should.equal("d"); + } + if (count === 2) { + msg.TA[0].should.equal("e"); + msg.TA[1].should.equal("f"); + msg.TB.should.equal("c"); + msg.payload[0].should.equal("e"); + msg.payload[1].should.equal("c"); + msg.payload[2].should.equal("f"); + done(); + } + } + catch(e) { done(e); } + }); + n1.receive({payload:"a", topic:"TA"}); + n1.receive({payload:"b", topic:"TB"}); + n1.receive({payload:"c", topic:"TB"}); + n1.receive({payload:"d", topic:"TA"}); + n1.receive({payload:"e", topic:"TA"}); + n1.receive({payload:"f", topic:"TA"}); + }); + }); + + it('should merge messages with topics (single, send on new topic)', function(done) { + var flow = [{id:"n1", type:"join", mode:"merge", + topics:[{topic:"TA"}, {topic:"TB"}], + mergeOnChange:true, + wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(joinNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var count = 0; + n2.on("input", function(msg) { + try { + msg.should.have.property("TA"); + msg.should.have.property("TB"); + msg.should.have.property("payload"); + msg.payload.should.be.an.Array(); + msg.payload.length.should.equal(2); + count++; + if (count === 1) { + msg.TA.should.equal("a"); + msg.TB.should.equal("b"); + msg.payload[0].should.equal("a"); + msg.payload[1].should.equal("b"); + } + if (count === 2) { + msg.TA.should.equal("a"); + msg.TB.should.equal("c"); + msg.payload[0].should.equal("a"); + msg.payload[1].should.equal("c"); + } + if (count === 3) { + msg.TA.should.equal("d"); + msg.TB.should.equal("c"); + msg.payload[0].should.equal("d"); + msg.payload[1].should.equal("c"); + done(); + } + } + catch(e) { done(e); } + }); + n1.receive({payload:"a", topic:"TA"}); + n1.receive({payload:"b", topic:"TB"}); + n1.receive({payload:"c", topic:"TB"}); + n1.receive({payload:"d", topic:"TA"}); + }); + }); + + it('should merge messages with topics (multiple, send on new topic)', function(done) { + var flow = [{id:"n1", type:"join", mode:"merge", + topics:[{topic:"TA"}, {topic:"TB"}, {topic:"TA"}], + mergeOnChange:true, + wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(joinNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var count = 0; + n2.on("input", function(msg) { + try { + msg.should.have.property("TA"); + msg.TA.should.be.an.Array(); + msg.TA.length.should.equal(2); + msg.should.have.property("TB"); + msg.should.have.property("payload"); + msg.payload.should.be.an.Array(); + msg.payload.length.should.equal(3); + count++; + if (count === 1) { + msg.TA[0].should.equal("a"); + msg.TA[1].should.equal("c"); + msg.TB.should.equal("b"); + msg.payload[0].should.equal("a"); + msg.payload[1].should.equal("b"); + msg.payload[2].should.equal("c"); + } + if (count === 2) { + msg.TA[0].should.equal("c"); + msg.TA[1].should.equal("d"); + msg.TB.should.equal("b"); + msg.payload[0].should.equal("c"); + msg.payload[1].should.equal("b"); + msg.payload[2].should.equal("d"); + } + if (count === 3) { + msg.TA[0].should.equal("c"); + msg.TA[1].should.equal("d"); + msg.TB.should.equal("e"); + msg.payload[0].should.equal("c"); + msg.payload[1].should.equal("e"); + msg.payload[2].should.equal("d"); + done(); + } + } + catch(e) { done(e); } + }); + n1.receive({payload:"a", topic:"TA"}); + n1.receive({payload:"b", topic:"TB"}); + n1.receive({payload:"c", topic:"TA"}); + n1.receive({payload:"d", topic:"TA"}); + n1.receive({payload:"e", topic:"TB"}); + }); + }); + + it('should redece messages', function(done) { + var flow = [{id:"n1", type:"join", mode:"reduce", + reduceRight:false, + reduceExp:"$A+payload", + reduceInit:"0", + reduceInitType:"num", + reduceFixup:undefined, + wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(joinNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var count = 0; + n2.on("input", function(msg) { + try { + msg.should.have.property("payload"); + msg.payload.should.equal(10); + done(); + } + catch(e) { done(e); } + }); + n1.receive({payload:3, parts:{index:2, count:4, id:222}}); + n1.receive({payload:2, parts:{index:1, count:4, id:222}}); + n1.receive({payload:4, parts:{index:3, count:4, id:222}}); + n1.receive({payload:1, parts:{index:0, count:4, id:222}}); + }); + }); + + it('should redece messages using $I', function(done) { + var flow = [{id:"n1", type:"join", mode:"reduce", + reduceRight:false, + reduceExp:"$A+$I", + reduceInit:"0", + reduceInitType:"num", + reduceFixup:undefined, + wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(joinNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var count = 0; + n2.on("input", function(msg) { + try { + msg.should.have.property("payload"); + msg.payload.should.equal(6); + done(); + } + catch(e) { done(e); } + }); + n1.receive({payload:3, parts:{index:2, count:4, id:222}}); + n1.receive({payload:2, parts:{index:1, count:4, id:222}}); + n1.receive({payload:4, parts:{index:3, count:4, id:222}}); + n1.receive({payload:1, parts:{index:0, count:4, id:222}}); + }); + }); + + it('should redece messages with fixup', function(done) { + var flow = [{id:"n1", type:"join", mode:"reduce", + reduceRight:false, + reduceExp:"$A+payload", + reduceInit:"0", + reduceInitType:"num", + reduceFixup:"$A/$N", + wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(joinNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var count = 0; + n2.on("input", function(msg) { + try { + msg.should.have.property("payload"); + msg.payload.should.equal(2); + done(); + } + catch(e) { done(e); } + }); + n1.receive({payload:3, parts:{index:2, count:5, id:222}}); + n1.receive({payload:2, parts:{index:1, count:5, id:222}}); + n1.receive({payload:4, parts:{index:3, count:5, id:222}}); + n1.receive({payload:1, parts:{index:0, count:5, id:222}}); + n1.receive({payload:0, parts:{index:4, count:5, id:222}}); + }); + }); + + it('should redece messages (left)', function(done) { + var flow = [{id:"n1", type:"join", mode:"reduce", + reduceRight:false, + reduceExp:"'(' & $A & '+' & payload & ')'", + reduceInit:"0", + reduceInitType:"str", + reduceFixup:undefined, + wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(joinNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var count = 0; + n2.on("input", function(msg) { + try { + msg.should.have.property("payload"); + msg.payload.should.be.an.String(); + msg.payload.should.equal("((((0+1)+2)+3)+4)"); + done(); + } + catch(e) { done(e); } + }); + n1.receive({payload:'3', parts:{index:2, count:4, id:222}}); + n1.receive({payload:'2', parts:{index:1, count:4, id:222}}); + n1.receive({payload:'4', parts:{index:3, count:4, id:222}}); + n1.receive({payload:'1', parts:{index:0, count:4, id:222}}); + }); + }); + + it('should redece messages (right)', function(done) { + var flow = [{id:"n1", type:"join", mode:"reduce", + reduceRight:true, + reduceExp:"'(' & $A & '+' & payload & ')'", + reduceInit:"0", + reduceInitType:"str", + reduceFixup:undefined, + wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(joinNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var count = 0; + n2.on("input", function(msg) { + try { + msg.should.have.property("payload"); + msg.payload.should.be.an.String(); + msg.payload.should.equal("((((0+4)+3)+2)+1)"); + done(); + } + catch(e) { done(e); } + }); + n1.receive({payload:'3', parts:{index:2, count:4, id:222}}); + n1.receive({payload:'2', parts:{index:1, count:4, id:222}}); + n1.receive({payload:'4', parts:{index:3, count:4, id:222}}); + n1.receive({payload:'1', parts:{index:0, count:4, id:222}}); + }); + }); + + it('should handle too many pending messages for merge mode', function(done) { + var flow = [{id:"n1", type:"join", mode:"merge", + topics:[{topic:"TA"}, {topic:"TA"}, {topic:"TB"}], + wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(joinNode, flow, function() { + var n1 = helper.getNode("n1"); + RED.settings.joinMaxKeptMsgsCount = 2; + setTimeout(function() { + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "join"; + }); + var evt = logEvents[0][0]; + evt.should.have.property('id', "n1"); + evt.should.have.property('type', "join"); + evt.should.have.property('msg', "join.too-many"); + done(); + }, 150); + n1.receive({payload:"a", topic:"TA"}); + n1.receive({payload:"b", topic:"TB"}); + n1.receive({payload:"c", topic:"TB"}); + n1.receive({payload:"d", topic:"TA"}); + }); + }); + + it('should handle too many pending messages for reduce mode', function(done) { + var flow = [{id:"n1", type:"join", mode:"reduce", + reduceRight:false, + reduceExp:"$A+payload", + reduceInit:"0", + reduceInitType:"num", + reduceFixup:undefined, + wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(joinNode, flow, function() { + var n1 = helper.getNode("n1"); + RED.settings.joinMaxKeptMsgsCount = 2; + setTimeout(function() { + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "join"; + }); + var evt = logEvents[0][0]; + evt.should.have.property('id', "n1"); + evt.should.have.property('type', "join"); + evt.should.have.property('msg', "join.too-many"); + done(); + }, 150); + n1.receive({payload:3, parts:{index:2, count:4, id:222}}); + n1.receive({payload:2, parts:{index:1, count:4, id:222}}); + n1.receive({payload:4, parts:{index:3, count:4, id:222}}); + n1.receive({payload:1, parts:{index:0, count:4, id:222}}); + }); + }); });