1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Add async message handling to Trigger node

This commit is contained in:
Nick O'Leary 2018-07-09 14:12:44 +01:00
parent b0d7e11d48
commit d7adff9a65
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9
3 changed files with 143 additions and 52 deletions

View File

@ -76,8 +76,43 @@ module.exports = function(RED) {
var node = this; var node = this;
node.topics = {}; node.topics = {};
this.on("input", function(msg) { var pendingMessages = [];
var activeMessagePromise = null;
var processMessageQueue = function(msg) {
if (msg) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
if (activeMessagePromise !== null) {
// The node is currently processing a message, so do nothing
// more with this message
return;
}
}
if (pendingMessages.length === 0) {
// There are no more messages to process, clear the active flag
// and return
activeMessagePromise = null;
return;
}
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
activeMessagePromise = processMessage(nextMsg)
.then(processMessageQueue)
.catch((err) => {
node.error(err,nextMsg);
return processMessageQueue();
});
}
this.on('input', function(msg) {
processMessageQueue(msg);
});
var processMessage = function(msg) {
var topic = msg.topic || "_none"; var topic = msg.topic || "_none";
var promise;
if (node.bytopic === "all") { topic = "_none"; } if (node.bytopic === "all") { topic = "_none"; }
node.topics[topic] = node.topics[topic] || {}; node.topics[topic] = node.topics[topic] || {};
if (msg.hasOwnProperty("reset") || ((node.reset !== '') && msg.hasOwnProperty("payload") && (msg.payload !== null) && msg.payload.toString && (msg.payload.toString() == node.reset)) ) { if (msg.hasOwnProperty("reset") || ((node.reset !== '') && msg.hasOwnProperty("payload") && (msg.payload !== null) && msg.payload.toString && (msg.payload.toString() == node.reset)) ) {
@ -88,18 +123,39 @@ module.exports = function(RED) {
} }
else { else {
if (((!node.topics[topic].tout) && (node.topics[topic].tout !== 0)) || (node.loop === true)) { if (((!node.topics[topic].tout) && (node.topics[topic].tout !== 0)) || (node.loop === true)) {
promise = Promise.resolve();
if (node.op2type === "pay" || node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); } if (node.op2type === "pay" || node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); }
else if (node.op2Templated) { node.topics[topic].m2 = mustache.render(node.op2,msg); } else if (node.op2Templated) { node.topics[topic].m2 = mustache.render(node.op2,msg); }
else if (node.op2type !== "nul") { else if (node.op2type !== "nul") {
node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg); promise = new Promise((resolve,reject) => {
RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => {
if (err) {
reject(err);
} else {
node.topics[topic].m2 = value;
resolve();
}
});
});
} }
return promise.then(() => {
promise = Promise.resolve();
if (node.op1type === "pay") { } if (node.op1type === "pay") { }
else if (node.op1Templated) { msg.payload = mustache.render(node.op1,msg); } else if (node.op1Templated) { msg.payload = mustache.render(node.op1,msg); }
else if (node.op1type !== "nul") { else if (node.op1type !== "nul") {
msg.payload = RED.util.evaluateNodeProperty(node.op1,node.op1type,node,msg); promise = new Promise((resolve,reject) => {
RED.util.evaluateNodeProperty(node.op1,node.op1type,node,msg,(err,value) => {
if (err) {
reject(err);
} else {
msg.payload = value;
resolve();
} }
});
});
}
return promise.then(() => {
if (node.duration === 0) { node.topics[topic].tout = 0; } if (node.duration === 0) { node.topics[topic].tout = 0; }
else if (node.loop === true) { else if (node.loop === true) {
/* istanbul ignore else */ /* istanbul ignore else */
@ -115,21 +171,40 @@ module.exports = function(RED) {
node.topics[topic].tout = setTimeout(function() { node.topics[topic].tout = setTimeout(function() {
var msg2 = null; var msg2 = null;
if (node.op2type !== "nul") { if (node.op2type !== "nul") {
var promise = Promise.resolve();
msg2 = RED.util.cloneMessage(msg); msg2 = RED.util.cloneMessage(msg);
if (node.op2type === "flow" || node.op2type === "global") { if (node.op2type === "flow" || node.op2type === "global") {
node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg); promise = new Promise((resolve,reject) => {
RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => {
if (err) {
reject(err);
} else {
node.topics[topic].m2 = value;
resolve();
} }
});
});
}
promise.then(() => {
msg2.payload = node.topics[topic].m2; msg2.payload = node.topics[topic].m2;
delete node.topics[topic]; delete node.topics[topic];
node.send(msg2); node.send(msg2);
}
else { delete node.topics[topic]; }
node.status({}); node.status({});
}).catch(err => {
node.error(err);
});
} else {
delete node.topics[topic];
node.status({});
}
}, node.duration); }, node.duration);
} }
} }
node.status({fill:"blue",shape:"dot",text:" "}); node.status({fill:"blue",shape:"dot",text:" "});
if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); } if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); }
});
});
} }
else if ((node.extend === "true" || node.extend === true) && (node.duration > 0)) { else if ((node.extend === "true" || node.extend === true) && (node.duration > 0)) {
/* istanbul ignore else */ /* istanbul ignore else */
@ -138,10 +213,24 @@ module.exports = function(RED) {
if (node.topics[topic].tout) { clearTimeout(node.topics[topic].tout); } if (node.topics[topic].tout) { clearTimeout(node.topics[topic].tout); }
node.topics[topic].tout = setTimeout(function() { node.topics[topic].tout = setTimeout(function() {
var msg2 = null; var msg2 = null;
var promise = Promise.resolve();
if (node.op2type !== "nul") { if (node.op2type !== "nul") {
if (node.op2type === "flow" || node.op2type === "global") { if (node.op2type === "flow" || node.op2type === "global") {
node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg); promise = new Promise((resolve,reject) => {
RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => {
if (err) {
reject(err);
} else {
node.topics[topic].m2 = value;
resolve();
} }
});
});
}
}
promise.then(() => {
if (node.op2type !== "nul") {
if (node.topics[topic] !== undefined) { if (node.topics[topic] !== undefined) {
msg2 = RED.util.cloneMessage(msg); msg2 = RED.util.cloneMessage(msg);
msg2.payload = node.topics[topic].m2; msg2.payload = node.topics[topic].m2;
@ -150,13 +239,17 @@ module.exports = function(RED) {
delete node.topics[topic]; delete node.topics[topic];
node.status({}); node.status({});
node.send(msg2); node.send(msg2);
}).catch(err => {
node.error(err);
});
}, node.duration); }, node.duration);
} }
else { else {
if (node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); } if (node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); }
} }
} }
}); return Promise.resolve();
}
this.on("close", function() { this.on("close", function() {
for (var t in node.topics) { for (var t in node.topics) {
/* istanbul ignore else */ /* istanbul ignore else */

View File

@ -451,7 +451,7 @@ module.exports = function(RED) {
} }
this.on('input', function(msg) { this.on('input', function(msg) {
processMessageQueue(msg, true); processMessageQueue(msg);
}); });
this.on('close', function() { this.on('close', function() {

View File

@ -288,7 +288,7 @@ describe('trigger node', function() {
it('should be able to return things from flow and global context variables', function(done) { it('should be able to return things from flow and global context variables', function(done) {
var spy = sinon.stub(RED.util, 'evaluateNodeProperty', var spy = sinon.stub(RED.util, 'evaluateNodeProperty',
function(arg1, arg2, arg3, arg4) { return arg1; } function(arg1, arg2, arg3, arg4, arg5) { if (arg5) { arg5(null, arg1) } else { return arg1; } }
); );
var flow = [{"id":"n1", "type":"trigger", "name":"triggerNode", op1:"foo", op1type:"flow", op2:"bar", op2type:"global", duration:"20", wires:[["n2"]] }, var flow = [{"id":"n1", "type":"trigger", "name":"triggerNode", op1:"foo", op1type:"flow", op2:"bar", op2type:"global", duration:"20", wires:[["n2"]] },
{id:"n2", type:"helper"} ]; {id:"n2", type:"helper"} ];
@ -386,7 +386,7 @@ describe('trigger node', function() {
it('should be able to extend the delay', function(done) { it('should be able to extend the delay', function(done) {
this.timeout(5000); // add extra time for flake this.timeout(5000); // add extra time for flake
var spy = sinon.stub(RED.util, 'evaluateNodeProperty', var spy = sinon.stub(RED.util, 'evaluateNodeProperty',
function(arg1, arg2, arg3, arg4) { return arg1; } function(arg1, arg2, arg3, arg4, arg5) { if (arg5) { arg5(null, arg1) } else { return arg1; } }
); );
var flow = [{"id":"n1", "type":"trigger", "name":"triggerNode", extend:"true", op1type:"flow", op1:"foo", op2:"bar", op2type:"global", duration:"100", wires:[["n2"]] }, var flow = [{"id":"n1", "type":"trigger", "name":"triggerNode", extend:"true", op1type:"flow", op1:"foo", op2:"bar", op2type:"global", duration:"100", wires:[["n2"]] },
{id:"n2", type:"helper"} ]; {id:"n2", type:"helper"} ];
@ -428,12 +428,10 @@ describe('trigger node', function() {
n2.on("input", function(msg) { n2.on("input", function(msg) {
try { try {
if (c === 0) { if (c === 0) {
console.log(c,Date.now() - ss,msg);
msg.should.have.a.property("payload", "Hello"); msg.should.have.a.property("payload", "Hello");
c += 1; c += 1;
} }
else { else {
console.log(c,Date.now() - ss,msg);
msg.should.have.a.property("payload", "World"); msg.should.have.a.property("payload", "World");
(Date.now() - ss).should.be.greaterThan(150); (Date.now() - ss).should.be.greaterThan(150);
done(); done();