Merge branch 'dev' into mqtt5

This commit is contained in:
Steve-Mcl
2021-02-18 18:50:26 +00:00
247 changed files with 10223 additions and 2444 deletions

View File

@@ -18,7 +18,7 @@
color:"#c0edc0",
defaults: {
name: {value:""},
scope: {value:[]},
scope: {value:[], type:"*[]"},
uncaught: {value:false}
},
inputs:0,

View File

@@ -30,7 +30,7 @@
color:"#e49191",
defaults: {
name: {value:""},
scope: {value:null},
scope: {value:null, type:"*[]"},
uncaught: {value:false}
},
inputs:0,

View File

@@ -26,7 +26,7 @@
color:"#94c1d0",
defaults: {
name: {value:""},
scope: {value:null}
scope: {value:null, type:"*[]"}
},
inputs:0,
outputs:1,

View File

@@ -187,7 +187,7 @@
color:"#ddd",//"#87D8CF",
defaults: {
name: {value:""},
links: { value: [] }
links: { value: [], type:"link out[]" }
},
inputs:0,
outputs:1,
@@ -216,7 +216,7 @@
color:"#ddd",//"#87D8CF",
defaults: {
name: {value:""},
links: { value: []}
links: { value: [], type:"link in[]"}
},
align:"right",
inputs:1,

View File

@@ -168,6 +168,10 @@ module.exports = function(RED) {
return getFromValueType(RED.util.getMessageProperty(msg,rule.from),done);
} else if (rule.fromt === 'flow' || rule.fromt === 'global') {
var contextKey = RED.util.parseContextStore(rule.from);
if (/\[msg\./.test(context.key)) {
// The key has a nest msg. reference to evaluate first
context.key = RED.util.normalisePropertyExpression(contextKey.key,msg,true);
}
node.context()[rule.fromt].get(contextKey.key, contextKey.store, (err,fromValue) => {
if (err) {
done(err)
@@ -243,6 +247,10 @@ module.exports = function(RED) {
return done(undefined,msg);
} else if (rule.pt === 'flow' || rule.pt === 'global') {
var contextKey = RED.util.parseContextStore(property);
if (/\[msg/.test(contextKey.key)) {
// The key has a nest msg. reference to evaluate first
contextKey.key = RED.util.normalisePropertyExpression(contextKey.key, msg, true)
}
var target = node.context()[rule.pt];
var callback = err => {
if (err) {

View File

@@ -80,10 +80,10 @@ module.exports = function(RED) {
this.drop = n.drop;
var node = this;
function ourTimeout(handler, delay) {
function ourTimeout(handler, delay, clearHandler) {
var toutID = setTimeout(handler, delay);
return {
clear: function() { clearTimeout(toutID); },
clear: function() { clearTimeout(toutID); clearHandler(); },
trigger: function() { clearTimeout(toutID); return handler(); }
};
}
@@ -113,14 +113,15 @@ module.exports = function(RED) {
}
if (node.pauseType === "delay") {
node.on("input", function(msg) {
if (msg.hasOwnProperty("flush")) { flushDelayList(); }
node.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("flush")) { flushDelayList(); done(); }
else {
var id = ourTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1);
if (node.idList.length === 0) { node.status({}); }
node.send(msg);
}, node.timeout);
send(msg);
done();
}, node.timeout, () => done());
node.idList.push(id);
if ((node.timeout > 1000) && (node.idList.length !== 0)) {
node.status({fill:"blue",shape:"dot",text:" "});
@@ -131,7 +132,7 @@ module.exports = function(RED) {
node.on("close", function() { clearDelayList(); });
}
else if (node.pauseType === "delayv") {
node.on("input", function(msg) {
node.on("input", function(msg, send, done) {
var delayvar = Number(node.timeout);
if (msg.hasOwnProperty("delay") && !isNaN(parseFloat(msg.delay))) {
delayvar = parseFloat(msg.delay);
@@ -140,8 +141,9 @@ module.exports = function(RED) {
var id = ourTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1);
if (node.idList.length === 0) { node.status({}); }
node.send(msg);
}, delayvar);
send(msg);
done();
}, delayvar, () => done());
node.idList.push(id);
if ((delayvar >= 0) && (node.idList.length !== 0)) {
node.status({fill:"blue",shape:"dot",text:delayvar/1000+"s"});
@@ -152,7 +154,7 @@ module.exports = function(RED) {
node.on("close", function() { clearDelayList(); });
}
else if (node.pauseType === "rate") {
node.on("input", function(msg) {
node.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("reset")) {
if (node.intervalID !== -1 ) {
clearInterval(node.intervalID);
@@ -161,17 +163,18 @@ module.exports = function(RED) {
delete node.lastSent;
node.buffer = [];
node.status({text:"reset"});
done();
return;
}
if (!node.drop) {
var m = RED.util.cloneMessage(msg);
delete m.flush;
if (node.intervalID !== -1) {
node.buffer.push(m);
node.buffer.push({msg: m, send: send, done: done});
node.reportDepth();
}
else {
node.send(m);
send(m);
node.reportDepth();
node.intervalID = setInterval(function() {
if (node.buffer.length === 0) {
@@ -179,16 +182,22 @@ module.exports = function(RED) {
node.intervalID = -1;
}
if (node.buffer.length > 0) {
node.send(node.buffer.shift());
const msgInfo = node.buffer.shift();
msgInfo.send(msgInfo.msg);
msgInfo.done();
}
node.reportDepth();
}, node.rate);
done();
}
if (msg.hasOwnProperty("flush")) {
while (node.buffer.length > 0) {
node.send(node.buffer.shift());
const msgInfo = node.buffer.shift();
msgInfo.send(msgInfo.msg);
msgInfo.done();
}
node.status({});
done();
}
}
else {
@@ -198,17 +207,19 @@ module.exports = function(RED) {
}
if (!node.lastSent) { // ensuring that we always send the first message
node.lastSent = process.hrtime();
node.send(msg);
send(msg);
}
else if ( ( (timeSinceLast[0] * SECONDS_TO_NANOS) + timeSinceLast[1] ) > (node.rate * MILLIS_TO_NANOS) ) {
node.lastSent = process.hrtime();
node.send(msg);
send(msg);
}
done();
}
});
node.on("close", function() {
clearInterval(node.intervalID);
clearTimeout(node.busy);
node.buffer.forEach((msgInfo) => msgInfo.done());
node.buffer = [];
node.status({});
});
@@ -217,57 +228,75 @@ module.exports = function(RED) {
node.intervalID = setInterval(function() {
if (node.pauseType === "queue") {
if (node.buffer.length > 0) {
node.send(node.buffer.shift()); // send the first on the queue
const msgInfo = node.buffer.shift();
msgInfo.send(msgInfo.msg); // send the first on the queue
msgInfo.done();
}
}
else {
while (node.buffer.length > 0) { // send the whole queue
node.send(node.buffer.shift());
const msgInfo = node.buffer.shift();
msgInfo.send(msgInfo.msg);
msgInfo.done();
}
}
node.reportDepth();
},node.rate);
var hit;
node.on("input", function(msg) {
node.on("input", function(msg, send, done) {
if (!msg.hasOwnProperty("topic")) { msg.topic = "_none_"; }
hit = false;
for (var b in node.buffer) { // check if already in queue
if (msg.topic === node.buffer[b].topic) {
node.buffer[b] = msg; // if so - replace existing entry
if (msg.topic === node.buffer[b].msg.topic) {
node.buffer[b].done();
node.buffer[b] = {msg, send, done}; // if so - replace existing entry
hit = true;
break;
}
}
if (!hit) {
node.buffer.push(msg); // if not add to end of queue
node.buffer.push({msg, send, done}); // if not add to end of queue
node.reportDepth();
}
if (msg.hasOwnProperty("reset")) {
while (node.buffer.length > 0) {
const msgInfo = node.buffer.shift();
msgInfo.done();
}
node.buffer = [];
node.status({text:"reset"});
done();
}
if (msg.hasOwnProperty("flush")) {
while (node.buffer.length > 0) {
node.send(node.buffer.shift());
const msgInfo = node.buffer.shift();
msgInfo.send(msgInfo.msg);
msgInfo.done();
}
node.status({});
done();
}
});
node.on("close", function() {
clearInterval(node.intervalID);
while (node.buffer.length > 0) {
const msgInfo = node.buffer.shift();
msgInfo.done();
}
node.buffer = [];
node.status({});
});
}
else if (node.pauseType === "random") {
node.on("input", function(msg) {
node.on("input", function(msg, send, done) {
var wait = node.randomFirst + (node.diff * Math.random());
var id = ourTimeout(function() {
node.idList.splice(node.idList.indexOf(id),1);
node.send(msg);
send(msg);
node.status({});
}, wait);
done();
}, wait, () => done());
node.idList.push(id);
if ((node.timeout >= 1000) && (node.idList.length !== 0)) {
node.status({fill:"blue",shape:"dot",text:parseInt(wait/10)/100+"s"});

View File

@@ -82,10 +82,10 @@ module.exports = function(RED) {
var npay = {};
var pendingMessages = [];
var activeMessagePromise = null;
var processMessageQueue = function(msg) {
if (msg) {
var processMessageQueue = function(msgInfo) {
if (msgInfo) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
pendingMessages.push(msgInfo);
if (activeMessagePromise !== null) {
// The node is currently processing a message, so do nothing
// more with this message
@@ -101,17 +101,17 @@ module.exports = function(RED) {
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
activeMessagePromise = processMessage(nextMsg)
var nextMsgInfo = pendingMessages.shift();
activeMessagePromise = processMessage(nextMsgInfo)
.then(processMessageQueue)
.catch((err) => {
node.error(err,nextMsg);
nextMsgInfo.done(err);
return processMessageQueue();
});
}
this.on('input', function(msg) {
processMessageQueue(msg);
this.on('input', function(msg, send, done) {
processMessageQueue({msg, send, done});
});
var stat = function() {
@@ -121,7 +121,8 @@ module.exports = function(RED) {
else return {fill:"blue",shape:"dot",text:l};
}
var processMessage = function(msg) {
var processMessage = function(msgInfo) {
let msg = msgInfo.msg;
var topic = RED.util.getMessageProperty(msg,node.topic) || "_none";
var promise;
var delayDuration = node.duration;
@@ -179,7 +180,10 @@ module.exports = function(RED) {
/* istanbul ignore else */
if (node.op1type !== "nul") {
var msg2 = RED.util.cloneMessage(msg);
node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, delayDuration);
node.topics[topic].tout = setInterval(function() {
if (node.op1type === "date") { msg2.payload = Date.now(); }
msgInfo.send(RED.util.cloneMessage(msg2));
}, delayDuration);
}
}
else {
@@ -203,14 +207,15 @@ module.exports = function(RED) {
}
promise.then(() => {
if (node.op2type === "payl") {
if (node.second === true) { node.send([null,npay[topic]]); }
else { node.send(npay[topic]); }
if (node.second === true) { msgInfo.send([null,npay[topic]]); }
else { msgInfo.send(npay[topic]); }
delete npay[topic];
}
else {
msg2.payload = node.topics[topic].m2;
if (node.second === true) { node.send([null,msg2]); }
else { node.send(msg2); }
if (node.op2type === "date") { msg2.payload = Date.now(); }
if (node.second === true) { msgInfo.send([null,msg2]); }
else { msgInfo.send(msg2); }
}
delete node.topics[topic];
node.status(stat());
@@ -225,8 +230,9 @@ module.exports = function(RED) {
}, delayDuration);
}
}
msgInfo.done();
node.status(stat());
if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); }
if (node.op1type !== "nul") { msgInfo.send(RED.util.cloneMessage(msg)); }
});
});
}
@@ -262,8 +268,8 @@ module.exports = function(RED) {
}
delete node.topics[topic];
node.status(stat());
if (node.second === true) { node.send([null,msg2]); }
else { node.send(msg2); }
if (node.second === true) { msgInfo.send([null,msg2]); }
else { msgInfo.send(msg2); }
}).catch(err => {
node.error(err);
});
@@ -273,6 +279,7 @@ module.exports = function(RED) {
// if (node.op2type === "payl") {node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); }
// }
}
msgInfo.done();
return Promise.resolve();
}
this.on("close", function() {

View File

@@ -52,7 +52,7 @@
color:"darksalmon",
defaults: {
command: {value:""},
addpay: {value:true},
addpay: {value:false},
append: {value:""},
useSpawn: {value:"false"},
timer: {value:""},

View File

@@ -31,12 +31,12 @@ module.exports = function(RED) {
this.timer = Number(n.timer || 0)*1000;
this.activeProcesses = {};
this.oldrc = (n.oldrc || false).toString();
this.execOpt = {encoding:'binary', maxBuffer:10000000};
this.execOpt = {encoding:'binary', maxBuffer:RED.settings.execMaxBufferSize||10000000};
var node = this;
if (process.platform === 'linux' && fs.existsSync('/bin/bash')) { node.execOpt.shell = '/bin/bash'; }
var cleanup = function(p) {
if (process.platform === 'linux' && fs.existsSync('/bin/bash')) { node.execOpt.shell = '/bin/bash'; }
var cleanup = function(p) {
node.activeProcesses[p].kill();
//node.status({fill:"red",shape:"dot",text:"timeout"});
//node.error("Exec node timeout");

View File

@@ -210,7 +210,7 @@ module.exports = function(RED) {
var httpMiddleware = function(req,res,next) { next(); }
if (RED.settings.httpNodeMiddleware) {
if (typeof RED.settings.httpNodeMiddleware === "function") {
if (typeof RED.settings.httpNodeMiddleware === "function" || Array.isArray(RED.settings.httpNodeMiddleware)) {
httpMiddleware = RED.settings.httpNodeMiddleware;
}
}

View File

@@ -235,6 +235,7 @@
oneditprepare: function() {
var previous = null;
$("#node-input-out").on('focus', function () { previous = this.value; }).on("change", function() {
$("#node-input-splitc").show();
if (previous === null) { previous = $("#node-input-out").val(); }
if ($("#node-input-out").val() == "char") {
if (previous != "char") { $("#node-input-splitc").val("\\n"); }
@@ -247,6 +248,7 @@
else if ($("#node-input-out").val() == "immed") {
if (previous != "immed") { $("#node-input-splitc").val(" "); }
$("#node-units").text("");
$("#node-input-splitc").hide();
}
else if ($("#node-input-out").val() == "count") {
if (previous != "count") { $("#node-input-splitc").val("12"); }
@@ -255,6 +257,7 @@
else {
if (previous != "sit") { $("#node-input-splitc").val(" "); }
$("#node-units").text("");
$("#node-input-splitc").hide();
}
});
}

View File

@@ -18,7 +18,7 @@ module.exports = function(RED) {
"use strict";
function CSVNode(n) {
RED.nodes.createNode(this,n);
this.template = (n.temp || "").split(",");
this.template = (n.temp || "");
this.sep = (n.sep || ',').replace("\\t","\t").replace("\\n","\n").replace("\\r","\r");
this.quo = '"';
this.ret = (n.ret || "\n").replace("\\n","\n").replace("\\r","\r");
@@ -38,16 +38,12 @@ module.exports = function(RED) {
if (this.hdrout === true) { this.hdrout = "all"; }
var tmpwarn = true;
var node = this;
var re = new RegExp(',(?=(?:(?:[^"]*"){2})*[^"]*$)','g');
// pass in an array of column names to be trimed, de-quoted and retrimed
var clean = function(col) {
for (var t = 0; t < col.length; t++) {
col[t] = col[t].trim(); // remove leading and trailing whitespace
if (col[t].charAt(0) === '"' && col[t].charAt(col[t].length -1) === '"') {
// remove leading and trailing quotes (if they exist) - and remove whitepace again.
col[t] = col[t].substr(1,col[t].length -2).trim();
}
}
col = col.trim().split(re) || [""];
col = col.map(x => x.replace(/"/g,'').trim());
if ((col.length === 1) && (col[0] === "")) { node.goodtmpl = false; }
else { node.goodtmpl = true; }
return col;
@@ -55,7 +51,7 @@ module.exports = function(RED) {
node.template = clean(node.template);
node.hdrSent = false;
this.on("input", function(msg) {
this.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("reset")) {
node.hdrSent = false;
}
@@ -67,13 +63,14 @@ module.exports = function(RED) {
if (node.hdrout !== "none" && node.hdrSent === false) {
if ((node.template.length === 1) && (node.template[0] === '')) {
if (msg.hasOwnProperty("columns")) {
node.template = clean((msg.columns || "").split(","));
node.template = clean(msg.columns || "");
}
else {
node.template = Object.keys(msg.payload[0]);
}
}
ou += node.template.join(node.sep) + node.ret;
// ou += node.template.join(node.sep) + node.ret;
ou += node.template.map(v => v.indexOf(node.sep)!==-1 ? '"'+v+'"' : v).join(node.sep) + node.ret;
if (node.hdrout === "once") { node.hdrSent = true; }
}
for (var s = 0; s < msg.payload.length; s++) {
@@ -93,7 +90,7 @@ module.exports = function(RED) {
}
else {
if ((node.template.length === 1) && (node.template[0] === '') && (msg.hasOwnProperty("columns"))) {
node.template = clean((msg.columns || "").split(","));
node.template = clean(msg.columns || "")//.split(","));
}
if ((node.template.length === 1) && (node.template[0] === '')) {
/* istanbul ignore else */
@@ -144,10 +141,11 @@ module.exports = function(RED) {
}
}
msg.payload = ou;
msg.columns = node.template.join(',');
if (msg.payload !== '') { node.send(msg); }
msg.columns = node.template.map(v => v.indexOf(',')!==-1 ? '"'+v+'"' : v).join(',');
if (msg.payload !== '') { send(msg); }
done();
}
catch(e) { node.error(e,msg); }
catch(e) { done(e); }
}
else if (typeof msg.payload == "string") { // convert CSV string to object
try {
@@ -178,7 +176,7 @@ module.exports = function(RED) {
if ((node.hdrin === true) && first) { // if the template is in the first line
if ((line[i] === "\n")||(line[i] === "\r")||(line.length - i === 1)) { // look for first line break
if (line.length - i === 1) { tmp += line[i]; }
node.template = clean(tmp.split(node.sep));
node.template = clean(tmp);
first = false;
}
else { tmp += line[i]; }
@@ -254,22 +252,22 @@ module.exports = function(RED) {
}
if (msg.parts.index + 1 === msg.parts.count) {
msg.payload = node.store;
msg.columns = node.template.filter(val => val).join(',');
msg.columns = node.template.map(v => v.indexOf(',')!==-1 ? '"'+v+'"' : v).filter(v => v).join(',');
delete msg.parts;
node.send(msg);
send(msg);
node.store = [];
}
}
else {
msg.columns = node.template.filter(val => val).join(',');
node.send(msg); // finally send the array
msg.columns = node.template.map(v => v.indexOf(',')!==-1 ? '"'+v+'"' : v).filter(v => v).join(',');
send(msg); // finally send the array
}
}
else {
var len = a.length;
for (var i = 0; i < len; i++) {
var newMessage = RED.util.cloneMessage(msg);
newMessage.columns = node.template.filter(val => val).join(',');
newMessage.columns = node.template.map(v => v.indexOf(',')!==-1 ? '"'+v+'"' : v).filter(v => v).join(',');
newMessage.payload = a[i];
if (!has_parts) {
newMessage.parts = {
@@ -286,19 +284,21 @@ module.exports = function(RED) {
newMessage.parts.count -= 1;
}
}
node.send(newMessage);
send(newMessage);
}
}
node.linecount = 0;
done();
}
catch(e) { node.error(e,msg); }
catch(e) { done(e); }
}
else { node.warn(RED._("csv.errors.csv_js")); }
else { node.warn(RED._("csv.errors.csv_js")); done(); }
}
else {
if (!msg.hasOwnProperty("reset")) {
node.send(msg); // If no payload and not reset - just pass it on.
send(msg); // If no payload and not reset - just pass it on.
}
done();
}
});
}

View File

@@ -17,18 +17,18 @@
module.exports = function(RED) {
"use strict";
function sendArray(node,msg,array) {
function sendArray(node,msg,array,send) {
for (var i = 0; i < array.length-1; i++) {
msg.payload = array[i];
msg.parts.index = node.c++;
if (node.stream !== true) { msg.parts.count = array.length; }
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
if (node.stream !== true) {
msg.payload = array[i];
msg.parts.index = node.c++;
msg.parts.count = array.length;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.c = 0;
}
else { node.remainder = array[i]; }
@@ -67,7 +67,8 @@ module.exports = function(RED) {
}
node.c = 0;
node.buffer = Buffer.from([]);
this.on("input", function(msg) {
node.pendingDones = [];
this.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("payload")) {
if (msg.hasOwnProperty("parts")) { msg.parts = { parts:msg.parts }; } // push existing parts to a stack
else { msg.parts = {}; }
@@ -93,14 +94,23 @@ module.exports = function(RED) {
msg.payload = data.substring(pos,pos+node.splt);
msg.parts.index = node.c++;
pos += node.splt;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
node.remainder = data.substring(pos);
if ((node.stream !== true) || (node.remainder.length === node.splt)) {
msg.payload = node.remainder;
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones = [];
done();
node.remainder = "";
} else {
node.pendingDones.push(done);
}
}
else {
@@ -115,7 +125,8 @@ module.exports = function(RED) {
a = msg.payload.split(node.splt);
msg.parts.ch = node.splt; // pass the split char to other end for rejoin
}
sendArray(node,msg,a);
sendArray(node,msg,a,send);
done();
}
}
else if (Array.isArray(msg.payload)) { // then split array into messages
@@ -135,8 +146,9 @@ module.exports = function(RED) {
}
msg.parts.index = i;
pos += node.arraySplt;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
done();
}
else if ((typeof msg.payload === "object") && !Buffer.isBuffer(msg.payload)) {
var j = 0;
@@ -152,10 +164,11 @@ module.exports = function(RED) {
msg.parts.key = p;
msg.parts.index = j;
msg.parts.count = l;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
j += 1;
}
}
done();
}
else if (Buffer.isBuffer(msg.payload)) {
var len = node.buffer.length + msg.payload.length;
@@ -176,14 +189,23 @@ module.exports = function(RED) {
msg.payload = buff.slice(pos,pos+node.splt);
msg.parts.index = node.c++;
pos += node.splt;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
node.buffer = buff.slice(pos);
if ((node.stream !== true) || (node.buffer.length === node.splt)) {
msg.payload = node.buffer;
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones = [];
done();
node.buffer = Buffer.from([]);
} else {
node.pendingDones.push(done);
}
}
else {
@@ -210,23 +232,34 @@ module.exports = function(RED) {
while (pos > -1) {
msg.payload = buff.slice(p,pos);
msg.parts.index = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
i++;
p = pos+node.splt.length;
pos = buff.indexOf(node.splt,p);
}
if (count > 1) {
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
if ((node.stream !== true) && (p < buff.length)) {
msg.payload = buff.slice(p,buff.length);
msg.parts.index = node.c++;
msg.parts.count = node.c++;
node.send(RED.util.cloneMessage(msg));
send(RED.util.cloneMessage(msg));
node.pendingDones.forEach(d => d());
node.pendingDones = [];
}
else {
node.buffer = buff.slice(p,buff.length);
node.pendingDones.push(done);
}
if (node.buffer.length == 0) {
done();
}
}
}
//else { } // otherwise drop the message.
} else { // otherwise drop the message.
done();
}
}
});
}
@@ -264,16 +297,16 @@ module.exports = function(RED) {
}
function reduceMessageGroup(node,msgs,exp,fixup,count,accumulator,done) {
var msg = msgs.shift();
exp.assign("I", msg.parts.index);
function reduceMessageGroup(node,msgInfos,exp,fixup,count,accumulator,done) {
var msgInfo = msgInfos.shift();
exp.assign("I", msgInfo.msg.parts.index);
exp.assign("N", count);
exp.assign("A", accumulator);
RED.util.evaluateJSONataExpression(exp, msg, (err,result) => {
RED.util.evaluateJSONataExpression(exp, msgInfo.msg, (err,result) => {
if (err) {
return done(err);
}
if (msgs.length === 0) {
if (msgInfos.length === 0) {
if (fixup) {
fixup.assign("N", count);
fixup.assign("A", result);
@@ -281,39 +314,43 @@ module.exports = function(RED) {
if (err) {
return done(err);
}
node.send({payload: result});
msgInfo.send({payload: result});
done();
});
} else {
node.send({payload: result});
msgInfo.send({payload: result});
done();
}
} else {
reduceMessageGroup(node,msgs,exp,fixup,count,result,done);
reduceMessageGroup(node,msgInfos,exp,fixup,count,result,done);
}
});
}
function reduceAndSendGroup(node, group, done) {
var is_right = node.reduce_right;
var flag = is_right ? -1 : 1;
var msgs = group.msgs;
var msgInfos = group.msgs;
const preservedMsgInfos = [...msgInfos];
try {
RED.util.evaluateNodeProperty(node.exp_init, node.exp_init_type, node, {}, (err,accum) => {
var reduceExpression = node.reduceExpression;
var fixupExpression = node.fixupExpression;
var count = group.count;
msgs.sort(function(x,y) {
var ix = x.parts.index;
var iy = y.parts.index;
msgInfos.sort(function(x,y) {
var ix = x.msg.parts.index;
var iy = y.msg.parts.index;
if (ix < iy) {return -flag;}
if (ix > iy) {return flag;}
return 0;
});
reduceMessageGroup(node, msgs,reduceExpression,fixupExpression,count,accum,(err,result) => {
reduceMessageGroup(node, msgInfos,reduceExpression,fixupExpression,count,accum,(err,result) => {
if (err) {
preservedMsgInfos.pop(); // omit last message to emit error message
preservedMsgInfos.forEach(mInfo => mInfo.done());
done(err);
return;
} else {
preservedMsgInfos.forEach(mInfo => mInfo.done());
done();
}
})
@@ -323,7 +360,8 @@ module.exports = function(RED) {
}
}
function reduceMessage(node, msg, done) {
function reduceMessage(node, msgInfo, done) {
let msg = msgInfo.msg;
if (msg.hasOwnProperty('parts')) {
var parts = msg.parts;
var pending = node.pending;
@@ -344,7 +382,7 @@ module.exports = function(RED) {
if (parts.hasOwnProperty('count') && (group.count === undefined)) {
group.count = parts.count;
}
msgs.push(msg);
msgs.push(msgInfo);
pending_count++;
var completeProcess = function(err) {
if (err) {
@@ -353,6 +391,13 @@ module.exports = function(RED) {
node.pending_count = pending_count;
var max_msgs = maxKeptMsgsCount(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
Object.values(node.pending).forEach(group => {
group.msgs.forEach(mInfo => {
if (mInfo.msg._msgid !== msgInfo.msg._msgid) {
mInfo.done();
}
});
});
node.pending = {};
node.pending_count = 0;
done(RED._("join.too-many"));
@@ -368,7 +413,8 @@ module.exports = function(RED) {
completeProcess();
}
} else {
node.send(msg);
msgInfo.send(msg);
msgInfo.done();
done();
}
}
@@ -480,7 +526,9 @@ module.exports = function(RED) {
delete group.msg.parts;
}
delete group.msg.complete;
node.send(RED.util.cloneMessage(group.msg));
group.send(RED.util.cloneMessage(group.msg));
group.dones.forEach(f => f());
group.dones = [];
}
var pendingMessages = [];
@@ -489,10 +537,10 @@ module.exports = function(RED) {
// groups may overlap and cause unexpected results. The use of JSONata
// means some async processing *might* occur if flow/global context is
// accessed.
var processReduceMessageQueue = function(msg) {
if (msg) {
var processReduceMessageQueue = function(msgInfo) {
if (msgInfo) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
pendingMessages.push(msgInfo);
if (activeMessage !== null) {
// The node is currently processing a message, so do nothing
// more with this message
@@ -508,22 +556,23 @@ module.exports = function(RED) {
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
var nextMsgInfo = pendingMessages.shift();
activeMessage = true;
reduceMessage(node, nextMsg, err => {
reduceMessage(node, nextMsgInfo, err => {
if (err) {
node.error(err,nextMsg);
}
nextMsgInfo.done(err);//.error(err,nextMsg);
}
activeMessage = null;
processReduceMessageQueue();
})
}
this.on("input", function(msg) {
this.on("input", function(msg, send, done) {
try {
var property;
if (node.mode === 'auto' && (!msg.hasOwnProperty("parts")||!msg.parts.hasOwnProperty("id"))) {
node.warn("Message missing msg.parts property - cannot join in 'auto' mode")
done();
return;
}
@@ -535,6 +584,7 @@ module.exports = function(RED) {
property = RED.util.getMessageProperty(msg,node.property);
} catch(err) {
node.warn("Message property "+node.property+" not found");
done();
return;
}
}
@@ -557,7 +607,7 @@ module.exports = function(RED) {
propertyIndex = msg.parts.index;
}
else if (node.mode === 'reduce') {
return processReduceMessageQueue(msg);
return processReduceMessageQueue({msg, send, done});
}
else {
// Use the node configuration to identify all of the group information
@@ -578,9 +628,11 @@ module.exports = function(RED) {
if (inflight[partId].timeout) {
clearTimeout(inflight[partId].timeout);
}
inflight[partId].dones.forEach(f => f());
delete inflight[partId]
}
return
done();
return;
}
if ((payloadType === 'object') && (propertyKey === null || propertyKey === undefined || propertyKey === "")) {
@@ -591,6 +643,7 @@ module.exports = function(RED) {
if (msg.hasOwnProperty('complete')) {
if (inflight[partId]) {
inflight[partId].msg.complete = msg.complete;
inflight[partId].send = send;
completeSend(partId);
}
}
@@ -598,6 +651,7 @@ module.exports = function(RED) {
node.warn("Message missing key property 'msg."+node.key+"' - cannot add to object")
}
}
done();
return;
}
@@ -608,7 +662,9 @@ module.exports = function(RED) {
payload:{},
targetCount:targetCount,
type:"object",
msg:RED.util.cloneMessage(msg)
msg:RED.util.cloneMessage(msg),
send: send,
dones: []
};
}
else {
@@ -617,7 +673,9 @@ module.exports = function(RED) {
payload:[],
targetCount:targetCount,
type:payloadType,
msg:RED.util.cloneMessage(msg)
msg:RED.util.cloneMessage(msg),
send: send,
dones: []
};
if (payloadType === 'string') {
inflight[partId].joinChar = joinChar;
@@ -634,6 +692,7 @@ module.exports = function(RED) {
}, node.timer)
}
}
inflight[partId].dones.push(done);
var group = inflight[partId];
if (payloadType === 'buffer') {
@@ -642,7 +701,7 @@ module.exports = function(RED) {
inflight[partId].bufferLen += property.length;
}
else {
node.error(RED._("join.errors.invalid-type",{error:(typeof property)}),msg);
done(RED._("join.errors.invalid-type",{error:(typeof property)}));
return;
}
}
@@ -676,13 +735,18 @@ module.exports = function(RED) {
}
}
group.msg = Object.assign(group.msg, msg);
group.send = send;
var tcnt = group.targetCount;
if (msg.hasOwnProperty("parts")) { tcnt = group.targetCount || msg.parts.count; }
if (msg.hasOwnProperty("parts")) {
tcnt = group.targetCount || msg.parts.count;
group.targetCount = tcnt;
}
if ((tcnt > 0 && group.currentCount >= tcnt) || msg.hasOwnProperty('complete')) {
completeSend(partId);
}
}
catch(err) {
done(err);
console.log(err.stack);
}
});
@@ -691,9 +755,11 @@ module.exports = function(RED) {
for (var i in inflight) {
if (inflight.hasOwnProperty(i)) {
clearTimeout(inflight[i].timeout);
inflight[i].dones.forEach(d => d());
}
}
});
}
RED.nodes.registerType("join",JoinNode);
}

View File

@@ -81,16 +81,16 @@ module.exports = function(RED) {
function sortMessageGroup(group) {
var promise;
var msgs = group.msgs;
var msgInfos = group.msgInfos;
if (key_is_exp) {
var evaluatedDataPromises = msgs.map(msg => {
var evaluatedDataPromises = msgInfos.map(mInfo => {
return new Promise((resolve,reject) => {
RED.util.evaluateJSONataExpression(key_exp, msg, (err, result) => {
RED.util.evaluateJSONataExpression(key_exp, mInfo.msg, (err, result) => {
if (err) {
reject(RED._("sort.invalid-exp",{message:err.toString()}));
} else {
resolve({
item: msg,
item: mInfo,
sortValue: result
})
}
@@ -106,20 +106,21 @@ module.exports = function(RED) {
var key = function(msg) {
return ;
}
var comp = generateComparisonFunction(msg => RED.util.getMessageProperty(msg, key_prop));
var comp = generateComparisonFunction(mInfo => RED.util.getMessageProperty(mInfo.msg, key_prop));
try {
msgs.sort(comp);
msgInfos.sort(comp);
}
catch (e) {
return; // not send when error
}
promise = Promise.resolve(msgs);
promise = Promise.resolve(msgInfos);
}
return promise.then(msgs => {
for (var i = 0; i < msgs.length; i++) {
var msg = msgs[i];
return promise.then(msgInfos => {
for (let i = 0; i < msgInfos.length; i++) {
const msg = msgInfos[i].msg;
msg.parts.index = i;
node.send(msg);
msgInfos[i].send(msg);
msgInfos[i].done();
}
});
}
@@ -181,65 +182,79 @@ module.exports = function(RED) {
}
}
if(oldest !== undefined) {
oldest.msgInfos[oldest.msgInfos.length - 1].done(RED._("sort.too-many"));
for (let i = 0; i < oldest.msgInfos.length - 1; i++) {
oldest.msgInfos[i].done();
}
delete pending[oldest_key];
return oldest.msgs.length;
return oldest.msgInfos.length;
}
return 0;
}
function processMessage(msg) {
function processMessage(msgInfo) {
const msg = msgInfo.msg;
if (target_is_prop) {
sortMessageProperty(msg).then(send => {
if (send) {
node.send(msg);
msgInfo.send(msg);
}
msgInfo.done();
}).catch(err => {
node.error(err,msg);
msgInfo.done(err);
});
return;
}
var parts = msg.parts;
if (!parts || !parts.hasOwnProperty("id") || !parts.hasOwnProperty("index")) {
msgInfo.done();
return;
}
var gid = parts.id;
if (!pending.hasOwnProperty(gid)) {
pending[gid] = {
count: undefined,
msgs: [],
msgInfos: [],
seq_no: pending_id++
};
}
var group = pending[gid];
var msgs = group.msgs;
msgs.push(msg);
var msgInfos = group.msgInfos;
msgInfos.push(msgInfo);
if (parts.hasOwnProperty("count")) {
group.count = parts.count;
}
pending_count++;
if (group.count === msgs.length) {
if (group.count === msgInfos.length) {
delete pending[gid]
sortMessageGroup(group).catch(err => {
node.error(err,msg);
// throw an error for last message, and just call done() for remaining messages
msgInfos[msgInfos.length-1].done(err);
for (let i = 0; i < msgInfos.length - 1; i++) {
msgInfos[i].done()
};
});
pending_count -= msgs.length;
pending_count -= msgInfos.length;
} else {
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
pending_count -= removeOldestPending();
node.error(RED._("sort.too-many"), msg);
}
}
}
this.on("input", function(msg) {
processMessage(msg);
this.on("input", function(msg, send, done) {
processMessage({msg, send, done});
});
this.on("close", function() {
for(var key in pending) {
if (pending.hasOwnProperty(key)) {
node.log(RED._("sort.clear"), pending[key].msgs[0]);
node.log(RED._("sort.clear"), pending[key].msgInfos[0]);
const group = pending[key];
group.msgInfos.forEach(mInfo => {
mInfo.done();
});
delete pending[key];
}
}

View File

@@ -32,11 +32,11 @@ module.exports = function(RED) {
return _max_kept_msgs_count;
}
function send_msgs(node, msgs, clone_msg) {
var count = msgs.length;
var msg_id = msgs[0]._msgid;
function send_msgs(node, msgInfos, clone_msg) {
var count = msgInfos.length;
var msg_id = msgInfos[0].msg._msgid;
for (var i = 0; i < count; i++) {
var msg = clone_msg ? RED.util.cloneMessage(msgs[i]) : msgs[i];
var msg = clone_msg ? RED.util.cloneMessage(msgInfos[i].msg) : msgInfos[i].msg;
if (!msg.hasOwnProperty("parts")) {
msg.parts = {};
}
@@ -44,14 +44,16 @@ module.exports = function(RED) {
parts.id = msg_id;
parts.index = i;
parts.count = count;
node.send(msg);
msgInfos[i].send(msg);
//msgInfos[i].done();
}
}
function send_interval(node, allow_empty_seq) {
let msgs = node.pending;
if (msgs.length > 0) {
send_msgs(node, msgs, false);
let msgInfos = node.pending;
if (msgInfos.length > 0) {
send_msgs(node, msgInfos, false);
msgInfos.forEach(e => e.done());
node.pending = [];
}
else {
@@ -108,19 +110,20 @@ module.exports = function(RED) {
return;
}
}
var msgs = [];
var msgInfos = [];
for (var topic of topics) {
var t_msgs = get_msgs_of_topic(pending, topic);
msgs = msgs.concat(t_msgs);
var t_msgInfos = get_msgs_of_topic(pending, topic);
msgInfos = msgInfos.concat(t_msgInfos);
}
for (var topic of topics) {
remove_topic(pending, topic);
}
send_msgs(node, msgs, true);
node.pending_count -= msgs.length;
send_msgs(node, msgInfos, true);
msgInfos.forEach(e => e.done() );
node.pending_count -= msgInfos.length;
}
function add_to_topic_group(pending, topic, gid, msg) {
function add_to_topic_group(pending, topic, gid, msgInfo) {
if (!pending.hasOwnProperty(topic)) {
pending[topic] = { groups: {}, gids: [] };
}
@@ -132,32 +135,43 @@ module.exports = function(RED) {
gids.push(gid);
}
var group = groups[gid];
group.msgs.push(msg);
group.msgs.push(msgInfo);
if ((group.count === undefined) &&
msg.parts.hasOwnProperty('count')) {
group.count = msg.parts.count;
msgInfo.msg.parts.hasOwnProperty('count')) {
group.count = msgInfo.msg.parts.count;
}
}
function concat_msg(node, msg) {
function concat_msg(node, msg, send, done) {
var topic = msg.topic;
if(node.topics.indexOf(topic) >= 0) {
if (!msg.hasOwnProperty("parts") ||
!msg.parts.hasOwnProperty("id") ||
!msg.parts.hasOwnProperty("index") ||
!msg.parts.hasOwnProperty("count")) {
node.error(RED._("batch.no-parts"), msg);
done(RED._("batch.no-parts"));
return;
}
var gid = msg.parts.id;
var pending = node.pending;
add_to_topic_group(pending, topic, gid, msg);
add_to_topic_group(pending, topic, gid, {msg, send, done});
node.pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
Object.values(node.pending).forEach(p_topic => {
Object.values(p_topic.groups).forEach(group => {
group.msgs.forEach(msgInfo => {
if (msgInfo.msg.id === msg.id) {
// the message that caused the overflow
msgInfo.done(RED._("batch.too-many"));
} else {
msgInfo.done();
}
})
})
});
node.pending = {};
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
try_concat(node, pending);
}
@@ -178,29 +192,37 @@ module.exports = function(RED) {
return;
}
node.pending = [];
this.on("input", function(msg) {
this.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("reset")) {
node.pending.forEach(e => e.done());
node.pending = [];
node.pending_count = 0;
done();
return;
}
var queue = node.pending;
queue.push(msg);
queue.push({msg, send, done});
node.pending_count++;
if (queue.length === count) {
send_msgs(node, queue, is_overlap);
for (let i = 0; i < queue.length-overlap; i++) {
queue[i].done();
}
node.pending =
(overlap === 0) ? [] : queue.slice(-overlap);
node.pending_count = 0;
}
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
let lastMInfo = node.pending.pop();
lastMInfo.done(RED._("batch.too-many"));
node.pending.forEach(e => e.done());
node.pending = [];
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
});
this.on("close", function() {
node.pending.forEach(e=> e.done());
node.pending_count = 0;
node.pending = [];
});
@@ -217,31 +239,36 @@ module.exports = function(RED) {
if (interval > 0) {
timer = setInterval(msgHandler, interval);
}
this.on("input", function(msg) {
this.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("reset")) {
if (timer !== undefined) {
clearInterval(timer);
}
node.pending.forEach(e => e.done());
node.pending = [];
node.pending_count = 0;
done();
if (interval > 0) {
timer = setInterval(msgHandler, interval);
}
return;
}
node.pending.push(msg);
node.pending.push({msg, send, done});
node.pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
let lastMInfo = node.pending.pop();
lastMInfo.done(RED._("batch.too-many"));
node.pending.forEach(e => e.done());
node.pending = [];
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
});
this.on("close", function() {
if (timer !== undefined) {
clearInterval(timer);
}
node.pending.forEach(e => e.done());
node.pending = [];
node.pending_count = 0;
});
@@ -251,15 +278,26 @@ module.exports = function(RED) {
return x.topic;
});
node.pending = {};
this.on("input", function(msg) {
this.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("reset")) {
Object.values(node.pending).forEach(p_topic => {
Object.values(p_topic.groups).forEach(group => {
group.msgs.forEach(e => e.done());
});
});
node.pending = {};
node.pending_count = 0;
done();
return;
}
concat_msg(node, msg);
concat_msg(node, msg, send, done);
});
this.on("close", function() {
Object.values(node.pending).forEach(p_topic => {
Object.values(p_topic.groups).forEach(group => {
group.msgs.forEach(e => e.done());
});
});
node.pending = {};
node.pending_count = 0;
});