From d7dfeaf0c1ad69638f1ed4f168b9db96f0027650 Mon Sep 17 00:00:00 2001 From: Kunihiko Toumura Date: Mon, 2 Nov 2020 13:31:27 +0900 Subject: [PATCH] Messaging API support in Sort node --- .../@node-red/nodes/core/sequence/18-sort.js | 65 ++++++++++++------- test/nodes/core/sequence/18-sort_spec.js | 58 +++++++++++++++++ 2 files changed, 98 insertions(+), 25 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/sequence/18-sort.js b/packages/node_modules/@node-red/nodes/core/sequence/18-sort.js index c330801e2..3bcdfb105 100644 --- a/packages/node_modules/@node-red/nodes/core/sequence/18-sort.js +++ b/packages/node_modules/@node-red/nodes/core/sequence/18-sort.js @@ -81,16 +81,16 @@ module.exports = function(RED) { function sortMessageGroup(group) { var promise; - var msgs = group.msgs; + var msgInfos = group.msgInfos; if (key_is_exp) { - var evaluatedDataPromises = msgs.map(msg => { + var evaluatedDataPromises = msgInfos.map(mInfo => { return new Promise((resolve,reject) => { - RED.util.evaluateJSONataExpression(key_exp, msg, (err, result) => { + RED.util.evaluateJSONataExpression(key_exp, mInfo.msg, (err, result) => { if (err) { reject(RED._("sort.invalid-exp",{message:err.toString()})); } else { resolve({ - item: msg, + item: mInfo, sortValue: result }) } @@ -106,20 +106,21 @@ module.exports = function(RED) { var key = function(msg) { return ; } - var comp = generateComparisonFunction(msg => RED.util.getMessageProperty(msg, key_prop)); + var comp = generateComparisonFunction(mInfo => RED.util.getMessageProperty(mInfo.msg, key_prop)); try { - msgs.sort(comp); + msgInfos.sort(comp); } catch (e) { return; // not send when error } - promise = Promise.resolve(msgs); + promise = Promise.resolve(msgInfos); } - return promise.then(msgs => { - for (var i = 0; i < msgs.length; i++) { - var msg = msgs[i]; + return promise.then(msgInfos => { + for (let i = 0; i < msgInfos.length; i++) { + const msg = msgInfos[i].msg; msg.parts.index = i; - node.send(msg); + msgInfos[i].send(msg); + msgInfos[i].done(); } }); } @@ -181,65 +182,79 @@ module.exports = function(RED) { } } if(oldest !== undefined) { + oldest.msgInfos[oldest.msgInfos.length - 1].done(RED._("sort.too-many")); + for (let i = 0; i < oldest.msgInfos.length - 1; i++) { + oldest.msgInfos[i].done(); + } delete pending[oldest_key]; - return oldest.msgs.length; + return oldest.msgInfos.length; } return 0; } - function processMessage(msg) { + function processMessage(msgInfo) { + const msg = msgInfo.msg; if (target_is_prop) { sortMessageProperty(msg).then(send => { if (send) { - node.send(msg); + msgInfo.send(msg); } + msgInfo.done(); }).catch(err => { - node.error(err,msg); + msgInfo.done(err); }); return; } var parts = msg.parts; if (!parts || !parts.hasOwnProperty("id") || !parts.hasOwnProperty("index")) { + msgInfo.done(); return; } var gid = parts.id; if (!pending.hasOwnProperty(gid)) { pending[gid] = { count: undefined, - msgs: [], + msgInfos: [], seq_no: pending_id++ }; } var group = pending[gid]; - var msgs = group.msgs; - msgs.push(msg); + var msgInfos = group.msgInfos; + msgInfos.push(msgInfo); if (parts.hasOwnProperty("count")) { group.count = parts.count; } pending_count++; - if (group.count === msgs.length) { + if (group.count === msgInfos.length) { delete pending[gid] sortMessageGroup(group).catch(err => { - node.error(err,msg); + // throw an error for last message, and just call done() for remaining messages + msgInfos[msgInfos.length-1].done(err); + for (let i = 0; i < msgInfos.length - 1; i++) { + msgInfos[i].done() + }; }); - pending_count -= msgs.length; + pending_count -= msgInfos.length; } else { var max_msgs = max_kept_msgs_count(node); if ((max_msgs > 0) && (pending_count > max_msgs)) { pending_count -= removeOldestPending(); - node.error(RED._("sort.too-many"), msg); } } } - this.on("input", function(msg) { - processMessage(msg); + this.on("input", function(msg, send, done) { + processMessage({msg, send, done}); }); this.on("close", function() { for(var key in pending) { if (pending.hasOwnProperty(key)) { - node.log(RED._("sort.clear"), pending[key].msgs[0]); + node.log(RED._("sort.clear"), pending[key].msgInfos[0]); + const group = pending[key]; + group.msgInfos.forEach(mInfo => { + mInfo.done(); + }); delete pending[key]; } } diff --git a/test/nodes/core/sequence/18-sort_spec.js b/test/nodes/core/sequence/18-sort_spec.js index ac220216c..955038bfb 100644 --- a/test/nodes/core/sequence/18-sort_spec.js +++ b/test/nodes/core/sequence/18-sort_spec.js @@ -493,4 +493,62 @@ describe('SORT node', function() { }); }); + describe('messaging API', function() { + function mapiDoneTestHelper(done, targetType, msgAndTimings) { + const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js"); + const catchNode = require("nr-test-utils").require("@node-red/nodes/core/common/25-catch.js"); + const flow = [ + {id: "sortNode1", type: "sort", order: "ascending", as_num: false, target: "payload", targetType, + seqKey: "payload", seqKeyType: "msg", wires: [[]]}, + { id: "completeNode1", type: "complete", scope: ["sortNode1"], uncaught: false, wires: [["helperNode1"]] }, + { id: "catchNode1", type: "catch", scope: ["sortNode1"], uncaught: false, wires: [["helperNode1"]] }, + { id: "helperNode1", type: "helper", wires: [[]] }]; + const numMsgs = msgAndTimings.length; + helper.load([sortNode, completeNode, catchNode], flow, function () { + const sortNode1 = helper.getNode("sortNode1"); + const helperNode1 = helper.getNode("helperNode1"); + RED.settings.nodeMessageBufferMaxLength = 2; + const t = Date.now(); + let c = 0; + helperNode1.on("input", function (msg) { + msg.should.have.a.property('payload'); + (Date.now() - t).should.be.approximately(msgAndTimings[msg.seq].avr, msgAndTimings[msg.seq].var); + c += 1; + if (c === numMsgs) { + done(); + } + }); + for (let i = 0; i < numMsgs; i++) { + setTimeout(function () { sortNode1.receive(msgAndTimings[i].msg); }, msgAndTimings[i].delay); + } + }); + } + it('should call done() when message is sent (payload)', function (done) { + mapiDoneTestHelper(done, "msg", [ + { msg: { seq: 0, payload: [1, 3, 2] }, delay: 0, avr: 0, var: 100 }, + ]); + }); + it('should call done() when message is sent (sequence)', function (done) { + mapiDoneTestHelper(done, "seq", [ + { msg: { seq: 0, payload: 3, parts: {id:"A", index: 0, count: 2}}, delay: 0, avr: 500, var: 100 }, + { msg: { seq: 1, payload: 2, parts: {id:"A", index: 1, count: 2}}, delay: 500, avr: 500, var: 100} + ]); + }); + it('should call done() regardless of buffer overflow (same group)', function (done) { + mapiDoneTestHelper(done, "seq", [ + { msg: { seq: 0, payload: 1, parts: {id:"A", index: 0, count: 3}}, delay: 0, avr: 1000, var: 100 }, + { msg: { seq: 1, payload: 3, parts: {id:"A", index: 1, count: 3}}, delay: 500, avr: 1000, var: 100 }, + { msg: { seq: 2, payload: 2, parts: {id:"A", index: 2, count: 3}}, delay: 1000, avr: 1000, var: 100 }, + ]); + }); + it('should call done() regardless of buffer overflow (different group)', function (done) { + mapiDoneTestHelper(done, "seq", [ + { msg: { seq: 0, payload: 1, parts: {id:"A", index: 0, count: 2}}, delay: 0, avr: 1000, var: 100 }, + { msg: { seq: 1, payload: 3, parts: {id:"B", index: 0, count: 2}}, delay: 500, avr: 1200, var: 100 }, + { msg: { seq: 2, payload: 5, parts: {id:"C", index: 0, count: 2}}, delay: 1000, avr: 1500, var: 100 }, + { msg: { seq: 3, payload: 2, parts: {id:"B", index: 1, count: 2}}, delay: 1200, avr: 1200, var: 100 }, + { msg: { seq: 4, payload: 4, parts: {id:"C", index: 1, count: 2}}, delay: 1500, avr: 1500, var: 100 }, + ]); + }); + }); });