Messaging API support in Trigger node

This commit is contained in:
Kunihiko Toumura
2020-11-05 15:09:41 +09:00
parent 15a600c763
commit f038069fe2
2 changed files with 61 additions and 18 deletions

View File

@@ -82,10 +82,10 @@ module.exports = function(RED) {
var npay = {};
var pendingMessages = [];
var activeMessagePromise = null;
var processMessageQueue = function(msg) {
if (msg) {
var processMessageQueue = function(msgInfo) {
if (msgInfo) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
pendingMessages.push(msgInfo);
if (activeMessagePromise !== null) {
// The node is currently processing a message, so do nothing
// more with this message
@@ -101,17 +101,17 @@ module.exports = function(RED) {
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
activeMessagePromise = processMessage(nextMsg)
var nextMsgInfo = pendingMessages.shift();
activeMessagePromise = processMessage(nextMsgInfo)
.then(processMessageQueue)
.catch((err) => {
node.error(err,nextMsg);
nextMsgInfo.done(err);
return processMessageQueue();
});
}
this.on('input', function(msg) {
processMessageQueue(msg);
this.on('input', function(msg, send, done) {
processMessageQueue({msg, send, done});
});
var stat = function() {
@@ -121,7 +121,8 @@ module.exports = function(RED) {
else return {fill:"blue",shape:"dot",text:l};
}
var processMessage = function(msg) {
var processMessage = function(msgInfo) {
let msg = msgInfo.msg;
var topic = RED.util.getMessageProperty(msg,node.topic) || "_none";
var promise;
var delayDuration = node.duration;
@@ -179,7 +180,7 @@ module.exports = function(RED) {
/* istanbul ignore else */
if (node.op1type !== "nul") {
var msg2 = RED.util.cloneMessage(msg);
node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, delayDuration);
node.topics[topic].tout = setInterval(function() { msgInfo.send(RED.util.cloneMessage(msg2)); }, delayDuration);
}
}
else {
@@ -203,14 +204,14 @@ module.exports = function(RED) {
}
promise.then(() => {
if (node.op2type === "payl") {
if (node.second === true) { node.send([null,npay[topic]]); }
else { node.send(npay[topic]); }
if (node.second === true) { msgInfo.send([null,npay[topic]]); }
else { msgInfo.send(npay[topic]); }
delete npay[topic];
}
else {
msg2.payload = node.topics[topic].m2;
if (node.second === true) { node.send([null,msg2]); }
else { node.send(msg2); }
if (node.second === true) { msgInfo.send([null,msg2]); }
else { msgInfo.send(msg2); }
}
delete node.topics[topic];
node.status(stat());
@@ -225,8 +226,9 @@ module.exports = function(RED) {
}, delayDuration);
}
}
msgInfo.done();
node.status(stat());
if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); }
if (node.op1type !== "nul") { msgInfo.send(RED.util.cloneMessage(msg)); }
});
});
}
@@ -262,8 +264,8 @@ module.exports = function(RED) {
}
delete node.topics[topic];
node.status(stat());
if (node.second === true) { node.send([null,msg2]); }
else { node.send(msg2); }
if (node.second === true) { msgInfo.send([null,msg2]); }
else { msgInfo.send(msg2); }
}).catch(err => {
node.error(err);
});
@@ -273,6 +275,7 @@ module.exports = function(RED) {
// if (node.op2type === "payl") {node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); }
// }
}
msgInfo.done();
return Promise.resolve();
}
this.on("close", function() {