mirror of
https://github.com/node-red/node-red.git
synced 2025-03-01 10:36:34 +00:00
Merge branch 'pr_2751' into dev
This commit is contained in:
@@ -82,10 +82,10 @@ module.exports = function(RED) {
|
||||
var npay = {};
|
||||
var pendingMessages = [];
|
||||
var activeMessagePromise = null;
|
||||
var processMessageQueue = function(msg) {
|
||||
if (msg) {
|
||||
var processMessageQueue = function(msgInfo) {
|
||||
if (msgInfo) {
|
||||
// A new message has arrived - add it to the message queue
|
||||
pendingMessages.push(msg);
|
||||
pendingMessages.push(msgInfo);
|
||||
if (activeMessagePromise !== null) {
|
||||
// The node is currently processing a message, so do nothing
|
||||
// more with this message
|
||||
@@ -101,17 +101,17 @@ module.exports = function(RED) {
|
||||
|
||||
// There are more messages to process. Get the next message and
|
||||
// start processing it. Recurse back in to check for any more
|
||||
var nextMsg = pendingMessages.shift();
|
||||
activeMessagePromise = processMessage(nextMsg)
|
||||
var nextMsgInfo = pendingMessages.shift();
|
||||
activeMessagePromise = processMessage(nextMsgInfo)
|
||||
.then(processMessageQueue)
|
||||
.catch((err) => {
|
||||
node.error(err,nextMsg);
|
||||
nextMsgInfo.done(err);
|
||||
return processMessageQueue();
|
||||
});
|
||||
}
|
||||
|
||||
this.on('input', function(msg) {
|
||||
processMessageQueue(msg);
|
||||
this.on('input', function(msg, send, done) {
|
||||
processMessageQueue({msg, send, done});
|
||||
});
|
||||
|
||||
var stat = function() {
|
||||
@@ -121,7 +121,8 @@ module.exports = function(RED) {
|
||||
else return {fill:"blue",shape:"dot",text:l};
|
||||
}
|
||||
|
||||
var processMessage = function(msg) {
|
||||
var processMessage = function(msgInfo) {
|
||||
let msg = msgInfo.msg;
|
||||
var topic = RED.util.getMessageProperty(msg,node.topic) || "_none";
|
||||
var promise;
|
||||
var delayDuration = node.duration;
|
||||
@@ -181,7 +182,7 @@ module.exports = function(RED) {
|
||||
var msg2 = RED.util.cloneMessage(msg);
|
||||
node.topics[topic].tout = setInterval(function() {
|
||||
if (node.op1type === "date") { msg2.payload = Date.now(); }
|
||||
node.send(RED.util.cloneMessage(msg2));
|
||||
msgInfo.send(RED.util.cloneMessage(msg2));
|
||||
}, delayDuration);
|
||||
}
|
||||
}
|
||||
@@ -206,15 +207,15 @@ module.exports = function(RED) {
|
||||
}
|
||||
promise.then(() => {
|
||||
if (node.op2type === "payl") {
|
||||
if (node.second === true) { node.send([null,npay[topic]]); }
|
||||
else { node.send(npay[topic]); }
|
||||
if (node.second === true) { msgInfo.send([null,npay[topic]]); }
|
||||
else { msgInfo.send(npay[topic]); }
|
||||
delete npay[topic];
|
||||
}
|
||||
else {
|
||||
msg2.payload = node.topics[topic].m2;
|
||||
if (node.op2type === "date") { msg2.payload = Date.now(); }
|
||||
if (node.second === true) { node.send([null,msg2]); }
|
||||
else { node.send(msg2); }
|
||||
if (node.second === true) { msgInfo.send([null,msg2]); }
|
||||
else { msgInfo.send(msg2); }
|
||||
}
|
||||
delete node.topics[topic];
|
||||
node.status(stat());
|
||||
@@ -229,8 +230,9 @@ module.exports = function(RED) {
|
||||
}, delayDuration);
|
||||
}
|
||||
}
|
||||
msgInfo.done();
|
||||
node.status(stat());
|
||||
if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); }
|
||||
if (node.op1type !== "nul") { msgInfo.send(RED.util.cloneMessage(msg)); }
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -266,8 +268,8 @@ module.exports = function(RED) {
|
||||
}
|
||||
delete node.topics[topic];
|
||||
node.status(stat());
|
||||
if (node.second === true) { node.send([null,msg2]); }
|
||||
else { node.send(msg2); }
|
||||
if (node.second === true) { msgInfo.send([null,msg2]); }
|
||||
else { msgInfo.send(msg2); }
|
||||
}).catch(err => {
|
||||
node.error(err);
|
||||
});
|
||||
@@ -277,6 +279,7 @@ module.exports = function(RED) {
|
||||
// if (node.op2type === "payl") {node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); }
|
||||
// }
|
||||
}
|
||||
msgInfo.done();
|
||||
return Promise.resolve();
|
||||
}
|
||||
this.on("close", function() {
|
||||
|
Reference in New Issue
Block a user