diff --git a/packages/node_modules/@node-red/nodes/core/function/89-trigger.js b/packages/node_modules/@node-red/nodes/core/function/89-trigger.js index 7b5b820b7..9339436ee 100644 --- a/packages/node_modules/@node-red/nodes/core/function/89-trigger.js +++ b/packages/node_modules/@node-red/nodes/core/function/89-trigger.js @@ -82,10 +82,10 @@ module.exports = function(RED) { var npay = {}; var pendingMessages = []; var activeMessagePromise = null; - var processMessageQueue = function(msg) { - if (msg) { + var processMessageQueue = function(msgInfo) { + if (msgInfo) { // A new message has arrived - add it to the message queue - pendingMessages.push(msg); + pendingMessages.push(msgInfo); if (activeMessagePromise !== null) { // The node is currently processing a message, so do nothing // more with this message @@ -101,17 +101,17 @@ module.exports = function(RED) { // There are more messages to process. Get the next message and // start processing it. Recurse back in to check for any more - var nextMsg = pendingMessages.shift(); - activeMessagePromise = processMessage(nextMsg) + var nextMsgInfo = pendingMessages.shift(); + activeMessagePromise = processMessage(nextMsgInfo) .then(processMessageQueue) .catch((err) => { - node.error(err,nextMsg); + nextMsgInfo.done(err); return processMessageQueue(); }); } - this.on('input', function(msg) { - processMessageQueue(msg); + this.on('input', function(msg, send, done) { + processMessageQueue({msg, send, done}); }); var stat = function() { @@ -121,7 +121,8 @@ module.exports = function(RED) { else return {fill:"blue",shape:"dot",text:l}; } - var processMessage = function(msg) { + var processMessage = function(msgInfo) { + let msg = msgInfo.msg; var topic = RED.util.getMessageProperty(msg,node.topic) || "_none"; var promise; var delayDuration = node.duration; @@ -179,7 +180,7 @@ module.exports = function(RED) { /* istanbul ignore else */ if (node.op1type !== "nul") { var msg2 = RED.util.cloneMessage(msg); - node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, delayDuration); + node.topics[topic].tout = setInterval(function() { msgInfo.send(RED.util.cloneMessage(msg2)); }, delayDuration); } } else { @@ -203,14 +204,14 @@ module.exports = function(RED) { } promise.then(() => { if (node.op2type === "payl") { - if (node.second === true) { node.send([null,npay[topic]]); } - else { node.send(npay[topic]); } + if (node.second === true) { msgInfo.send([null,npay[topic]]); } + else { msgInfo.send(npay[topic]); } delete npay[topic]; } else { msg2.payload = node.topics[topic].m2; - if (node.second === true) { node.send([null,msg2]); } - else { node.send(msg2); } + if (node.second === true) { msgInfo.send([null,msg2]); } + else { msgInfo.send(msg2); } } delete node.topics[topic]; node.status(stat()); @@ -225,8 +226,9 @@ module.exports = function(RED) { }, delayDuration); } } + msgInfo.done(); node.status(stat()); - if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); } + if (node.op1type !== "nul") { msgInfo.send(RED.util.cloneMessage(msg)); } }); }); } @@ -262,8 +264,8 @@ module.exports = function(RED) { } delete node.topics[topic]; node.status(stat()); - if (node.second === true) { node.send([null,msg2]); } - else { node.send(msg2); } + if (node.second === true) { msgInfo.send([null,msg2]); } + else { msgInfo.send(msg2); } }).catch(err => { node.error(err); }); @@ -273,6 +275,7 @@ module.exports = function(RED) { // if (node.op2type === "payl") {node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); } // } } + msgInfo.done(); return Promise.resolve(); } this.on("close", function() { diff --git a/test/nodes/core/function/89-trigger_spec.js b/test/nodes/core/function/89-trigger_spec.js index dd77b37d9..ed25789a7 100644 --- a/test/nodes/core/function/89-trigger_spec.js +++ b/test/nodes/core/function/89-trigger_spec.js @@ -1154,5 +1154,45 @@ describe('trigger node', function() { },180); }); }); - + describe('messaging API', function () { + function mapiDoneTriggerTestHelper(done, nodeSetting, msgAndTimings) { + const completeNode = require("nr-test-utils").require("@node-red/nodes/core/common/24-complete.js"); + const catchNode = require("nr-test-utils").require("@node-red/nodes/core/common/25-catch.js"); + const flow = [ + { ...nodeSetting, id: "triggerNode1", type: "trigger", wires: [[]] }, + { id: "completeNode1", type: "complete", scope: ["triggerNode1"], uncaught: false, wires: [["helperNode1"]] }, + { id: "catchNode1", type: "catch", scope: ["triggerNode1"], uncaught: false, wires: [["helperNode1"]] }, + { id: "helperNode1", type: "helper", wires: [[]] }]; + const numMsgs = msgAndTimings.length; + helper.load([triggerNode, completeNode, catchNode], flow, function () { + const triggerNode1 = helper.getNode("triggerNode1"); + const helperNode1 = helper.getNode("helperNode1"); + RED.settings.nodeMessageBufferMaxLength = 3; + const t = Date.now(); + let c = 0; + helperNode1.on("input", function (msg) { + msg.should.have.a.property('payload'); + (Date.now() - t).should.be.approximately(msgAndTimings[msg.seq].avr, msgAndTimings[msg.seq].var); + c += 1; + if (c === numMsgs) { + done(); + } + }); + for (let i = 0; i < numMsgs; i++) { + setTimeout(function () { triggerNode1.receive(msgAndTimings[i].msg); }, msgAndTimings[i].delay); + } + }); + } + it('should call done() when first message has been processed', function (done) { + // not when second and more messages are emitted. + mapiDoneTriggerTestHelper(done, { units:"s", duration:"1" }, [ + { msg: { seq: 0, payload: "A"}, delay: 0, avr: 0, var: 100} + ]); + }); + it('should call done() when it receives reset message', function (done) { + mapiDoneTriggerTestHelper(done, {units:"s", duration:"1"}, [ + {msg: { seq: 0, payload: "A", reset:true}, delay: 0, avr: 0, var:100} + ]); + }) + }); });