mirror of
https://github.com/node-red/node-red.git
synced 2025-03-01 10:36:34 +00:00
Merge remote-tracking branch 'upstream/0.19' into json-schema
This commit is contained in:
@@ -237,10 +237,13 @@ If you want every 20 minutes from now - use the <i>"interval"</i> option.</p>
|
||||
} else {
|
||||
return this._("inject.timestamp")+suffix;
|
||||
}
|
||||
} else if (this.payloadType === 'flow' && this.payload.length < 19) {
|
||||
return 'flow.'+this.payload+suffix;
|
||||
} else if (this.payloadType === 'global' && this.payload.length < 17) {
|
||||
return 'global.'+this.payload+suffix;
|
||||
} else if (this.payloadType === 'flow' || this.payloadType === 'global') {
|
||||
var key = this.payload;
|
||||
var m = /^#:\((\S+?)\)::(.*)$/.exec(key);
|
||||
if (m) {
|
||||
key = m[2];
|
||||
}
|
||||
return 'flow.'+key+suffix;
|
||||
} else {
|
||||
return this._("inject.inject")+suffix;
|
||||
}
|
||||
|
@@ -63,21 +63,33 @@ module.exports = function(RED) {
|
||||
}
|
||||
|
||||
this.on("input",function(msg) {
|
||||
try {
|
||||
msg.topic = this.topic;
|
||||
if ( (this.payloadType == null && this.payload === "") || this.payloadType === "date") {
|
||||
msg.payload = Date.now();
|
||||
} else if (this.payloadType == null) {
|
||||
msg.payload = this.payload;
|
||||
} else if (this.payloadType === 'none') {
|
||||
msg.payload = "";
|
||||
} else {
|
||||
msg.payload = RED.util.evaluateNodeProperty(this.payload,this.payloadType,this,msg);
|
||||
msg.topic = this.topic;
|
||||
if (this.payloadType !== 'flow' && this.payloadType !== 'global') {
|
||||
try {
|
||||
if ( (this.payloadType == null && this.payload === "") || this.payloadType === "date") {
|
||||
msg.payload = Date.now();
|
||||
} else if (this.payloadType == null) {
|
||||
msg.payload = this.payload;
|
||||
} else if (this.payloadType === 'none') {
|
||||
msg.payload = "";
|
||||
} else {
|
||||
msg.payload = RED.util.evaluateNodeProperty(this.payload,this.payloadType,this,msg);
|
||||
}
|
||||
this.send(msg);
|
||||
msg = null;
|
||||
} catch(err) {
|
||||
this.error(err,msg);
|
||||
}
|
||||
this.send(msg);
|
||||
msg = null;
|
||||
} catch(err) {
|
||||
this.error(err,msg);
|
||||
} else {
|
||||
RED.util.evaluateNodeProperty(this.payload,this.payloadType,this,msg, function(err,res) {
|
||||
if (err) {
|
||||
node.error(err,msg);
|
||||
} else {
|
||||
msg.payload = res;
|
||||
node.send(msg);
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@@ -76,8 +76,43 @@ module.exports = function(RED) {
|
||||
var node = this;
|
||||
node.topics = {};
|
||||
|
||||
this.on("input", function(msg) {
|
||||
var pendingMessages = [];
|
||||
var activeMessagePromise = null;
|
||||
var processMessageQueue = function(msg) {
|
||||
if (msg) {
|
||||
// A new message has arrived - add it to the message queue
|
||||
pendingMessages.push(msg);
|
||||
if (activeMessagePromise !== null) {
|
||||
// The node is currently processing a message, so do nothing
|
||||
// more with this message
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (pendingMessages.length === 0) {
|
||||
// There are no more messages to process, clear the active flag
|
||||
// and return
|
||||
activeMessagePromise = null;
|
||||
return;
|
||||
}
|
||||
|
||||
// There are more messages to process. Get the next message and
|
||||
// start processing it. Recurse back in to check for any more
|
||||
var nextMsg = pendingMessages.shift();
|
||||
activeMessagePromise = processMessage(nextMsg)
|
||||
.then(processMessageQueue)
|
||||
.catch((err) => {
|
||||
node.error(err,nextMsg);
|
||||
return processMessageQueue();
|
||||
});
|
||||
}
|
||||
|
||||
this.on('input', function(msg) {
|
||||
processMessageQueue(msg);
|
||||
});
|
||||
|
||||
var processMessage = function(msg) {
|
||||
var topic = msg.topic || "_none";
|
||||
var promise;
|
||||
if (node.bytopic === "all") { topic = "_none"; }
|
||||
node.topics[topic] = node.topics[topic] || {};
|
||||
if (msg.hasOwnProperty("reset") || ((node.reset !== '') && msg.hasOwnProperty("payload") && (msg.payload !== null) && msg.payload.toString && (msg.payload.toString() == node.reset)) ) {
|
||||
@@ -88,48 +123,88 @@ module.exports = function(RED) {
|
||||
}
|
||||
else {
|
||||
if (((!node.topics[topic].tout) && (node.topics[topic].tout !== 0)) || (node.loop === true)) {
|
||||
promise = Promise.resolve();
|
||||
if (node.op2type === "pay" || node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); }
|
||||
else if (node.op2Templated) { node.topics[topic].m2 = mustache.render(node.op2,msg); }
|
||||
else if (node.op2type !== "nul") {
|
||||
node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg);
|
||||
}
|
||||
|
||||
if (node.op1type === "pay") { }
|
||||
else if (node.op1Templated) { msg.payload = mustache.render(node.op1,msg); }
|
||||
else if (node.op1type !== "nul") {
|
||||
msg.payload = RED.util.evaluateNodeProperty(node.op1,node.op1type,node,msg);
|
||||
}
|
||||
|
||||
if (node.duration === 0) { node.topics[topic].tout = 0; }
|
||||
else if (node.loop === true) {
|
||||
/* istanbul ignore else */
|
||||
if (node.topics[topic].tout) { clearInterval(node.topics[topic].tout); }
|
||||
/* istanbul ignore else */
|
||||
if (node.op1type !== "nul") {
|
||||
var msg2 = RED.util.cloneMessage(msg);
|
||||
node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, node.duration);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (!node.topics[topic].tout) {
|
||||
node.topics[topic].tout = setTimeout(function() {
|
||||
var msg2 = null;
|
||||
if (node.op2type !== "nul") {
|
||||
msg2 = RED.util.cloneMessage(msg);
|
||||
if (node.op2type === "flow" || node.op2type === "global") {
|
||||
node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg);
|
||||
}
|
||||
msg2.payload = node.topics[topic].m2;
|
||||
delete node.topics[topic];
|
||||
node.send(msg2);
|
||||
promise = new Promise((resolve,reject) => {
|
||||
RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
node.topics[topic].m2 = value;
|
||||
resolve();
|
||||
}
|
||||
else { delete node.topics[topic]; }
|
||||
node.status({});
|
||||
}, node.duration);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
node.status({fill:"blue",shape:"dot",text:" "});
|
||||
if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); }
|
||||
|
||||
return promise.then(() => {
|
||||
promise = Promise.resolve();
|
||||
if (node.op1type === "pay") { }
|
||||
else if (node.op1Templated) { msg.payload = mustache.render(node.op1,msg); }
|
||||
else if (node.op1type !== "nul") {
|
||||
promise = new Promise((resolve,reject) => {
|
||||
RED.util.evaluateNodeProperty(node.op1,node.op1type,node,msg,(err,value) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
msg.payload = value;
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
return promise.then(() => {
|
||||
if (node.duration === 0) { node.topics[topic].tout = 0; }
|
||||
else if (node.loop === true) {
|
||||
/* istanbul ignore else */
|
||||
if (node.topics[topic].tout) { clearInterval(node.topics[topic].tout); }
|
||||
/* istanbul ignore else */
|
||||
if (node.op1type !== "nul") {
|
||||
var msg2 = RED.util.cloneMessage(msg);
|
||||
node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, node.duration);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (!node.topics[topic].tout) {
|
||||
node.topics[topic].tout = setTimeout(function() {
|
||||
var msg2 = null;
|
||||
if (node.op2type !== "nul") {
|
||||
var promise = Promise.resolve();
|
||||
msg2 = RED.util.cloneMessage(msg);
|
||||
if (node.op2type === "flow" || node.op2type === "global") {
|
||||
promise = new Promise((resolve,reject) => {
|
||||
RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
node.topics[topic].m2 = value;
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
promise.then(() => {
|
||||
msg2.payload = node.topics[topic].m2;
|
||||
delete node.topics[topic];
|
||||
node.send(msg2);
|
||||
node.status({});
|
||||
}).catch(err => {
|
||||
node.error(err);
|
||||
});
|
||||
} else {
|
||||
delete node.topics[topic];
|
||||
node.status({});
|
||||
}
|
||||
|
||||
}, node.duration);
|
||||
}
|
||||
}
|
||||
node.status({fill:"blue",shape:"dot",text:" "});
|
||||
if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); }
|
||||
});
|
||||
});
|
||||
}
|
||||
else if ((node.extend === "true" || node.extend === true) && (node.duration > 0)) {
|
||||
/* istanbul ignore else */
|
||||
@@ -138,25 +213,43 @@ module.exports = function(RED) {
|
||||
if (node.topics[topic].tout) { clearTimeout(node.topics[topic].tout); }
|
||||
node.topics[topic].tout = setTimeout(function() {
|
||||
var msg2 = null;
|
||||
var promise = Promise.resolve();
|
||||
|
||||
if (node.op2type !== "nul") {
|
||||
if (node.op2type === "flow" || node.op2type === "global") {
|
||||
node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg);
|
||||
}
|
||||
if (node.topics[topic] !== undefined) {
|
||||
msg2 = RED.util.cloneMessage(msg);
|
||||
msg2.payload = node.topics[topic].m2;
|
||||
promise = new Promise((resolve,reject) => {
|
||||
RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
node.topics[topic].m2 = value;
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
delete node.topics[topic];
|
||||
node.status({});
|
||||
node.send(msg2);
|
||||
promise.then(() => {
|
||||
if (node.op2type !== "nul") {
|
||||
if (node.topics[topic] !== undefined) {
|
||||
msg2 = RED.util.cloneMessage(msg);
|
||||
msg2.payload = node.topics[topic].m2;
|
||||
}
|
||||
}
|
||||
delete node.topics[topic];
|
||||
node.status({});
|
||||
node.send(msg2);
|
||||
}).catch(err => {
|
||||
node.error(err);
|
||||
});
|
||||
}, node.duration);
|
||||
}
|
||||
else {
|
||||
if (node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); }
|
||||
}
|
||||
}
|
||||
});
|
||||
return Promise.resolve();
|
||||
}
|
||||
this.on("close", function() {
|
||||
for (var t in node.topics) {
|
||||
/* istanbul ignore else */
|
||||
|
@@ -23,10 +23,6 @@ from time import sleep
|
||||
|
||||
bounce = 25;
|
||||
|
||||
if sys.version_info >= (3,0):
|
||||
print("Sorry - currently only configured to work with python 2.x")
|
||||
sys.exit(1)
|
||||
|
||||
if len(sys.argv) > 2:
|
||||
cmd = sys.argv[1].lower()
|
||||
pin = int(sys.argv[2])
|
||||
@@ -34,7 +30,7 @@ if len(sys.argv) > 2:
|
||||
GPIO.setwarnings(False)
|
||||
|
||||
if cmd == "pwm":
|
||||
#print "Initialised pin "+str(pin)+" to PWM"
|
||||
#print("Initialised pin "+str(pin)+" to PWM")
|
||||
try:
|
||||
freq = int(sys.argv[3])
|
||||
except:
|
||||
@@ -54,10 +50,10 @@ if len(sys.argv) > 2:
|
||||
GPIO.cleanup(pin)
|
||||
sys.exit(0)
|
||||
except Exception as ex:
|
||||
print "bad data: "+data
|
||||
print("bad data: "+data)
|
||||
|
||||
elif cmd == "buzz":
|
||||
#print "Initialised pin "+str(pin)+" to Buzz"
|
||||
#print("Initialised pin "+str(pin)+" to Buzz")
|
||||
GPIO.setup(pin,GPIO.OUT)
|
||||
p = GPIO.PWM(pin, 100)
|
||||
p.stop()
|
||||
@@ -76,10 +72,10 @@ if len(sys.argv) > 2:
|
||||
GPIO.cleanup(pin)
|
||||
sys.exit(0)
|
||||
except Exception as ex:
|
||||
print "bad data: "+data
|
||||
print("bad data: "+data)
|
||||
|
||||
elif cmd == "out":
|
||||
#print "Initialised pin "+str(pin)+" to OUT"
|
||||
#print("Initialised pin "+str(pin)+" to OUT")
|
||||
GPIO.setup(pin,GPIO.OUT)
|
||||
if len(sys.argv) == 4:
|
||||
GPIO.output(pin,int(sys.argv[3]))
|
||||
@@ -103,11 +99,11 @@ if len(sys.argv) > 2:
|
||||
GPIO.output(pin,data)
|
||||
|
||||
elif cmd == "in":
|
||||
#print "Initialised pin "+str(pin)+" to IN"
|
||||
#print("Initialised pin "+str(pin)+" to IN")
|
||||
bounce = float(sys.argv[4])
|
||||
def handle_callback(chan):
|
||||
sleep(bounce/1000.0)
|
||||
print GPIO.input(chan)
|
||||
print(GPIO.input(chan))
|
||||
|
||||
if sys.argv[3].lower() == "up":
|
||||
GPIO.setup(pin,GPIO.IN,GPIO.PUD_UP)
|
||||
@@ -116,7 +112,7 @@ if len(sys.argv) > 2:
|
||||
else:
|
||||
GPIO.setup(pin,GPIO.IN)
|
||||
|
||||
print GPIO.input(pin)
|
||||
print(GPIO.input(pin))
|
||||
GPIO.add_event_detect(pin, GPIO.BOTH, callback=handle_callback, bouncetime=int(bounce))
|
||||
|
||||
while True:
|
||||
@@ -129,7 +125,7 @@ if len(sys.argv) > 2:
|
||||
sys.exit(0)
|
||||
|
||||
elif cmd == "byte":
|
||||
#print "Initialised BYTE mode - "+str(pin)+
|
||||
#print("Initialised BYTE mode - "+str(pin)+)
|
||||
list = [7,11,13,12,15,16,18,22]
|
||||
GPIO.setup(list,GPIO.OUT)
|
||||
|
||||
@@ -152,7 +148,7 @@ if len(sys.argv) > 2:
|
||||
GPIO.output(list[bit], data & mask)
|
||||
|
||||
elif cmd == "borg":
|
||||
#print "Initialised BORG mode - "+str(pin)+
|
||||
#print("Initialised BORG mode - "+str(pin)+)
|
||||
GPIO.setup(11,GPIO.OUT)
|
||||
GPIO.setup(13,GPIO.OUT)
|
||||
GPIO.setup(15,GPIO.OUT)
|
||||
@@ -190,7 +186,7 @@ if len(sys.argv) > 2:
|
||||
button = ord( buf[0] ) & pin # mask out just the required button(s)
|
||||
if button != oldbutt: # only send if changed
|
||||
oldbutt = button
|
||||
print button
|
||||
print(button)
|
||||
|
||||
while True:
|
||||
try:
|
||||
@@ -215,7 +211,7 @@ if len(sys.argv) > 2:
|
||||
# type,code,value
|
||||
print("%u,%u" % (code, value))
|
||||
event = file.read(EVENT_SIZE)
|
||||
print "0,0"
|
||||
print("0,0")
|
||||
file.close()
|
||||
sys.exit(0)
|
||||
except:
|
||||
@@ -225,14 +221,14 @@ if len(sys.argv) > 2:
|
||||
elif len(sys.argv) > 1:
|
||||
cmd = sys.argv[1].lower()
|
||||
if cmd == "rev":
|
||||
print GPIO.RPI_REVISION
|
||||
print(GPIO.RPI_REVISION)
|
||||
elif cmd == "ver":
|
||||
print GPIO.VERSION
|
||||
print(GPIO.VERSION)
|
||||
elif cmd == "info":
|
||||
print GPIO.RPI_INFO
|
||||
print(GPIO.RPI_INFO)
|
||||
else:
|
||||
print "Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}"
|
||||
print " only ver (gpio version) and info (board information) accept no pin parameter."
|
||||
print("Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}")
|
||||
print(" only ver (gpio version) and info (board information) accept no pin parameter.")
|
||||
|
||||
else:
|
||||
print "Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}"
|
||||
print("Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}")
|
||||
|
@@ -99,11 +99,17 @@
|
||||
}
|
||||
return v;
|
||||
}
|
||||
function prop2name(key) {
|
||||
var result = RED.utils.parseContextKey(key);
|
||||
return result.key;
|
||||
}
|
||||
function getValueLabel(t,v) {
|
||||
if (t === 'str') {
|
||||
return '"'+clipValueLength(v)+'"';
|
||||
} else if (t === 'msg' || t==='flow' || t==='global') {
|
||||
} else if (t === 'msg') {
|
||||
return t+"."+clipValueLength(v);
|
||||
} else if (t === 'flow' || t === 'global') {
|
||||
return t+"."+clipValueLength(prop2name(v));
|
||||
}
|
||||
return clipValueLength(v);
|
||||
}
|
||||
|
@@ -59,21 +59,157 @@ module.exports = function(RED) {
|
||||
'else': function(a) { return a === true; }
|
||||
};
|
||||
|
||||
var _max_kept_msgs_count = undefined;
|
||||
var _maxKeptCount;
|
||||
|
||||
function max_kept_msgs_count(node) {
|
||||
if (_max_kept_msgs_count === undefined) {
|
||||
function getMaxKeptCount() {
|
||||
if (_maxKeptCount === undefined) {
|
||||
var name = "nodeMessageBufferMaxLength";
|
||||
if (RED.settings.hasOwnProperty(name)) {
|
||||
_max_kept_msgs_count = RED.settings[name];
|
||||
_maxKeptCount = RED.settings[name];
|
||||
}
|
||||
else {
|
||||
_max_kept_msgs_count = 0;
|
||||
_maxKeptCount = 0;
|
||||
}
|
||||
}
|
||||
return _max_kept_msgs_count;
|
||||
return _maxKeptCount;
|
||||
}
|
||||
|
||||
function getProperty(node,msg) {
|
||||
return new Promise((resolve,reject) => {
|
||||
if (node.propertyType === 'jsonata') {
|
||||
RED.util.evaluateJSONataExpression(node.property,msg,(err,value) => {
|
||||
if (err) {
|
||||
reject(RED._("switch.errors.invalid-expr",{error:err.message}));
|
||||
} else {
|
||||
resolve(value);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg,(err,value) => {
|
||||
if (err) {
|
||||
resolve(undefined);
|
||||
} else {
|
||||
resolve(value);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function getV1(node,msg,rule,hasParts) {
|
||||
return new Promise( (resolve,reject) => {
|
||||
if (rule.vt === 'prev') {
|
||||
resolve(node.previousValue);
|
||||
} else if (rule.vt === 'jsonata') {
|
||||
var exp = rule.v;
|
||||
if (rule.t === 'jsonata_exp') {
|
||||
if (hasParts) {
|
||||
exp.assign("I", msg.parts.index);
|
||||
exp.assign("N", msg.parts.count);
|
||||
}
|
||||
}
|
||||
RED.util.evaluateJSONataExpression(exp,msg,(err,value) => {
|
||||
if (err) {
|
||||
reject(RED._("switch.errors.invalid-expr",{error:err.message}));
|
||||
} else {
|
||||
resolve(value);
|
||||
}
|
||||
});
|
||||
} else if (rule.vt === 'json') {
|
||||
resolve("json");
|
||||
} else if (rule.vt === 'null') {
|
||||
resolve("null");
|
||||
} else {
|
||||
RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg, function(err,value) {
|
||||
if (err) {
|
||||
resolve(undefined);
|
||||
} else {
|
||||
resolve(value);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function getV2(node,msg,rule) {
|
||||
return new Promise((resolve,reject) => {
|
||||
var v2 = rule.v2;
|
||||
if (rule.v2t === 'prev') {
|
||||
resolve(node.previousValue);
|
||||
} else if (rule.v2t === 'jsonata') {
|
||||
RED.util.evaluateJSONataExpression(rule.v2,msg,(err,value) => {
|
||||
if (err) {
|
||||
reject(RED._("switch.errors.invalid-expr",{error:err.message}));
|
||||
} else {
|
||||
resolve(value);
|
||||
}
|
||||
});
|
||||
} else if (typeof v2 !== 'undefined') {
|
||||
RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg, function(err,value) {
|
||||
if (err) {
|
||||
resolve(undefined);
|
||||
} else {
|
||||
resolve(value);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
resolve(v2);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
function applyRule(node, msg, property, state) {
|
||||
return new Promise((resolve,reject) => {
|
||||
|
||||
var rule = node.rules[state.currentRule];
|
||||
var v1,v2;
|
||||
|
||||
getV1(node,msg,rule,state.hasParts).then(value => {
|
||||
v1 = value;
|
||||
}).then(()=>getV2(node,msg,rule)).then(value => {
|
||||
v2 = value;
|
||||
}).then(() => {
|
||||
if (rule.t == "else") {
|
||||
property = state.elseflag;
|
||||
state.elseflag = true;
|
||||
}
|
||||
if (operators[rule.t](property,v1,v2,rule.case,msg.parts)) {
|
||||
state.onward.push(msg);
|
||||
state.elseflag = false;
|
||||
if (node.checkall == "false") {
|
||||
return resolve(false);
|
||||
}
|
||||
} else {
|
||||
state.onward.push(null);
|
||||
}
|
||||
resolve(state.currentRule < node.rules.length - 1);
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
function applyRules(node, msg, property,state) {
|
||||
if (!state) {
|
||||
state = {
|
||||
currentRule: 0,
|
||||
elseflag: true,
|
||||
onward: [],
|
||||
hasParts: msg.hasOwnProperty("parts") &&
|
||||
msg.parts.hasOwnProperty("id") &&
|
||||
msg.parts.hasOwnProperty("index")
|
||||
}
|
||||
}
|
||||
return applyRule(node,msg,property,state).then(hasMore => {
|
||||
if (hasMore) {
|
||||
state.currentRule++;
|
||||
return applyRules(node,msg,property,state);
|
||||
} else {
|
||||
node.previousValue = property;
|
||||
return state.onward;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
function SwitchNode(n) {
|
||||
RED.nodes.createNode(this, n);
|
||||
this.rules = n.rules || [];
|
||||
@@ -94,10 +230,10 @@ module.exports = function(RED) {
|
||||
var node = this;
|
||||
var valid = true;
|
||||
var repair = n.repair;
|
||||
var needs_count = repair;
|
||||
var needsCount = repair;
|
||||
for (var i=0; i<this.rules.length; i+=1) {
|
||||
var rule = this.rules[i];
|
||||
needs_count = needs_count || ((rule.t === "tail") || (rule.t === "jsonata_exp"));
|
||||
needsCount = needsCount || ((rule.t === "tail") || (rule.t === "jsonata_exp"));
|
||||
if (!rule.vt) {
|
||||
if (!isNaN(Number(rule.v))) {
|
||||
rule.vt = 'num';
|
||||
@@ -142,26 +278,26 @@ module.exports = function(RED) {
|
||||
return;
|
||||
}
|
||||
|
||||
var pending_count = 0;
|
||||
var pending_id = 0;
|
||||
var pending_in = {};
|
||||
var pending_out = {};
|
||||
var pendingCount = 0;
|
||||
var pendingId = 0;
|
||||
var pendingIn = {};
|
||||
var pendingOut = {};
|
||||
var received = {};
|
||||
|
||||
function add2group_in(id, msg, parts) {
|
||||
if (!(id in pending_in)) {
|
||||
pending_in[id] = {
|
||||
function addMessageToGroup(id, msg, parts) {
|
||||
if (!(id in pendingIn)) {
|
||||
pendingIn[id] = {
|
||||
count: undefined,
|
||||
msgs: [],
|
||||
seq_no: pending_id++
|
||||
seq_no: pendingId++
|
||||
};
|
||||
}
|
||||
var group = pending_in[id];
|
||||
var group = pendingIn[id];
|
||||
group.msgs.push(msg);
|
||||
pending_count++;
|
||||
var max_msgs = max_kept_msgs_count(node);
|
||||
if ((max_msgs > 0) && (pending_count > max_msgs)) {
|
||||
clear_pending();
|
||||
pendingCount++;
|
||||
var max_msgs = getMaxKeptCount();
|
||||
if ((max_msgs > 0) && (pendingCount > max_msgs)) {
|
||||
clearPending();
|
||||
node.error(RED._("switch.errors.too-many"), msg);
|
||||
}
|
||||
if (parts.hasOwnProperty("count")) {
|
||||
@@ -170,32 +306,29 @@ module.exports = function(RED) {
|
||||
return group;
|
||||
}
|
||||
|
||||
function del_group_in(id, group) {
|
||||
pending_count -= group.msgs.length;
|
||||
delete pending_in[id];
|
||||
}
|
||||
|
||||
function add2pending_in(msg) {
|
||||
function addMessageToPending(msg) {
|
||||
var parts = msg.parts;
|
||||
if (parts.hasOwnProperty("id") &&
|
||||
parts.hasOwnProperty("index")) {
|
||||
var group = add2group_in(parts.id, msg, parts);
|
||||
var msgs = group.msgs;
|
||||
var count = group.count;
|
||||
if (count === msgs.length) {
|
||||
for (var i = 0; i < msgs.length; i++) {
|
||||
var msg = msgs[i];
|
||||
// We've already checked the msg.parts has the require bits
|
||||
var group = addMessageToGroup(parts.id, msg, parts);
|
||||
var msgs = group.msgs;
|
||||
var count = group.count;
|
||||
if (count === msgs.length) {
|
||||
// We have a complete group - send the individual parts
|
||||
return msgs.reduce((promise, msg) => {
|
||||
return promise.then((result) => {
|
||||
msg.parts.count = count;
|
||||
process_msg(msg, false);
|
||||
}
|
||||
del_group_in(parts.id, group);
|
||||
}
|
||||
return true;
|
||||
return processMessage(msg, false);
|
||||
})
|
||||
}, Promise.resolve()).then( () => {
|
||||
pendingCount -= group.msgs.length;
|
||||
delete pendingIn[parts.id];
|
||||
});
|
||||
}
|
||||
return false;
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
function send_group(onwards, port_count) {
|
||||
function sendGroup(onwards, port_count) {
|
||||
var counts = new Array(port_count).fill(0);
|
||||
for (var i = 0; i < onwards.length; i++) {
|
||||
var onward = onwards[i];
|
||||
@@ -230,141 +363,104 @@ module.exports = function(RED) {
|
||||
}
|
||||
}
|
||||
|
||||
function send2ports(onward, msg) {
|
||||
function sendGroupMessages(onward, msg) {
|
||||
var parts = msg.parts;
|
||||
var gid = parts.id;
|
||||
received[gid] = ((gid in received) ? received[gid] : 0) +1;
|
||||
var send_ok = (received[gid] === parts.count);
|
||||
|
||||
if (!(gid in pending_out)) {
|
||||
pending_out[gid] = {
|
||||
if (!(gid in pendingOut)) {
|
||||
pendingOut[gid] = {
|
||||
onwards: []
|
||||
};
|
||||
}
|
||||
var group = pending_out[gid];
|
||||
var group = pendingOut[gid];
|
||||
var onwards = group.onwards;
|
||||
onwards.push(onward);
|
||||
pending_count++;
|
||||
pendingCount++;
|
||||
if (send_ok) {
|
||||
send_group(onwards, onward.length, msg);
|
||||
pending_count -= onward.length;
|
||||
delete pending_out[gid];
|
||||
sendGroup(onwards, onward.length, msg);
|
||||
pendingCount -= onward.length;
|
||||
delete pendingOut[gid];
|
||||
delete received[gid];
|
||||
}
|
||||
var max_msgs = max_kept_msgs_count(node);
|
||||
if ((max_msgs > 0) && (pending_count > max_msgs)) {
|
||||
clear_pending();
|
||||
var max_msgs = getMaxKeptCount();
|
||||
if ((max_msgs > 0) && (pendingCount > max_msgs)) {
|
||||
clearPending();
|
||||
node.error(RED._("switch.errors.too-many"), msg);
|
||||
}
|
||||
}
|
||||
|
||||
function msg_has_parts(msg) {
|
||||
if (msg.hasOwnProperty("parts")) {
|
||||
var parts = msg.parts;
|
||||
return (parts.hasOwnProperty("id") &&
|
||||
parts.hasOwnProperty("index"));
|
||||
|
||||
|
||||
|
||||
|
||||
function processMessage(msg, checkParts) {
|
||||
var hasParts = msg.hasOwnProperty("parts") &&
|
||||
msg.parts.hasOwnProperty("id") &&
|
||||
msg.parts.hasOwnProperty("index");
|
||||
|
||||
if (needsCount && checkParts && hasParts) {
|
||||
return addMessageToPending(msg);
|
||||
}
|
||||
return false;
|
||||
return getProperty(node,msg)
|
||||
.then(property => applyRules(node,msg,property))
|
||||
.then(onward => {
|
||||
if (!repair || !hasParts) {
|
||||
node.send(onward);
|
||||
}
|
||||
else {
|
||||
sendGroupMessages(onward, msg);
|
||||
}
|
||||
}).catch(err => {
|
||||
node.warn(err);
|
||||
});
|
||||
}
|
||||
|
||||
function process_msg(msg, check_parts) {
|
||||
var has_parts = msg_has_parts(msg);
|
||||
if (needs_count && check_parts && has_parts &&
|
||||
add2pending_in(msg)) {
|
||||
return;
|
||||
}
|
||||
var onward = [];
|
||||
try {
|
||||
var prop;
|
||||
if (node.propertyType === 'jsonata') {
|
||||
prop = RED.util.evaluateJSONataExpression(node.property,msg);
|
||||
} else {
|
||||
prop = RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg);
|
||||
}
|
||||
var elseflag = true;
|
||||
for (var i=0; i<node.rules.length; i+=1) {
|
||||
var rule = node.rules[i];
|
||||
var test = prop;
|
||||
var v1,v2;
|
||||
if (rule.vt === 'prev') {
|
||||
v1 = node.previousValue;
|
||||
} else if (rule.vt === 'jsonata') {
|
||||
try {
|
||||
var exp = rule.v;
|
||||
if (rule.t === 'jsonata_exp') {
|
||||
if (has_parts) {
|
||||
exp.assign("I", msg.parts.index);
|
||||
exp.assign("N", msg.parts.count);
|
||||
}
|
||||
}
|
||||
v1 = RED.util.evaluateJSONataExpression(exp,msg);
|
||||
} catch(err) {
|
||||
node.error(RED._("switch.errors.invalid-expr",{error:err.message}));
|
||||
return;
|
||||
}
|
||||
} else if (rule.vt === 'json') {
|
||||
v1 = "json";
|
||||
} else if (rule.vt === 'null') {
|
||||
v1 = "null";
|
||||
} else {
|
||||
try {
|
||||
v1 = RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg);
|
||||
} catch(err) {
|
||||
v1 = undefined;
|
||||
}
|
||||
}
|
||||
v2 = rule.v2;
|
||||
if (rule.v2t === 'prev') {
|
||||
v2 = node.previousValue;
|
||||
} else if (rule.v2t === 'jsonata') {
|
||||
try {
|
||||
v2 = RED.util.evaluateJSONataExpression(rule.v2,msg);
|
||||
} catch(err) {
|
||||
node.error(RED._("switch.errors.invalid-expr",{error:err.message}));
|
||||
return;
|
||||
}
|
||||
} else if (typeof v2 !== 'undefined') {
|
||||
try {
|
||||
v2 = RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg);
|
||||
} catch(err) {
|
||||
v2 = undefined;
|
||||
}
|
||||
}
|
||||
if (rule.t == "else") { test = elseflag; elseflag = true; }
|
||||
if (operators[rule.t](test,v1,v2,rule.case,msg.parts)) {
|
||||
onward.push(msg);
|
||||
elseflag = false;
|
||||
if (node.checkall == "false") { break; }
|
||||
} else {
|
||||
onward.push(null);
|
||||
}
|
||||
}
|
||||
node.previousValue = prop;
|
||||
if (!repair || !has_parts) {
|
||||
node.send(onward);
|
||||
}
|
||||
else {
|
||||
send2ports(onward, msg);
|
||||
}
|
||||
} catch(err) {
|
||||
node.warn(err);
|
||||
}
|
||||
}
|
||||
|
||||
function clear_pending() {
|
||||
pending_count = 0;
|
||||
pending_id = 0;
|
||||
pending_in = {};
|
||||
pending_out = {};
|
||||
function clearPending() {
|
||||
pendingCount = 0;
|
||||
pendingId = 0;
|
||||
pendingIn = {};
|
||||
pendingOut = {};
|
||||
received = {};
|
||||
}
|
||||
|
||||
var pendingMessages = [];
|
||||
var activeMessagePromise = null;
|
||||
var processMessageQueue = function(msg) {
|
||||
if (msg) {
|
||||
// A new message has arrived - add it to the message queue
|
||||
pendingMessages.push(msg);
|
||||
if (activeMessagePromise !== null) {
|
||||
// The node is currently processing a message, so do nothing
|
||||
// more with this message
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (pendingMessages.length === 0) {
|
||||
// There are no more messages to process, clear the active flag
|
||||
// and return
|
||||
activeMessagePromise = null;
|
||||
return;
|
||||
}
|
||||
|
||||
// There are more messages to process. Get the next message and
|
||||
// start processing it. Recurse back in to check for any more
|
||||
var nextMsg = pendingMessages.shift();
|
||||
activeMessagePromise = processMessage(nextMsg,true)
|
||||
.then(processMessageQueue)
|
||||
.catch((err) => {
|
||||
node.error(err,nextMsg);
|
||||
return processMessageQueue();
|
||||
});
|
||||
}
|
||||
|
||||
this.on('input', function(msg) {
|
||||
process_msg(msg, true);
|
||||
processMessageQueue(msg);
|
||||
});
|
||||
|
||||
this.on('close', function() {
|
||||
clear_pending();
|
||||
clearPending();
|
||||
});
|
||||
}
|
||||
|
||||
|
@@ -54,6 +54,10 @@
|
||||
outputs: 1,
|
||||
icon: "swap.png",
|
||||
label: function() {
|
||||
function prop2name(type, key) {
|
||||
var result = RED.utils.parseContextKey(key);
|
||||
return type +"." +result.key;
|
||||
}
|
||||
if (this.name) {
|
||||
return this.name;
|
||||
}
|
||||
@@ -70,13 +74,13 @@
|
||||
} else {
|
||||
if (this.rules.length == 1) {
|
||||
if (this.rules[0].t === "set") {
|
||||
return this._("change.label.set",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p});
|
||||
return this._("change.label.set",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)});
|
||||
} else if (this.rules[0].t === "change") {
|
||||
return this._("change.label.change",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p});
|
||||
return this._("change.label.change",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)});
|
||||
} else if (this.rules[0].t === "move") {
|
||||
return this._("change.label.move",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p});
|
||||
return this._("change.label.move",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)});
|
||||
} else {
|
||||
return this._("change.label.delete",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p});
|
||||
return this._("change.label.delete",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)});
|
||||
}
|
||||
} else {
|
||||
return this._("change.label.changeCount",{count:this.rules.length});
|
||||
|
@@ -98,44 +98,53 @@ module.exports = function(RED) {
|
||||
}
|
||||
}
|
||||
|
||||
function applyRule(msg,rule) {
|
||||
try {
|
||||
var property = rule.p;
|
||||
var value = rule.to;
|
||||
if (rule.tot === 'json') {
|
||||
value = JSON.parse(rule.to);
|
||||
} else if (rule.tot === 'bin') {
|
||||
value = Buffer.from(JSON.parse(rule.to))
|
||||
}
|
||||
var current;
|
||||
var fromValue;
|
||||
var fromType;
|
||||
var fromRE;
|
||||
if (rule.tot === "msg") {
|
||||
value = RED.util.getMessageProperty(msg,rule.to);
|
||||
} else if (rule.tot === 'flow') {
|
||||
value = node.context().flow.get(rule.to);
|
||||
} else if (rule.tot === 'global') {
|
||||
value = node.context().global.get(rule.to);
|
||||
} else if (rule.tot === 'date') {
|
||||
value = Date.now();
|
||||
} else if (rule.tot === 'jsonata') {
|
||||
try{
|
||||
value = RED.util.evaluateJSONataExpression(rule.to,msg);
|
||||
} catch(err) {
|
||||
node.error(RED._("change.errors.invalid-expr",{error:err.message}),msg);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (rule.t === 'change') {
|
||||
if (rule.fromt === 'msg' || rule.fromt === 'flow' || rule.fromt === 'global') {
|
||||
if (rule.fromt === "msg") {
|
||||
fromValue = RED.util.getMessageProperty(msg,rule.from);
|
||||
} else if (rule.fromt === 'flow') {
|
||||
fromValue = node.context().flow.get(rule.from);
|
||||
} else if (rule.fromt === 'global') {
|
||||
fromValue = node.context().global.get(rule.from);
|
||||
function getToValue(msg,rule) {
|
||||
var value = rule.to;
|
||||
if (rule.tot === 'json') {
|
||||
value = JSON.parse(rule.to);
|
||||
} else if (rule.tot === 'bin') {
|
||||
value = Buffer.from(JSON.parse(rule.to))
|
||||
}
|
||||
if (rule.tot === "msg") {
|
||||
value = RED.util.getMessageProperty(msg,rule.to);
|
||||
} else if (rule.tot === 'flow') {
|
||||
value = node.context().flow.get(rule.to);
|
||||
} else if (rule.tot === 'global') {
|
||||
value = node.context().global.get(rule.to);
|
||||
} else if (rule.tot === 'date') {
|
||||
value = Date.now();
|
||||
} else if (rule.tot === 'jsonata') {
|
||||
return new Promise((resolve,reject) => {
|
||||
RED.util.evaluateJSONataExpression(rule.to,msg, (err, value) => {
|
||||
if (err) {
|
||||
reject(RED._("change.errors.invalid-expr",{error:err.message}))
|
||||
} else {
|
||||
resolve(value);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
return Promise.resolve(value);
|
||||
}
|
||||
function getFromValue(msg,rule) {
|
||||
var fromValue;
|
||||
var fromType;
|
||||
var fromRE;
|
||||
if (rule.t === 'change') {
|
||||
if (rule.fromt === 'msg' || rule.fromt === 'flow' || rule.fromt === 'global') {
|
||||
return new Promise((resolve,reject) => {
|
||||
if (rule.fromt === "msg") {
|
||||
resolve(RED.util.getMessageProperty(msg,rule.from));
|
||||
} else if (rule.fromt === 'flow' || rule.fromt === 'global') {
|
||||
node.context()[rule.fromt].get(rule.from,(err,fromValue) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(fromValue);
|
||||
}
|
||||
});
|
||||
}
|
||||
}).then(fromValue => {
|
||||
if (typeof fromValue === 'number' || fromValue instanceof Number) {
|
||||
fromType = 'num';
|
||||
} else if (typeof fromValue === 'boolean') {
|
||||
@@ -149,108 +158,161 @@ module.exports = function(RED) {
|
||||
try {
|
||||
fromRE = new RegExp(fromRE, "g");
|
||||
} catch (e) {
|
||||
valid = false;
|
||||
node.error(RED._("change.errors.invalid-from",{error:e.message}),msg);
|
||||
reject(new Error(RED._("change.errors.invalid-from",{error:e.message})));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
node.error(RED._("change.errors.invalid-from",{error:"unsupported type: "+(typeof fromValue)}),msg);
|
||||
return
|
||||
reject(new Error(RED._("change.errors.invalid-from",{error:"unsupported type: "+(typeof fromValue)})));
|
||||
return;
|
||||
}
|
||||
return {
|
||||
fromType,
|
||||
fromValue,
|
||||
fromRE
|
||||
}
|
||||
});
|
||||
} else {
|
||||
fromType = rule.fromt;
|
||||
fromValue = rule.from;
|
||||
fromRE = rule.fromRE;
|
||||
}
|
||||
}
|
||||
return Promise.resolve({
|
||||
fromType,
|
||||
fromValue,
|
||||
fromRE
|
||||
});
|
||||
}
|
||||
function applyRule(msg,rule) {
|
||||
var property = rule.p;
|
||||
var current;
|
||||
var fromValue;
|
||||
var fromType;
|
||||
var fromRE;
|
||||
try {
|
||||
return getToValue(msg,rule).then(value => {
|
||||
return getFromValue(msg,rule).then(fromParts => {
|
||||
fromValue = fromParts.fromValue;
|
||||
fromType = fromParts.fromType;
|
||||
fromRE = fromParts.fromRE;
|
||||
if (rule.pt === 'msg') {
|
||||
try {
|
||||
if (rule.t === 'delete') {
|
||||
RED.util.setMessageProperty(msg,property,undefined);
|
||||
} else if (rule.t === 'set') {
|
||||
RED.util.setMessageProperty(msg,property,value);
|
||||
} else if (rule.t === 'change') {
|
||||
current = RED.util.getMessageProperty(msg,property);
|
||||
if (typeof current === 'string') {
|
||||
if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) {
|
||||
// str representation of exact from number/boolean
|
||||
// only replace if they match exactly
|
||||
RED.util.setMessageProperty(msg,property,value);
|
||||
} else {
|
||||
current = current.replace(fromRE,value);
|
||||
RED.util.setMessageProperty(msg,property,current);
|
||||
}
|
||||
} else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') {
|
||||
if (current == Number(fromValue)) {
|
||||
RED.util.setMessageProperty(msg,property,value);
|
||||
}
|
||||
} else if (typeof current === 'boolean' && fromType === 'bool') {
|
||||
if (current.toString() === fromValue) {
|
||||
RED.util.setMessageProperty(msg,property,value);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch(err) {}
|
||||
return msg;
|
||||
} else if (rule.pt === 'flow' || rule.pt === 'global') {
|
||||
return new Promise((resolve,reject) => {
|
||||
var target = node.context()[rule.pt];
|
||||
var callback = err => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(msg);
|
||||
}
|
||||
}
|
||||
if (rule.t === 'delete') {
|
||||
target.set(property,undefined,callback);
|
||||
} else if (rule.t === 'set') {
|
||||
target.set(property,value,callback);
|
||||
} else if (rule.t === 'change') {
|
||||
target.get(property,(err,current) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
if (typeof current === 'string') {
|
||||
if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) {
|
||||
// str representation of exact from number/boolean
|
||||
// only replace if they match exactly
|
||||
target.set(property,value,callback);
|
||||
} else {
|
||||
current = current.replace(fromRE,value);
|
||||
target.set(property,current,callback);
|
||||
}
|
||||
} else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') {
|
||||
if (current == Number(fromValue)) {
|
||||
target.set(property,value,callback);
|
||||
}
|
||||
} else if (typeof current === 'boolean' && fromType === 'bool') {
|
||||
if (current.toString() === fromValue) {
|
||||
target.set(property,value,callback);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}).catch(err => {
|
||||
node.error(err, msg);
|
||||
return null;
|
||||
});
|
||||
} catch(err) {
|
||||
return Promise.resolve(msg);
|
||||
}
|
||||
}
|
||||
function applyRules(msg, currentRule) {
|
||||
var r = node.rules[currentRule];
|
||||
var rulePromise;
|
||||
if (r.t === "move") {
|
||||
if ((r.tot !== r.pt) || (r.p.indexOf(r.to) !== -1)) {
|
||||
rulePromise = applyRule(msg,{t:"set", p:r.to, pt:r.tot, to:r.p, tot:r.pt}).then(
|
||||
msg => applyRule(msg,{t:"delete", p:r.p, pt:r.pt})
|
||||
);
|
||||
}
|
||||
else { // 2 step move if we are moving from a child
|
||||
rulePromise = applyRule(msg,{t:"set", p:"_temp_move", pt:r.tot, to:r.p, tot:r.pt}).then(
|
||||
msg => applyRule(msg,{t:"delete", p:r.p, pt:r.pt})
|
||||
).then(
|
||||
msg => applyRule(msg,{t:"set", p:r.to, pt:r.tot, to:"_temp_move", tot:r.pt})
|
||||
).then(
|
||||
msg => applyRule(msg,{t:"delete", p:"_temp_move", pt:r.pt})
|
||||
)
|
||||
}
|
||||
} else {
|
||||
rulePromise = applyRule(msg,r);
|
||||
}
|
||||
return rulePromise.then(
|
||||
msg => {
|
||||
if (!msg) {
|
||||
return
|
||||
} else if (currentRule === node.rules.length - 1) {
|
||||
return msg;
|
||||
} else {
|
||||
fromType = rule.fromt;
|
||||
fromValue = rule.from;
|
||||
fromRE = rule.fromRE;
|
||||
return applyRules(msg, currentRule+1);
|
||||
}
|
||||
}
|
||||
if (rule.pt === 'msg') {
|
||||
if (rule.t === 'delete') {
|
||||
RED.util.setMessageProperty(msg,property,undefined);
|
||||
} else if (rule.t === 'set') {
|
||||
RED.util.setMessageProperty(msg,property,value);
|
||||
} else if (rule.t === 'change') {
|
||||
current = RED.util.getMessageProperty(msg,property);
|
||||
if (typeof current === 'string') {
|
||||
if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) {
|
||||
// str representation of exact from number/boolean
|
||||
// only replace if they match exactly
|
||||
RED.util.setMessageProperty(msg,property,value);
|
||||
} else {
|
||||
current = current.replace(fromRE,value);
|
||||
RED.util.setMessageProperty(msg,property,current);
|
||||
}
|
||||
} else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') {
|
||||
if (current == Number(fromValue)) {
|
||||
RED.util.setMessageProperty(msg,property,value);
|
||||
}
|
||||
} else if (typeof current === 'boolean' && fromType === 'bool') {
|
||||
if (current.toString() === fromValue) {
|
||||
RED.util.setMessageProperty(msg,property,value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
var target;
|
||||
if (rule.pt === 'flow') {
|
||||
target = node.context().flow;
|
||||
} else if (rule.pt === 'global') {
|
||||
target = node.context().global;
|
||||
}
|
||||
if (target) {
|
||||
if (rule.t === 'delete') {
|
||||
target.set(property,undefined);
|
||||
} else if (rule.t === 'set') {
|
||||
target.set(property,value);
|
||||
} else if (rule.t === 'change') {
|
||||
current = target.get(property);
|
||||
if (typeof current === 'string') {
|
||||
if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) {
|
||||
// str representation of exact from number/boolean
|
||||
// only replace if they match exactly
|
||||
target.set(property,value);
|
||||
} else {
|
||||
current = current.replace(fromRE,value);
|
||||
target.set(property,current);
|
||||
}
|
||||
} else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') {
|
||||
if (current == Number(fromValue)) {
|
||||
target.set(property,value);
|
||||
}
|
||||
} else if (typeof current === 'boolean' && fromType === 'bool') {
|
||||
if (current.toString() === fromValue) {
|
||||
target.set(property,value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch(err) {/*console.log(err.stack)*/}
|
||||
return msg;
|
||||
);
|
||||
}
|
||||
if (valid) {
|
||||
this.on('input', function(msg) {
|
||||
for (var i=0; i<this.rules.length; i++) {
|
||||
if (this.rules[i].t === "move") {
|
||||
var r = this.rules[i];
|
||||
if ((r.tot !== r.pt) || (r.p.indexOf(r.to) !== -1)) {
|
||||
msg = applyRule(msg,{t:"set", p:r.to, pt:r.tot, to:r.p, tot:r.pt});
|
||||
applyRule(msg,{t:"delete", p:r.p, pt:r.pt});
|
||||
}
|
||||
else { // 2 step move if we are moving from a child
|
||||
msg = applyRule(msg,{t:"set", p:"_temp_move", pt:r.tot, to:r.p, tot:r.pt});
|
||||
applyRule(msg,{t:"delete", p:r.p, pt:r.pt});
|
||||
msg = applyRule(msg,{t:"set", p:r.to, pt:r.tot, to:"_temp_move", tot:r.pt});
|
||||
applyRule(msg,{t:"delete", p:"_temp_move", pt:r.pt});
|
||||
}
|
||||
} else {
|
||||
msg = applyRule(msg,this.rules[i]);
|
||||
}
|
||||
if (msg === null) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
node.send(msg);
|
||||
applyRules(msg, 0)
|
||||
.then( msg => { if (msg) { node.send(msg) }} )
|
||||
.catch( err => node.error(err, msg))
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@@ -233,7 +233,7 @@ module.exports = function(RED) {
|
||||
RED.nodes.registerType("split",SplitNode);
|
||||
|
||||
|
||||
var _max_kept_msgs_count = undefined;
|
||||
var _max_kept_msgs_count;
|
||||
|
||||
function max_kept_msgs_count(node) {
|
||||
if (_max_kept_msgs_count === undefined) {
|
||||
@@ -252,7 +252,15 @@ module.exports = function(RED) {
|
||||
exp.assign("I", index);
|
||||
exp.assign("N", count);
|
||||
exp.assign("A", accum);
|
||||
return RED.util.evaluateJSONataExpression(exp, msg);
|
||||
return new Promise((resolve,reject) => {
|
||||
RED.util.evaluateJSONataExpression(exp, msg, (err, result) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(result);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function apply_f(exp, accum, count) {
|
||||
@@ -269,32 +277,37 @@ module.exports = function(RED) {
|
||||
return exp
|
||||
}
|
||||
|
||||
function reduce_and_send_group(node, group) {
|
||||
function reduceAndSendGroup(node, group) {
|
||||
var is_right = node.reduce_right;
|
||||
var flag = is_right ? -1 : 1;
|
||||
var msgs = group.msgs;
|
||||
var accum = eval_exp(node, node.exp_init, node.exp_init_type);
|
||||
var reduce_exp = node.reduce_exp;
|
||||
var reduce_fixup = node.reduce_fixup;
|
||||
var count = group.count;
|
||||
msgs.sort(function(x,y) {
|
||||
var ix = x.parts.index;
|
||||
var iy = y.parts.index;
|
||||
if (ix < iy) return -flag;
|
||||
if (ix > iy) return flag;
|
||||
return 0;
|
||||
return getInitialReduceValue(node, node.exp_init, node.exp_init_type).then(accum => {
|
||||
var reduce_exp = node.reduce_exp;
|
||||
var reduce_fixup = node.reduce_fixup;
|
||||
var count = group.count;
|
||||
msgs.sort(function(x,y) {
|
||||
var ix = x.parts.index;
|
||||
var iy = y.parts.index;
|
||||
if (ix < iy) {return -flag;}
|
||||
if (ix > iy) {return flag;}
|
||||
return 0;
|
||||
});
|
||||
|
||||
return msgs.reduce((promise, msg) => promise.then(accum => apply_r(reduce_exp, accum, msg, msg.parts.index, count)), Promise.resolve(accum))
|
||||
.then(accum => {
|
||||
if(reduce_fixup !== undefined) {
|
||||
accum = apply_f(reduce_fixup, accum, count);
|
||||
}
|
||||
node.send({payload: accum});
|
||||
});
|
||||
}).catch(err => {
|
||||
throw new Error(RED._("join.errors.invalid-expr",{error:e.message}));
|
||||
});
|
||||
for(var msg of msgs) {
|
||||
accum = apply_r(reduce_exp, accum, msg, msg.parts.index, count);
|
||||
}
|
||||
if(reduce_fixup !== undefined) {
|
||||
accum = apply_f(reduce_fixup, accum, count);
|
||||
}
|
||||
node.send({payload: accum});
|
||||
}
|
||||
|
||||
function reduce_msg(node, msg) {
|
||||
if(msg.hasOwnProperty('parts')) {
|
||||
var promise;
|
||||
if (msg.hasOwnProperty('parts')) {
|
||||
var parts = msg.parts;
|
||||
var pending = node.pending;
|
||||
var pending_count = node.pending_count;
|
||||
@@ -312,65 +325,82 @@ module.exports = function(RED) {
|
||||
var group = pending[gid];
|
||||
var msgs = group.msgs;
|
||||
if(parts.hasOwnProperty('count') &&
|
||||
(group.count === undefined)) {
|
||||
(group.count === undefined)) {
|
||||
group.count = count;
|
||||
}
|
||||
msgs.push(msg);
|
||||
pending_count++;
|
||||
var completeProcess = function() {
|
||||
node.pending_count = pending_count;
|
||||
var max_msgs = max_kept_msgs_count(node);
|
||||
if ((max_msgs > 0) && (pending_count > max_msgs)) {
|
||||
node.pending = {};
|
||||
node.pending_count = 0;
|
||||
var promise = Promise.reject(RED._("join.too-many"));
|
||||
promise.catch(()=>{});
|
||||
return promise;
|
||||
}
|
||||
return Promise.resolve();
|
||||
}
|
||||
if(msgs.length === group.count) {
|
||||
delete pending[gid];
|
||||
try {
|
||||
pending_count -= msgs.length;
|
||||
reduce_and_send_group(node, group);
|
||||
} catch(e) {
|
||||
node.error(RED._("join.errors.invalid-expr",{error:e.message})); }
|
||||
pending_count -= msgs.length;
|
||||
promise = reduceAndSendGroup(node, group).then(completeProcess);
|
||||
} else {
|
||||
promise = completeProcess();
|
||||
}
|
||||
node.pending_count = pending_count;
|
||||
var max_msgs = max_kept_msgs_count(node);
|
||||
if ((max_msgs > 0) && (pending_count > max_msgs)) {
|
||||
node.pending = {};
|
||||
node.pending_count = 0;
|
||||
node.error(RED._("join.too-many"), msg);
|
||||
}
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
node.send(msg);
|
||||
}
|
||||
if (!promise) {
|
||||
promise = Promise.resolve();
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
function eval_exp(node, exp, exp_type) {
|
||||
if(exp_type === "flow") {
|
||||
return node.context().flow.get(exp);
|
||||
}
|
||||
else if(exp_type === "global") {
|
||||
return node.context().global.get(exp);
|
||||
}
|
||||
else if(exp_type === "str") {
|
||||
return exp;
|
||||
}
|
||||
else if(exp_type === "num") {
|
||||
return Number(exp);
|
||||
}
|
||||
else if(exp_type === "bool") {
|
||||
if (exp === 'true') {
|
||||
return true;
|
||||
function getInitialReduceValue(node, exp, exp_type) {
|
||||
return new Promise((resolve,reject) => {
|
||||
if(exp_type === "flow" || exp_type === "global") {
|
||||
node.context()[exp_type].get(exp,(err,value) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(value);
|
||||
}
|
||||
});
|
||||
return;
|
||||
} else if(exp_type === "jsonata") {
|
||||
var jexp = RED.util.prepareJSONataExpression(exp, node);
|
||||
RED.util.evaluateJSONataExpression(jexp, {},(err,value) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(value);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
else if (exp === 'false') {
|
||||
return false;
|
||||
var result;
|
||||
if(exp_type === "str") {
|
||||
result = exp;
|
||||
} else if(exp_type === "num") {
|
||||
result = Number(exp);
|
||||
} else if(exp_type === "bool") {
|
||||
if (exp === 'true') {
|
||||
result = true;
|
||||
} else if (exp === 'false') {
|
||||
result = false;
|
||||
}
|
||||
} else if ((exp_type === "bin") || (exp_type === "json")) {
|
||||
result = JSON.parse(exp);
|
||||
} else if(exp_type === "date") {
|
||||
result = Date.now();
|
||||
} else {
|
||||
reject(new Error("unexpected initial value type"));
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if ((exp_type === "bin") ||
|
||||
(exp_type === "json")) {
|
||||
return JSON.parse(exp);
|
||||
}
|
||||
else if(exp_type === "date") {
|
||||
return Date.now();
|
||||
}
|
||||
else if(exp_type === "jsonata") {
|
||||
var jexp = RED.util.prepareJSONataExpression(exp, node);
|
||||
return RED.util.evaluateJSONataExpression(jexp, {});
|
||||
}
|
||||
throw new Error("unexpected initial value type");
|
||||
resolve(result);
|
||||
});
|
||||
}
|
||||
|
||||
function JoinNode(n) {
|
||||
@@ -437,7 +467,8 @@ module.exports = function(RED) {
|
||||
newArray = newArray.concat(n);
|
||||
})
|
||||
group.payload = newArray;
|
||||
} else if (group.type === 'buffer') {
|
||||
}
|
||||
else if (group.type === 'buffer') {
|
||||
var buffers = [];
|
||||
var bufferLen = 0;
|
||||
if (group.joinChar !== undefined) {
|
||||
@@ -450,7 +481,8 @@ module.exports = function(RED) {
|
||||
buffers.push(group.payload[i]);
|
||||
bufferLen += group.payload[i].length;
|
||||
}
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
bufferLen = group.bufferLen;
|
||||
buffers = group.payload;
|
||||
}
|
||||
@@ -463,7 +495,8 @@ module.exports = function(RED) {
|
||||
groupJoinChar = group.joinChar.toString();
|
||||
}
|
||||
RED.util.setMessageProperty(group.msg,node.property,group.payload.join(groupJoinChar));
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
if (node.propertyType === 'full') {
|
||||
group.msg = RED.util.cloneMessage(group.msg);
|
||||
}
|
||||
@@ -471,13 +504,48 @@ module.exports = function(RED) {
|
||||
}
|
||||
if (group.msg.hasOwnProperty('parts') && group.msg.parts.hasOwnProperty('parts')) {
|
||||
group.msg.parts = group.msg.parts.parts;
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
delete group.msg.parts;
|
||||
}
|
||||
delete group.msg.complete;
|
||||
node.send(group.msg);
|
||||
}
|
||||
|
||||
var pendingMessages = [];
|
||||
var activeMessagePromise = null;
|
||||
// In reduce mode, we must process messages fully in order otherwise
|
||||
// groups may overlap and cause unexpected results. The use of JSONata
|
||||
// means some async processing *might* occur if flow/global context is
|
||||
// accessed.
|
||||
var processReduceMessageQueue = function(msg) {
|
||||
if (msg) {
|
||||
// A new message has arrived - add it to the message queue
|
||||
pendingMessages.push(msg);
|
||||
if (activeMessagePromise !== null) {
|
||||
// The node is currently processing a message, so do nothing
|
||||
// more with this message
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (pendingMessages.length === 0) {
|
||||
// There are no more messages to process, clear the active flag
|
||||
// and return
|
||||
activeMessagePromise = null;
|
||||
return;
|
||||
}
|
||||
|
||||
// There are more messages to process. Get the next message and
|
||||
// start processing it. Recurse back in to check for any more
|
||||
var nextMsg = pendingMessages.shift();
|
||||
activeMessagePromise = reduce_msg(node, nextMsg)
|
||||
.then(processReduceMessageQueue)
|
||||
.catch((err) => {
|
||||
node.error(err,nextMsg);
|
||||
return processReduceMessageQueue();
|
||||
});
|
||||
}
|
||||
|
||||
this.on("input", function(msg) {
|
||||
try {
|
||||
var property;
|
||||
@@ -516,8 +584,7 @@ module.exports = function(RED) {
|
||||
propertyIndex = msg.parts.index;
|
||||
}
|
||||
else if (node.mode === 'reduce') {
|
||||
reduce_msg(node, msg);
|
||||
return;
|
||||
return processReduceMessageQueue(msg);
|
||||
}
|
||||
else {
|
||||
// Use the node configuration to identify all of the group information
|
||||
@@ -525,7 +592,7 @@ module.exports = function(RED) {
|
||||
payloadType = node.build;
|
||||
targetCount = node.count;
|
||||
joinChar = node.joiner;
|
||||
if (targetCount === 0 && msg.hasOwnProperty('parts')) {
|
||||
if (n.count === "" && msg.hasOwnProperty('parts')) {
|
||||
targetCount = msg.parts.count || 0;
|
||||
}
|
||||
if (node.build === 'object') {
|
||||
@@ -554,7 +621,7 @@ module.exports = function(RED) {
|
||||
payload:{},
|
||||
targetCount:targetCount,
|
||||
type:"object",
|
||||
msg:msg
|
||||
msg:RED.util.cloneMessage(msg)
|
||||
};
|
||||
}
|
||||
else if (node.accumulate === true) {
|
||||
@@ -564,7 +631,7 @@ module.exports = function(RED) {
|
||||
payload:{},
|
||||
targetCount:targetCount,
|
||||
type:payloadType,
|
||||
msg:msg
|
||||
msg:RED.util.cloneMessage(msg)
|
||||
}
|
||||
if (payloadType === 'string' || payloadType === 'array' || payloadType === 'buffer') {
|
||||
inflight[partId].payload = [];
|
||||
@@ -576,7 +643,7 @@ module.exports = function(RED) {
|
||||
payload:[],
|
||||
targetCount:targetCount,
|
||||
type:payloadType,
|
||||
msg:msg
|
||||
msg:RED.util.cloneMessage(msg)
|
||||
};
|
||||
if (payloadType === 'string') {
|
||||
inflight[partId].joinChar = joinChar;
|
||||
@@ -627,14 +694,14 @@ module.exports = function(RED) {
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO: currently reuse the last received - add option to pick first received
|
||||
group.msg = msg;
|
||||
group.msg = Object.assign(group.msg, msg);
|
||||
var tcnt = group.targetCount;
|
||||
if (msg.hasOwnProperty("parts")) { tcnt = group.targetCount || msg.parts.count; }
|
||||
if ((tcnt > 0 && group.currentCount >= tcnt) || msg.hasOwnProperty('complete')) {
|
||||
completeSend(partId);
|
||||
}
|
||||
} catch(err) {
|
||||
}
|
||||
catch(err) {
|
||||
console.log(err.stack);
|
||||
}
|
||||
});
|
||||
|
@@ -17,7 +17,7 @@
|
||||
module.exports = function(RED) {
|
||||
"use strict";
|
||||
|
||||
var _max_kept_msgs_count = undefined;
|
||||
var _max_kept_msgs_count;
|
||||
|
||||
function max_kept_msgs_count(node) {
|
||||
if (_max_kept_msgs_count === undefined) {
|
||||
@@ -32,30 +32,20 @@ module.exports = function(RED) {
|
||||
return _max_kept_msgs_count;
|
||||
}
|
||||
|
||||
function eval_jsonata(node, code, val) {
|
||||
try {
|
||||
return RED.util.evaluateJSONataExpression(code, val);
|
||||
}
|
||||
catch (e) {
|
||||
node.error(RED._("sort.invalid-exp"));
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
function get_context_val(node, name, dval) {
|
||||
var context = node.context();
|
||||
var val = context.get(name);
|
||||
if (val === undefined) {
|
||||
context.set(name, dval);
|
||||
return dval;
|
||||
}
|
||||
return val;
|
||||
}
|
||||
// function get_context_val(node, name, dval) {
|
||||
// var context = node.context();
|
||||
// var val = context.get(name);
|
||||
// if (val === undefined) {
|
||||
// context.set(name, dval);
|
||||
// return dval;
|
||||
// }
|
||||
// return val;
|
||||
// }
|
||||
|
||||
function SortNode(n) {
|
||||
RED.nodes.createNode(this, n);
|
||||
var node = this;
|
||||
var pending = get_context_val(node, 'pending', {})
|
||||
var pending = {};//get_context_val(node, 'pending', {})
|
||||
var pending_count = 0;
|
||||
var pending_id = 0;
|
||||
var order = n.order || "ascending";
|
||||
@@ -76,11 +66,10 @@ module.exports = function(RED) {
|
||||
}
|
||||
}
|
||||
var dir = (order === "ascending") ? 1 : -1;
|
||||
var conv = as_num
|
||||
? function(x) { return Number(x); }
|
||||
: function(x) { return x; };
|
||||
var conv = as_num ? function(x) { return Number(x); }
|
||||
: function(x) { return x; };
|
||||
|
||||
function gen_comp(key) {
|
||||
function generateComparisonFunction(key) {
|
||||
return function(x, y) {
|
||||
var xp = conv(key(x));
|
||||
var yp = conv(key(y));
|
||||
@@ -90,74 +79,97 @@ module.exports = function(RED) {
|
||||
};
|
||||
}
|
||||
|
||||
function send_group(group) {
|
||||
var key = key_is_exp
|
||||
? function(msg) {
|
||||
return eval_jsonata(node, key_exp, msg);
|
||||
}
|
||||
: function(msg) {
|
||||
return RED.util.getMessageProperty(msg, key_prop);
|
||||
};
|
||||
var comp = gen_comp(key);
|
||||
function sortMessageGroup(group) {
|
||||
var promise;
|
||||
var msgs = group.msgs;
|
||||
try {
|
||||
msgs.sort(comp);
|
||||
}
|
||||
catch (e) {
|
||||
return; // not send when error
|
||||
}
|
||||
for (var i = 0; i < msgs.length; i++) {
|
||||
var msg = msgs[i];
|
||||
msg.parts.index = i;
|
||||
node.send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
function sort_payload(msg) {
|
||||
var data = RED.util.getMessageProperty(msg, target_prop);
|
||||
if (Array.isArray(data)) {
|
||||
var key = key_is_exp
|
||||
? function(elem) {
|
||||
return eval_jsonata(node, key_exp, elem);
|
||||
}
|
||||
: function(elem) { return elem; };
|
||||
var comp = gen_comp(key);
|
||||
if (key_is_exp) {
|
||||
var evaluatedDataPromises = msgs.map(msg => {
|
||||
return new Promise((resolve,reject) => {
|
||||
RED.util.evaluateJSONataExpression(key_exp, msg, (err, result) => {
|
||||
resolve({
|
||||
item: msg,
|
||||
sortValue: result
|
||||
})
|
||||
});
|
||||
})
|
||||
});
|
||||
promise = Promise.all(evaluatedDataPromises).then(evaluatedElements => {
|
||||
// Once all of the sort keys are evaluated, sort by them
|
||||
var comp = generateComparisonFunction(elem=>elem.sortValue);
|
||||
return evaluatedElements.sort(comp).map(elem=>elem.item);
|
||||
});
|
||||
} else {
|
||||
var key = function(msg) {
|
||||
return ;
|
||||
}
|
||||
var comp = generateComparisonFunction(msg => RED.util.getMessageProperty(msg, key_prop));
|
||||
try {
|
||||
data.sort(comp);
|
||||
msgs.sort(comp);
|
||||
}
|
||||
catch (e) {
|
||||
return false;
|
||||
return; // not send when error
|
||||
}
|
||||
return true;
|
||||
promise = Promise.resolve(msgs);
|
||||
}
|
||||
return false;
|
||||
return promise.then(msgs => {
|
||||
for (var i = 0; i < msgs.length; i++) {
|
||||
var msg = msgs[i];
|
||||
msg.parts.index = i;
|
||||
node.send(msg);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function check_parts(parts) {
|
||||
if (parts.hasOwnProperty("id") &&
|
||||
parts.hasOwnProperty("index")) {
|
||||
return true;
|
||||
function sortMessageProperty(msg) {
|
||||
var data = RED.util.getMessageProperty(msg, target_prop);
|
||||
if (Array.isArray(data)) {
|
||||
if (key_is_exp) {
|
||||
// key is an expression. Evaluated the expression for each item
|
||||
// to get its sort value. As this could be async, need to do
|
||||
// it first.
|
||||
var evaluatedDataPromises = data.map(elem => {
|
||||
return new Promise((resolve,reject) => {
|
||||
RED.util.evaluateJSONataExpression(key_exp, elem, (err, result) => {
|
||||
resolve({
|
||||
item: elem,
|
||||
sortValue: result
|
||||
})
|
||||
});
|
||||
})
|
||||
})
|
||||
return Promise.all(evaluatedDataPromises).then(evaluatedElements => {
|
||||
// Once all of the sort keys are evaluated, sort by them
|
||||
// and reconstruct the original message item with the newly
|
||||
// sorted values.
|
||||
var comp = generateComparisonFunction(elem=>elem.sortValue);
|
||||
data = evaluatedElements.sort(comp).map(elem=>elem.item);
|
||||
RED.util.setMessageProperty(msg, target_prop,data);
|
||||
return true;
|
||||
})
|
||||
} else {
|
||||
var comp = generateComparisonFunction(elem=>elem);
|
||||
try {
|
||||
data.sort(comp);
|
||||
} catch (e) {
|
||||
return Promise.resolve(false);
|
||||
}
|
||||
return Promise.resolve(true);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return Promise.resolve(false);
|
||||
}
|
||||
|
||||
function clear_pending() {
|
||||
function removeOldestPending() {
|
||||
var oldest;
|
||||
var oldest_key;
|
||||
for(var key in pending) {
|
||||
node.log(RED._("sort.clear"), pending[key].msgs[0]);
|
||||
delete pending[key];
|
||||
}
|
||||
pending_count = 0;
|
||||
}
|
||||
|
||||
function remove_oldest_pending() {
|
||||
var oldest = undefined;
|
||||
var oldest_key = undefined;
|
||||
for(var key in pending) {
|
||||
var item = pending[key];
|
||||
if((oldest === undefined) ||
|
||||
(oldest.seq_no > item.seq_no)) {
|
||||
oldest = item;
|
||||
oldest_key = key;
|
||||
if (pending.hasOwnProperty(key)) {
|
||||
var item = pending[key];
|
||||
if((oldest === undefined) ||
|
||||
(oldest.seq_no > item.seq_no)) {
|
||||
oldest = item;
|
||||
oldest_key = key;
|
||||
}
|
||||
}
|
||||
}
|
||||
if(oldest !== undefined) {
|
||||
@@ -166,16 +178,18 @@ module.exports = function(RED) {
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
function process_msg(msg) {
|
||||
|
||||
function processMessage(msg) {
|
||||
if (target_is_prop) {
|
||||
if (sort_payload(msg)) {
|
||||
node.send(msg);
|
||||
}
|
||||
return;
|
||||
sortMessageProperty(msg).then(send => {
|
||||
if (send) {
|
||||
node.send(msg);
|
||||
}
|
||||
}).catch(err => {
|
||||
});
|
||||
}
|
||||
var parts = msg.parts;
|
||||
if (!check_parts(parts)) {
|
||||
if (!parts.hasOwnProperty("id") || !parts.hasOwnProperty("index")) {
|
||||
return;
|
||||
}
|
||||
var gid = parts.id;
|
||||
@@ -195,23 +209,29 @@ module.exports = function(RED) {
|
||||
pending_count++;
|
||||
if (group.count === msgs.length) {
|
||||
delete pending[gid]
|
||||
send_group(group);
|
||||
sortMessageGroup(group);
|
||||
pending_count -= msgs.length;
|
||||
}
|
||||
var max_msgs = max_kept_msgs_count(node);
|
||||
if ((max_msgs > 0) && (pending_count > max_msgs)) {
|
||||
pending_count -= remove_oldest_pending();
|
||||
node.error(RED._("sort.too-many"), msg);
|
||||
} else {
|
||||
var max_msgs = max_kept_msgs_count(node);
|
||||
if ((max_msgs > 0) && (pending_count > max_msgs)) {
|
||||
pending_count -= removeOldestPending();
|
||||
node.error(RED._("sort.too-many"), msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
this.on("input", function(msg) {
|
||||
process_msg(msg);
|
||||
processMessage(msg);
|
||||
});
|
||||
|
||||
this.on("close", function() {
|
||||
clear_pending();
|
||||
})
|
||||
for(var key in pending) {
|
||||
if (pending.hasOwnProperty(key)) {
|
||||
node.log(RED._("sort.clear"), pending[key].msgs[0]);
|
||||
delete pending[key];
|
||||
}
|
||||
}
|
||||
pending_count = 0; })
|
||||
}
|
||||
|
||||
RED.nodes.registerType("sort", SortNode);
|
||||
|
Reference in New Issue
Block a user