mirror of
				https://github.com/node-red/node-red.git
				synced 2025-03-01 10:36:34 +00:00 
			
		
		
		
	Messaging API support in Sort node
This commit is contained in:
		| @@ -81,16 +81,16 @@ module.exports = function(RED) { | ||||
|  | ||||
|         function sortMessageGroup(group) { | ||||
|             var promise; | ||||
|             var msgs = group.msgs; | ||||
|             var msgInfos = group.msgInfos; | ||||
|             if (key_is_exp) { | ||||
|                 var evaluatedDataPromises = msgs.map(msg => { | ||||
|                 var evaluatedDataPromises = msgInfos.map(mInfo => { | ||||
|                     return new Promise((resolve,reject) => { | ||||
|                         RED.util.evaluateJSONataExpression(key_exp, msg, (err, result) => { | ||||
|                         RED.util.evaluateJSONataExpression(key_exp, mInfo.msg, (err, result) => { | ||||
|                             if (err) { | ||||
|                                 reject(RED._("sort.invalid-exp",{message:err.toString()})); | ||||
|                             } else { | ||||
|                                 resolve({ | ||||
|                                     item: msg, | ||||
|                                     item: mInfo, | ||||
|                                     sortValue: result | ||||
|                                 }) | ||||
|                             } | ||||
| @@ -106,20 +106,21 @@ module.exports = function(RED) { | ||||
|                 var key = function(msg) { | ||||
|                     return ; | ||||
|                 } | ||||
|                 var comp = generateComparisonFunction(msg => RED.util.getMessageProperty(msg, key_prop)); | ||||
|                 var comp = generateComparisonFunction(mInfo => RED.util.getMessageProperty(mInfo.msg, key_prop)); | ||||
|                 try { | ||||
|                     msgs.sort(comp); | ||||
|                     msgInfos.sort(comp); | ||||
|                 } | ||||
|                 catch (e) { | ||||
|                     return; // not send when error | ||||
|                 } | ||||
|                 promise = Promise.resolve(msgs); | ||||
|                 promise = Promise.resolve(msgInfos); | ||||
|             } | ||||
|             return promise.then(msgs => { | ||||
|                 for (var i = 0; i < msgs.length; i++) { | ||||
|                     var msg = msgs[i]; | ||||
|             return promise.then(msgInfos => { | ||||
|                 for (let i = 0; i < msgInfos.length; i++) { | ||||
|                     const msg = msgInfos[i].msg; | ||||
|                     msg.parts.index = i; | ||||
|                     node.send(msg); | ||||
|                     msgInfos[i].send(msg); | ||||
|                     msgInfos[i].done(); | ||||
|                 } | ||||
|             }); | ||||
|         } | ||||
| @@ -181,65 +182,79 @@ module.exports = function(RED) { | ||||
|                 } | ||||
|             } | ||||
|             if(oldest !== undefined) { | ||||
|                 oldest.msgInfos[oldest.msgInfos.length - 1].done(RED._("sort.too-many")); | ||||
|                 for (let i = 0; i < oldest.msgInfos.length - 1; i++) { | ||||
|                     oldest.msgInfos[i].done(); | ||||
|                 } | ||||
|                 delete pending[oldest_key]; | ||||
|                 return oldest.msgs.length; | ||||
|                 return oldest.msgInfos.length; | ||||
|             } | ||||
|             return 0; | ||||
|         } | ||||
|  | ||||
|         function processMessage(msg) { | ||||
|         function processMessage(msgInfo) { | ||||
|             const msg = msgInfo.msg; | ||||
|             if (target_is_prop) { | ||||
|                 sortMessageProperty(msg).then(send => { | ||||
|                     if (send) { | ||||
|                         node.send(msg); | ||||
|                         msgInfo.send(msg); | ||||
|                     } | ||||
|                     msgInfo.done(); | ||||
|                 }).catch(err => { | ||||
|                     node.error(err,msg); | ||||
|                     msgInfo.done(err); | ||||
|                 }); | ||||
|                 return; | ||||
|             } | ||||
|             var parts = msg.parts; | ||||
|             if (!parts || !parts.hasOwnProperty("id") || !parts.hasOwnProperty("index")) { | ||||
|                 msgInfo.done(); | ||||
|                 return; | ||||
|             } | ||||
|             var gid = parts.id; | ||||
|             if (!pending.hasOwnProperty(gid)) { | ||||
|                 pending[gid] = { | ||||
|                     count: undefined, | ||||
|                     msgs: [], | ||||
|                     msgInfos: [], | ||||
|                     seq_no: pending_id++ | ||||
|                 }; | ||||
|             } | ||||
|             var group = pending[gid]; | ||||
|             var msgs = group.msgs; | ||||
|             msgs.push(msg); | ||||
|             var msgInfos = group.msgInfos; | ||||
|             msgInfos.push(msgInfo); | ||||
|             if (parts.hasOwnProperty("count")) { | ||||
|                 group.count = parts.count; | ||||
|             } | ||||
|             pending_count++; | ||||
|             if (group.count === msgs.length) { | ||||
|             if (group.count === msgInfos.length) { | ||||
|                 delete pending[gid] | ||||
|                 sortMessageGroup(group).catch(err => { | ||||
|                     node.error(err,msg); | ||||
|                     // throw an error for last message, and just call done() for remaining messages | ||||
|                     msgInfos[msgInfos.length-1].done(err); | ||||
|                     for (let i = 0; i < msgInfos.length - 1; i++) { | ||||
|                         msgInfos[i].done() | ||||
|                     }; | ||||
|                 }); | ||||
|                 pending_count -= msgs.length; | ||||
|                 pending_count -= msgInfos.length; | ||||
|             } else { | ||||
|                 var max_msgs = max_kept_msgs_count(node); | ||||
|                 if ((max_msgs > 0) && (pending_count > max_msgs)) { | ||||
|                     pending_count -= removeOldestPending(); | ||||
|                     node.error(RED._("sort.too-many"), msg); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         this.on("input", function(msg) { | ||||
|             processMessage(msg); | ||||
|         this.on("input", function(msg, send, done) { | ||||
|             processMessage({msg, send, done}); | ||||
|         }); | ||||
|  | ||||
|         this.on("close", function() { | ||||
|             for(var key in pending) { | ||||
|                 if (pending.hasOwnProperty(key)) { | ||||
|                     node.log(RED._("sort.clear"), pending[key].msgs[0]); | ||||
|                     node.log(RED._("sort.clear"), pending[key].msgInfos[0]); | ||||
|                     const group = pending[key]; | ||||
|                     group.msgInfos.forEach(mInfo => { | ||||
|                         mInfo.done(); | ||||
|                     }); | ||||
|                     delete pending[key]; | ||||
|                 } | ||||
|             } | ||||
|   | ||||
| @@ -493,4 +493,62 @@ describe('SORT node', function() { | ||||
|         }); | ||||
|     }); | ||||
|  | ||||
|     describe('messaging API', function() { | ||||
|         function mapiDoneTestHelper(done, targetType, msgAndTimings) { | ||||
|             const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js"); | ||||
|             const catchNode = require("nr-test-utils").require("@node-red/nodes/core/common/25-catch.js"); | ||||
|             const flow = [ | ||||
|                 {id: "sortNode1", type: "sort", order: "ascending", as_num: false, target: "payload", targetType, | ||||
|                  seqKey: "payload", seqKeyType: "msg", wires: [[]]}, | ||||
|                 { id: "completeNode1", type: "complete", scope: ["sortNode1"], uncaught: false, wires: [["helperNode1"]] }, | ||||
|                 { id: "catchNode1", type: "catch", scope: ["sortNode1"], uncaught: false, wires: [["helperNode1"]] }, | ||||
|                 { id: "helperNode1", type: "helper", wires: [[]] }]; | ||||
|             const numMsgs = msgAndTimings.length; | ||||
|             helper.load([sortNode, completeNode, catchNode], flow, function () { | ||||
|                 const sortNode1 = helper.getNode("sortNode1"); | ||||
|                 const helperNode1 = helper.getNode("helperNode1"); | ||||
|                 RED.settings.nodeMessageBufferMaxLength = 2; | ||||
|                 const t = Date.now(); | ||||
|                 let c = 0; | ||||
|                 helperNode1.on("input", function (msg) { | ||||
|                     msg.should.have.a.property('payload'); | ||||
|                     (Date.now() - t).should.be.approximately(msgAndTimings[msg.seq].avr, msgAndTimings[msg.seq].var); | ||||
|                     c += 1; | ||||
|                     if (c === numMsgs) { | ||||
|                         done(); | ||||
|                     } | ||||
|                 }); | ||||
|                 for (let i = 0; i < numMsgs; i++) { | ||||
|                     setTimeout(function () { sortNode1.receive(msgAndTimings[i].msg); }, msgAndTimings[i].delay); | ||||
|                 } | ||||
|             }); | ||||
|         } | ||||
|         it('should call done() when message is sent (payload)', function (done) { | ||||
|             mapiDoneTestHelper(done, "msg", [ | ||||
|                 { msg: { seq: 0, payload: [1, 3, 2] }, delay: 0, avr: 0, var: 100 }, | ||||
|             ]); | ||||
|         }); | ||||
|         it('should call done() when message is sent (sequence)', function (done) { | ||||
|             mapiDoneTestHelper(done, "seq", [ | ||||
|                 { msg: { seq: 0, payload: 3, parts: {id:"A", index: 0, count: 2}}, delay: 0, avr: 500, var: 100 }, | ||||
|                 { msg: { seq: 1, payload: 2, parts: {id:"A", index: 1, count: 2}}, delay: 500, avr: 500, var: 100} | ||||
|             ]); | ||||
|         }); | ||||
|         it('should call done() regardless of buffer overflow (same group)', function (done) { | ||||
|             mapiDoneTestHelper(done, "seq", [ | ||||
|                 { msg: { seq: 0, payload: 1, parts: {id:"A", index: 0, count: 3}}, delay: 0, avr: 1000, var: 100 }, | ||||
|                 { msg: { seq: 1, payload: 3, parts: {id:"A", index: 1, count: 3}}, delay: 500, avr: 1000, var: 100 }, | ||||
|                 { msg: { seq: 2, payload: 2, parts: {id:"A", index: 2, count: 3}}, delay: 1000, avr: 1000, var: 100 }, | ||||
|             ]); | ||||
|         }); | ||||
|         it('should call done() regardless of buffer overflow (different group)', function (done) { | ||||
|             mapiDoneTestHelper(done, "seq", [ | ||||
|                 { msg: { seq: 0, payload: 1, parts: {id:"A", index: 0, count: 2}}, delay: 0, avr: 1000, var: 100 }, | ||||
|                 { msg: { seq: 1, payload: 3, parts: {id:"B", index: 0, count: 2}}, delay: 500, avr: 1200, var: 100 }, | ||||
|                 { msg: { seq: 2, payload: 5, parts: {id:"C", index: 0, count: 2}}, delay: 1000, avr: 1500, var: 100 }, | ||||
|                 { msg: { seq: 3, payload: 2, parts: {id:"B", index: 1, count: 2}}, delay: 1200, avr: 1200, var: 100 }, | ||||
|                 { msg: { seq: 4, payload: 4, parts: {id:"C", index: 1, count: 2}}, delay: 1500, avr: 1500, var: 100 },                 | ||||
|             ]); | ||||
|         });         | ||||
|     }); | ||||
| }); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user