diff --git a/nodes/core/core/89-trigger.js b/nodes/core/core/89-trigger.js index f5fd28b7f..48a595ccc 100644 --- a/nodes/core/core/89-trigger.js +++ b/nodes/core/core/89-trigger.js @@ -76,8 +76,43 @@ module.exports = function(RED) { var node = this; node.topics = {}; - this.on("input", function(msg) { + var pendingMessages = []; + var activeMessagePromise = null; + var processMessageQueue = function(msg) { + if (msg) { + // A new message has arrived - add it to the message queue + pendingMessages.push(msg); + if (activeMessagePromise !== null) { + // The node is currently processing a message, so do nothing + // more with this message + return; + } + } + if (pendingMessages.length === 0) { + // There are no more messages to process, clear the active flag + // and return + activeMessagePromise = null; + return; + } + + // There are more messages to process. Get the next message and + // start processing it. Recurse back in to check for any more + var nextMsg = pendingMessages.shift(); + activeMessagePromise = processMessage(nextMsg) + .then(processMessageQueue) + .catch((err) => { + node.error(err,nextMsg); + return processMessageQueue(); + }); + } + + this.on('input', function(msg) { + processMessageQueue(msg); + }); + + var processMessage = function(msg) { var topic = msg.topic || "_none"; + var promise; if (node.bytopic === "all") { topic = "_none"; } node.topics[topic] = node.topics[topic] || {}; if (msg.hasOwnProperty("reset") || ((node.reset !== '') && msg.hasOwnProperty("payload") && (msg.payload !== null) && msg.payload.toString && (msg.payload.toString() == node.reset)) ) { @@ -88,48 +123,88 @@ module.exports = function(RED) { } else { if (((!node.topics[topic].tout) && (node.topics[topic].tout !== 0)) || (node.loop === true)) { + promise = Promise.resolve(); if (node.op2type === "pay" || node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); } else if (node.op2Templated) { node.topics[topic].m2 = mustache.render(node.op2,msg); } else if (node.op2type !== "nul") { - node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg); - } - - if (node.op1type === "pay") { } - else if (node.op1Templated) { msg.payload = mustache.render(node.op1,msg); } - else if (node.op1type !== "nul") { - msg.payload = RED.util.evaluateNodeProperty(node.op1,node.op1type,node,msg); - } - - if (node.duration === 0) { node.topics[topic].tout = 0; } - else if (node.loop === true) { - /* istanbul ignore else */ - if (node.topics[topic].tout) { clearInterval(node.topics[topic].tout); } - /* istanbul ignore else */ - if (node.op1type !== "nul") { - var msg2 = RED.util.cloneMessage(msg); - node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, node.duration); - } - } - else { - if (!node.topics[topic].tout) { - node.topics[topic].tout = setTimeout(function() { - var msg2 = null; - if (node.op2type !== "nul") { - msg2 = RED.util.cloneMessage(msg); - if (node.op2type === "flow" || node.op2type === "global") { - node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg); - } - msg2.payload = node.topics[topic].m2; - delete node.topics[topic]; - node.send(msg2); + promise = new Promise((resolve,reject) => { + RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => { + if (err) { + reject(err); + } else { + node.topics[topic].m2 = value; + resolve(); } - else { delete node.topics[topic]; } - node.status({}); - }, node.duration); - } + }); + }); } - node.status({fill:"blue",shape:"dot",text:" "}); - if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); } + + return promise.then(() => { + promise = Promise.resolve(); + if (node.op1type === "pay") { } + else if (node.op1Templated) { msg.payload = mustache.render(node.op1,msg); } + else if (node.op1type !== "nul") { + promise = new Promise((resolve,reject) => { + RED.util.evaluateNodeProperty(node.op1,node.op1type,node,msg,(err,value) => { + if (err) { + reject(err); + } else { + msg.payload = value; + resolve(); + } + }); + }); + } + return promise.then(() => { + if (node.duration === 0) { node.topics[topic].tout = 0; } + else if (node.loop === true) { + /* istanbul ignore else */ + if (node.topics[topic].tout) { clearInterval(node.topics[topic].tout); } + /* istanbul ignore else */ + if (node.op1type !== "nul") { + var msg2 = RED.util.cloneMessage(msg); + node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, node.duration); + } + } + else { + if (!node.topics[topic].tout) { + node.topics[topic].tout = setTimeout(function() { + var msg2 = null; + if (node.op2type !== "nul") { + var promise = Promise.resolve(); + msg2 = RED.util.cloneMessage(msg); + if (node.op2type === "flow" || node.op2type === "global") { + promise = new Promise((resolve,reject) => { + RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => { + if (err) { + reject(err); + } else { + node.topics[topic].m2 = value; + resolve(); + } + }); + }); + } + promise.then(() => { + msg2.payload = node.topics[topic].m2; + delete node.topics[topic]; + node.send(msg2); + node.status({}); + }).catch(err => { + node.error(err); + }); + } else { + delete node.topics[topic]; + node.status({}); + } + + }, node.duration); + } + } + node.status({fill:"blue",shape:"dot",text:" "}); + if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); } + }); + }); } else if ((node.extend === "true" || node.extend === true) && (node.duration > 0)) { /* istanbul ignore else */ @@ -138,25 +213,43 @@ module.exports = function(RED) { if (node.topics[topic].tout) { clearTimeout(node.topics[topic].tout); } node.topics[topic].tout = setTimeout(function() { var msg2 = null; + var promise = Promise.resolve(); + if (node.op2type !== "nul") { if (node.op2type === "flow" || node.op2type === "global") { - node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg); - } - if (node.topics[topic] !== undefined) { - msg2 = RED.util.cloneMessage(msg); - msg2.payload = node.topics[topic].m2; + promise = new Promise((resolve,reject) => { + RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => { + if (err) { + reject(err); + } else { + node.topics[topic].m2 = value; + resolve(); + } + }); + }); } } - delete node.topics[topic]; - node.status({}); - node.send(msg2); + promise.then(() => { + if (node.op2type !== "nul") { + if (node.topics[topic] !== undefined) { + msg2 = RED.util.cloneMessage(msg); + msg2.payload = node.topics[topic].m2; + } + } + delete node.topics[topic]; + node.status({}); + node.send(msg2); + }).catch(err => { + node.error(err); + }); }, node.duration); } else { if (node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); } } } - }); + return Promise.resolve(); + } this.on("close", function() { for (var t in node.topics) { /* istanbul ignore else */ diff --git a/nodes/core/logic/10-switch.js b/nodes/core/logic/10-switch.js index d15be39de..79b6e20b3 100644 --- a/nodes/core/logic/10-switch.js +++ b/nodes/core/logic/10-switch.js @@ -451,7 +451,7 @@ module.exports = function(RED) { } this.on('input', function(msg) { - processMessageQueue(msg, true); + processMessageQueue(msg); }); this.on('close', function() { diff --git a/test/nodes/core/core/89-trigger_spec.js b/test/nodes/core/core/89-trigger_spec.js index d07ab2a9b..f19bc42fe 100644 --- a/test/nodes/core/core/89-trigger_spec.js +++ b/test/nodes/core/core/89-trigger_spec.js @@ -288,7 +288,7 @@ describe('trigger node', function() { it('should be able to return things from flow and global context variables', function(done) { var spy = sinon.stub(RED.util, 'evaluateNodeProperty', - function(arg1, arg2, arg3, arg4) { return arg1; } + function(arg1, arg2, arg3, arg4, arg5) { if (arg5) { arg5(null, arg1) } else { return arg1; } } ); var flow = [{"id":"n1", "type":"trigger", "name":"triggerNode", op1:"foo", op1type:"flow", op2:"bar", op2type:"global", duration:"20", wires:[["n2"]] }, {id:"n2", type:"helper"} ]; @@ -386,7 +386,7 @@ describe('trigger node', function() { it('should be able to extend the delay', function(done) { this.timeout(5000); // add extra time for flake var spy = sinon.stub(RED.util, 'evaluateNodeProperty', - function(arg1, arg2, arg3, arg4) { return arg1; } + function(arg1, arg2, arg3, arg4, arg5) { if (arg5) { arg5(null, arg1) } else { return arg1; } } ); var flow = [{"id":"n1", "type":"trigger", "name":"triggerNode", extend:"true", op1type:"flow", op1:"foo", op2:"bar", op2type:"global", duration:"100", wires:[["n2"]] }, {id:"n2", type:"helper"} ]; @@ -428,12 +428,10 @@ describe('trigger node', function() { n2.on("input", function(msg) { try { if (c === 0) { - console.log(c,Date.now() - ss,msg); msg.should.have.a.property("payload", "Hello"); c += 1; } else { - console.log(c,Date.now() - ss,msg); msg.should.have.a.property("payload", "World"); (Date.now() - ss).should.be.greaterThan(150); done();