mirror of
				https://github.com/node-red/node-red.git
				synced 2025-03-01 10:36:34 +00:00 
			
		
		
		
	Merge pull request #2738 from node-red-hitachi/batch-node-mapi
Messaging API support in Batch node
This commit is contained in:
		| @@ -32,11 +32,11 @@ module.exports = function(RED) { | ||||
|         return _max_kept_msgs_count; | ||||
|     } | ||||
|  | ||||
|     function send_msgs(node, msgs, clone_msg) { | ||||
|         var count = msgs.length; | ||||
|         var msg_id = msgs[0]._msgid; | ||||
|     function send_msgs(node, msgInfos, clone_msg) { | ||||
|         var count = msgInfos.length; | ||||
|         var msg_id = msgInfos[0].msg._msgid; | ||||
|         for (var i = 0; i < count; i++) { | ||||
|             var msg = clone_msg ? RED.util.cloneMessage(msgs[i]) : msgs[i]; | ||||
|             var msg = clone_msg ? RED.util.cloneMessage(msgInfos[i].msg) : msgInfos[i].msg; | ||||
|             if (!msg.hasOwnProperty("parts")) { | ||||
|                 msg.parts = {}; | ||||
|             } | ||||
| @@ -44,14 +44,16 @@ module.exports = function(RED) { | ||||
|             parts.id = msg_id; | ||||
|             parts.index = i; | ||||
|             parts.count = count; | ||||
|             node.send(msg); | ||||
|             msgInfos[i].send(msg); | ||||
|             //msgInfos[i].done(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     function send_interval(node, allow_empty_seq) { | ||||
|         let msgs = node.pending; | ||||
|         if (msgs.length > 0) { | ||||
|             send_msgs(node, msgs, false); | ||||
|         let msgInfos = node.pending; | ||||
|         if (msgInfos.length > 0) { | ||||
|             send_msgs(node, msgInfos, false); | ||||
|             msgInfos.forEach(e => e.done()); | ||||
|             node.pending = []; | ||||
|         } | ||||
|         else { | ||||
| @@ -108,19 +110,20 @@ module.exports = function(RED) { | ||||
|                 return; | ||||
|             } | ||||
|         } | ||||
|         var msgs = []; | ||||
|         var msgInfos = []; | ||||
|         for (var topic of topics) { | ||||
|             var t_msgs = get_msgs_of_topic(pending, topic); | ||||
|             msgs = msgs.concat(t_msgs); | ||||
|             var t_msgInfos = get_msgs_of_topic(pending, topic); | ||||
|             msgInfos = msgInfos.concat(t_msgInfos); | ||||
|         } | ||||
|         for (var topic of topics) { | ||||
|             remove_topic(pending, topic); | ||||
|         } | ||||
|         send_msgs(node, msgs, true); | ||||
|         node.pending_count -= msgs.length; | ||||
|         send_msgs(node, msgInfos, true); | ||||
|         msgInfos.forEach(e => e.done() ); | ||||
|         node.pending_count -= msgInfos.length; | ||||
|     } | ||||
|  | ||||
|     function add_to_topic_group(pending, topic, gid, msg) { | ||||
|     function add_to_topic_group(pending, topic, gid, msgInfo) { | ||||
|         if (!pending.hasOwnProperty(topic)) { | ||||
|             pending[topic] = { groups: {}, gids: [] }; | ||||
|         } | ||||
| @@ -132,32 +135,43 @@ module.exports = function(RED) { | ||||
|             gids.push(gid); | ||||
|         } | ||||
|         var group = groups[gid]; | ||||
|         group.msgs.push(msg); | ||||
|         group.msgs.push(msgInfo); | ||||
|         if ((group.count === undefined) && | ||||
|             msg.parts.hasOwnProperty('count')) { | ||||
|             group.count = msg.parts.count; | ||||
|             msgInfo.msg.parts.hasOwnProperty('count')) { | ||||
|             group.count = msgInfo.msg.parts.count; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     function concat_msg(node, msg) { | ||||
|     function concat_msg(node, msg, send, done) { | ||||
|         var topic = msg.topic; | ||||
|         if(node.topics.indexOf(topic) >= 0) { | ||||
|             if (!msg.hasOwnProperty("parts") || | ||||
|                 !msg.parts.hasOwnProperty("id") || | ||||
|                 !msg.parts.hasOwnProperty("index") || | ||||
|                 !msg.parts.hasOwnProperty("count")) { | ||||
|                 node.error(RED._("batch.no-parts"), msg); | ||||
|                 done(RED._("batch.no-parts")); | ||||
|                 return; | ||||
|             } | ||||
|             var gid = msg.parts.id; | ||||
|             var pending = node.pending; | ||||
|             add_to_topic_group(pending, topic, gid, msg); | ||||
|             add_to_topic_group(pending, topic, gid, {msg, send, done}); | ||||
|             node.pending_count++; | ||||
|             var max_msgs = max_kept_msgs_count(node); | ||||
|             if ((max_msgs > 0) && (node.pending_count > max_msgs)) { | ||||
|                 Object.values(node.pending).forEach(p_topic => { | ||||
|                     Object.values(p_topic.groups).forEach(group => { | ||||
|                         group.msgs.forEach(msgInfo => { | ||||
|                             if (msgInfo.msg.id === msg.id) { | ||||
|                                 // the message that caused the overflow | ||||
|                                 msgInfo.done(RED._("batch.too-many")); | ||||
|                             } else { | ||||
|                                 msgInfo.done(); | ||||
|                             } | ||||
|                         }) | ||||
|                     }) | ||||
|                 }); | ||||
|                 node.pending = {}; | ||||
|                 node.pending_count = 0; | ||||
|                 node.error(RED._("batch.too-many"), msg); | ||||
|             } | ||||
|             try_concat(node, pending); | ||||
|         } | ||||
| @@ -178,29 +192,37 @@ module.exports = function(RED) { | ||||
|                 return; | ||||
|             } | ||||
|             node.pending = []; | ||||
|             this.on("input", function(msg) { | ||||
|             this.on("input", function(msg, send, done) { | ||||
|                 if (msg.hasOwnProperty("reset")) { | ||||
|                     node.pending.forEach(e => e.done()); | ||||
|                     node.pending = []; | ||||
|                     node.pending_count = 0; | ||||
|                     done(); | ||||
|                     return; | ||||
|                 } | ||||
|                 var queue = node.pending; | ||||
|                 queue.push(msg); | ||||
|                 queue.push({msg, send, done}); | ||||
|                 node.pending_count++; | ||||
|                 if (queue.length === count) { | ||||
|                     send_msgs(node, queue, is_overlap); | ||||
|                     for (let i = 0; i < queue.length-overlap; i++) { | ||||
|                         queue[i].done(); | ||||
|                     } | ||||
|                     node.pending = | ||||
|                         (overlap === 0) ? [] : queue.slice(-overlap); | ||||
|                     node.pending_count = 0; | ||||
|                 } | ||||
|                 var max_msgs = max_kept_msgs_count(node); | ||||
|                 if ((max_msgs > 0) && (node.pending_count > max_msgs)) { | ||||
|                     let lastMInfo = node.pending.pop(); | ||||
|                     lastMInfo.done(RED._("batch.too-many")); | ||||
|                     node.pending.forEach(e => e.done()); | ||||
|                     node.pending = []; | ||||
|                     node.pending_count = 0; | ||||
|                     node.error(RED._("batch.too-many"), msg); | ||||
|                 } | ||||
|             }); | ||||
|             this.on("close", function() { | ||||
|                 node.pending.forEach(e=> e.done()); | ||||
|                 node.pending_count = 0; | ||||
|                 node.pending = []; | ||||
|             }); | ||||
| @@ -217,31 +239,36 @@ module.exports = function(RED) { | ||||
|             if (interval > 0) { | ||||
|                 timer = setInterval(msgHandler, interval); | ||||
|             } | ||||
|             this.on("input", function(msg) { | ||||
|             this.on("input", function(msg, send, done) { | ||||
|                 if (msg.hasOwnProperty("reset")) { | ||||
|                     if (timer !== undefined) { | ||||
|                         clearInterval(timer); | ||||
|                     } | ||||
|                     node.pending.forEach(e => e.done()); | ||||
|                     node.pending = []; | ||||
|                     node.pending_count = 0; | ||||
|                     done(); | ||||
|                     if (interval > 0) { | ||||
|                         timer = setInterval(msgHandler, interval); | ||||
|                     } | ||||
|                     return; | ||||
|                 } | ||||
|                 node.pending.push(msg); | ||||
|                 node.pending.push({msg, send, done}); | ||||
|                 node.pending_count++; | ||||
|                 var max_msgs = max_kept_msgs_count(node); | ||||
|                 if ((max_msgs > 0) && (node.pending_count > max_msgs)) { | ||||
|                     let lastMInfo = node.pending.pop(); | ||||
|                     lastMInfo.done(RED._("batch.too-many")); | ||||
|                     node.pending.forEach(e => e.done()); | ||||
|                     node.pending = []; | ||||
|                     node.pending_count = 0; | ||||
|                     node.error(RED._("batch.too-many"), msg); | ||||
|                 } | ||||
|             }); | ||||
|             this.on("close", function() { | ||||
|                 if (timer !== undefined) { | ||||
|                     clearInterval(timer); | ||||
|                 } | ||||
|                 node.pending.forEach(e => e.done()); | ||||
|                 node.pending = []; | ||||
|                 node.pending_count = 0; | ||||
|             }); | ||||
| @@ -251,15 +278,26 @@ module.exports = function(RED) { | ||||
|                 return x.topic; | ||||
|             }); | ||||
|             node.pending = {}; | ||||
|             this.on("input", function(msg) { | ||||
|             this.on("input", function(msg, send, done) { | ||||
|                 if (msg.hasOwnProperty("reset")) { | ||||
|                     Object.values(node.pending).forEach(p_topic => { | ||||
|                         Object.values(p_topic.groups).forEach(group => { | ||||
|                             group.msgs.forEach(e => e.done()); | ||||
|                         }); | ||||
|                     }); | ||||
|                     node.pending = {}; | ||||
|                     node.pending_count = 0; | ||||
|                     done(); | ||||
|                     return; | ||||
|                 } | ||||
|                 concat_msg(node, msg); | ||||
|                 concat_msg(node, msg, send, done); | ||||
|             }); | ||||
|             this.on("close", function() { | ||||
|                 Object.values(node.pending).forEach(p_topic => { | ||||
|                     Object.values(p_topic.groups).forEach(group => { | ||||
|                         group.msgs.forEach(e => e.done()); | ||||
|                     }); | ||||
|                 }); | ||||
|                 node.pending = {}; | ||||
|                 node.pending_count = 0; | ||||
|             }); | ||||
|   | ||||
| @@ -451,4 +451,92 @@ describe('BATCH node', function() { | ||||
|         }); | ||||
|     }); | ||||
|  | ||||
|     describe('messaging API', function() { | ||||
|         function mapiDoneTestHelper(done, mode, count, overlap, interval, allowEmptySequence, msgAndTimings) { | ||||
|             const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js"); | ||||
|             const catchNode = require("nr-test-utils").require("@node-red/nodes/core/common/25-catch.js"); | ||||
|             const flow = [{id:"batchNode1", type:"batch", name: "BatchNode", mode, count, overlap, interval,  | ||||
|                            allowEmptySequence, topics: [{topic: "TA"}], wires:[[]]}, | ||||
|                           {id:"completeNode1",type:"complete",scope: ["batchNode1"],uncaught:false,wires:[["helperNode1"]]}, | ||||
|                           {id:"catchNode1", type:"catch",scope: ["batchNode1"],uncaught:false,wires:[["helperNode1"]]}, | ||||
|                           {id:"helperNode1",type:"helper", wires:[[]]}]; | ||||
|             const numMsgs = msgAndTimings.length; | ||||
|             helper.load([batchNode, completeNode, catchNode], flow, function () { | ||||
|                 const batchNode1 = helper.getNode("batchNode1"); | ||||
|                 const helperNode1 = helper.getNode("helperNode1"); | ||||
|                 RED.settings.nodeMessageBufferMaxLength = 2; | ||||
|                 const t = Date.now(); | ||||
|                 let c = 0; | ||||
|                 helperNode1.on("input", function (msg) { | ||||
|                     msg.should.have.a.property('payload'); | ||||
|                     (Date.now() - t).should.be.approximately(msgAndTimings[msg.payload].avr, msgAndTimings[msg.payload].var); | ||||
|                     c += 1; | ||||
|                     if ( c === numMsgs) { | ||||
|                         done(); | ||||
|                     } | ||||
|                 }); | ||||
|                 for (let i = 0; i < numMsgs; i++) { | ||||
|                     setTimeout( function() { batchNode1.receive(msgAndTimings[i].msg); }, msgAndTimings[i].delay); | ||||
|                 } | ||||
|             }); | ||||
|         } | ||||
|  | ||||
|         it('should call done() when message is sent (mode: count)', function(done) { | ||||
|             mapiDoneTestHelper(done, "count", 2, 0, 2, false, [  | ||||
|                 { msg: {payload: 0}, delay: 0, avr: 0, var: 100}, | ||||
|                 { msg: {payload: 1}, delay: 0, avr: 0, var: 100} | ||||
|             ]); | ||||
|         }); | ||||
|         it('should call done() when reset (mode: count)', function(done) { | ||||
|             mapiDoneTestHelper(done, "count", 2, 0, 2, false, [  | ||||
|                 { msg: {payload: 0}, delay: 0, avr: 200, var: 100}, | ||||
|                 { msg: {payload: 1, reset:true}, delay: 200, avr: 200, var: 100} | ||||
|             ]); | ||||
|         }); | ||||
|         it('should call done() regardless of buffer overflow (mode: count)', function(done) { | ||||
|             mapiDoneTestHelper(done, "count", 10, 0, 2, false, [ | ||||
|                 { msg: {payload: 0}, delay: 0, avr: 500, var: 100}, | ||||
|                 { msg: {payload: 1}, delay: 100, avr: 500, var: 100}, | ||||
|                 { msg: {payload: 2}, delay: 500, avr: 500, var: 100} | ||||
|             ]); | ||||
|         }); | ||||
|         it('should call done() when message is sent (mode: interval)', function(done) { | ||||
|             mapiDoneTestHelper(done, "interval", 2, 0, 2, false, [ | ||||
|                 { msg: {payload: 0}, delay: 0, avr: 2000, var: 100}, | ||||
|                 { msg: {payload: 1}, delay: 500, avr: 2000, var: 100} | ||||
|             ]); | ||||
|         }); | ||||
|         it('should call done() when reset (mode: interval)', function(done) { | ||||
|             mapiDoneTestHelper(done, "interval", 2, 0, 2, false, [ | ||||
|                 { msg: {payload: 0}, delay: 0, avr: 200, var: 100}, | ||||
|                 { msg: {payload: 1, reset:true}, delay: 200, avr: 200, var: 100} | ||||
|             ]); | ||||
|         }); | ||||
|         it('should call done() regardless of buffer overflow (mode: interval)', function(done) { | ||||
|             mapiDoneTestHelper(done, "interval", 2, 0, 2, false, [ | ||||
|                 { msg: {payload: 0}, delay: 0, avr: 500, var: 100}, | ||||
|                 { msg: {payload: 1}, delay: 100, avr: 500, var: 100}, | ||||
|                 { msg: {payload: 2}, delay: 500, avr: 500, var: 100} | ||||
|             ]); | ||||
|         }); | ||||
|         it('should call done() when message is sent (mode: concat)', function(done) { | ||||
|             mapiDoneTestHelper(done, "concat", 2, 0, 2, false, [ | ||||
|                 { msg: {topic:"TA", payload: 0, parts: {id: "TA", index: 0, count: 2}}, delay: 0, avr: 1000, var: 100}, | ||||
|                 { msg: {topic:"TA", payload: 1, parts: {id: "TA", index: 1, count: 2}}, delay: 1000, avr: 1000, var: 100}, | ||||
|             ]); | ||||
|         }); | ||||
|         it('should call done() when reset (mode: concat)', function(done) { | ||||
|             mapiDoneTestHelper(done, "concat", 2, 0, 2, false, [ | ||||
|                 { msg: {topic:"TA", payload: 0, parts: {id: "TA", index: 0, count: 2}}, delay: 0, avr: 1000, var: 100}, | ||||
|                 { msg: {payload: 1, reset:true}, delay: 1000, avr: 1000, var: 100}, | ||||
|             ]); | ||||
|         }); | ||||
|         it('should call done() regardless of buffer overflow (mode: concat)', function(done) { | ||||
|             mapiDoneTestHelper(done, "concat", 2, 0, 2, false, [ | ||||
|                 { msg: {topic:"TA", payload: 0, parts: {id: "TA", index: 0, count: 3}}, delay: 0, avr: 1000, var: 100}, | ||||
|                 { msg: {topic:"TA", payload: 0, parts: {id: "TA", index: 1, count: 3}}, delay: 500, avr: 1000, var: 100}, | ||||
|                 { msg: {topic:"TA", payload: 0, parts: {id: "TA", index: 2, count: 3}}, delay: 1000, avr: 1000, var: 100} | ||||
|             ]); | ||||
|         }); | ||||
|     }); | ||||
| }); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user