1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Merge branch '0.19' of https://github.com/node-red/node-red into api-access-token

This commit is contained in:
Hideki Nakamura 2018-07-10 09:33:25 -07:00
commit 42188b9f49
24 changed files with 1207 additions and 579 deletions

View File

@ -24,6 +24,10 @@ module.exports = function(grunt) {
nodemonArgs.push(flowFile);
}
var nonHeadless = grunt.option('non-headless');
if (nonHeadless) {
process.env.NODE_RED_NON_HEADLESS = 'true';
}
grunt.initConfig({
pkg: grunt.file.readJSON('package.json'),
paths: {

View File

@ -15,16 +15,11 @@
**/
(function($) {
var contextParse = function(v) {
var parts = {};
var m = /^#:\((\S+?)\)::(.*)$/.exec(v);
if (m) {
parts.option = m[1];
parts.value = m[2];
} else {
parts.value = v;
parts.option = RED.settings.context.default;
var parts = RED.utils.parseContextKey(v);
return {
option: parts.store,
value: parts.key
}
return parts;
}
var contextExport = function(v,opt) {
if (!opt) {

View File

@ -828,6 +828,20 @@ RED.utils = (function() {
return payload;
}
function parseContextKey(key) {
var parts = {};
var m = /^#:\((\S+?)\)::(.*)$/.exec(key);
if (m) {
parts.store = m[1];
parts.key = m[2];
} else {
parts.key = key;
if (RED.settings.context) {
parts.store = RED.settings.context.default;
}
}
return parts;
}
return {
createObjectElement: buildMessageElement,
@ -839,6 +853,7 @@ RED.utils = (function() {
getNodeIcon: getNodeIcon,
getNodeLabel: getNodeLabel,
addSpinnerOverlay: addSpinnerOverlay,
decodeObject: decodeObject
decodeObject: decodeObject,
parseContextKey: parseContextKey
}
})();

View File

@ -63,21 +63,33 @@ module.exports = function(RED) {
}
this.on("input",function(msg) {
try {
msg.topic = this.topic;
if ( (this.payloadType == null && this.payload === "") || this.payloadType === "date") {
msg.payload = Date.now();
} else if (this.payloadType == null) {
msg.payload = this.payload;
} else if (this.payloadType === 'none') {
msg.payload = "";
} else {
msg.payload = RED.util.evaluateNodeProperty(this.payload,this.payloadType,this,msg);
msg.topic = this.topic;
if (this.payloadType !== 'flow' && this.payloadType !== 'global') {
try {
if ( (this.payloadType == null && this.payload === "") || this.payloadType === "date") {
msg.payload = Date.now();
} else if (this.payloadType == null) {
msg.payload = this.payload;
} else if (this.payloadType === 'none') {
msg.payload = "";
} else {
msg.payload = RED.util.evaluateNodeProperty(this.payload,this.payloadType,this,msg);
}
this.send(msg);
msg = null;
} catch(err) {
this.error(err,msg);
}
this.send(msg);
msg = null;
} catch(err) {
this.error(err,msg);
} else {
RED.util.evaluateNodeProperty(this.payload,this.payloadType,this,msg, function(err,res) {
if (err) {
node.error(err,msg);
} else {
msg.payload = res;
node.send(msg);
}
});
}
});
}

View File

@ -76,8 +76,43 @@ module.exports = function(RED) {
var node = this;
node.topics = {};
this.on("input", function(msg) {
var pendingMessages = [];
var activeMessagePromise = null;
var processMessageQueue = function(msg) {
if (msg) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
if (activeMessagePromise !== null) {
// The node is currently processing a message, so do nothing
// more with this message
return;
}
}
if (pendingMessages.length === 0) {
// There are no more messages to process, clear the active flag
// and return
activeMessagePromise = null;
return;
}
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
activeMessagePromise = processMessage(nextMsg)
.then(processMessageQueue)
.catch((err) => {
node.error(err,nextMsg);
return processMessageQueue();
});
}
this.on('input', function(msg) {
processMessageQueue(msg);
});
var processMessage = function(msg) {
var topic = msg.topic || "_none";
var promise;
if (node.bytopic === "all") { topic = "_none"; }
node.topics[topic] = node.topics[topic] || {};
if (msg.hasOwnProperty("reset") || ((node.reset !== '') && msg.hasOwnProperty("payload") && (msg.payload !== null) && msg.payload.toString && (msg.payload.toString() == node.reset)) ) {
@ -88,48 +123,88 @@ module.exports = function(RED) {
}
else {
if (((!node.topics[topic].tout) && (node.topics[topic].tout !== 0)) || (node.loop === true)) {
promise = Promise.resolve();
if (node.op2type === "pay" || node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); }
else if (node.op2Templated) { node.topics[topic].m2 = mustache.render(node.op2,msg); }
else if (node.op2type !== "nul") {
node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg);
}
if (node.op1type === "pay") { }
else if (node.op1Templated) { msg.payload = mustache.render(node.op1,msg); }
else if (node.op1type !== "nul") {
msg.payload = RED.util.evaluateNodeProperty(node.op1,node.op1type,node,msg);
}
if (node.duration === 0) { node.topics[topic].tout = 0; }
else if (node.loop === true) {
/* istanbul ignore else */
if (node.topics[topic].tout) { clearInterval(node.topics[topic].tout); }
/* istanbul ignore else */
if (node.op1type !== "nul") {
var msg2 = RED.util.cloneMessage(msg);
node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, node.duration);
}
}
else {
if (!node.topics[topic].tout) {
node.topics[topic].tout = setTimeout(function() {
var msg2 = null;
if (node.op2type !== "nul") {
msg2 = RED.util.cloneMessage(msg);
if (node.op2type === "flow" || node.op2type === "global") {
node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg);
}
msg2.payload = node.topics[topic].m2;
delete node.topics[topic];
node.send(msg2);
promise = new Promise((resolve,reject) => {
RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => {
if (err) {
reject(err);
} else {
node.topics[topic].m2 = value;
resolve();
}
else { delete node.topics[topic]; }
node.status({});
}, node.duration);
}
});
});
}
node.status({fill:"blue",shape:"dot",text:" "});
if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); }
return promise.then(() => {
promise = Promise.resolve();
if (node.op1type === "pay") { }
else if (node.op1Templated) { msg.payload = mustache.render(node.op1,msg); }
else if (node.op1type !== "nul") {
promise = new Promise((resolve,reject) => {
RED.util.evaluateNodeProperty(node.op1,node.op1type,node,msg,(err,value) => {
if (err) {
reject(err);
} else {
msg.payload = value;
resolve();
}
});
});
}
return promise.then(() => {
if (node.duration === 0) { node.topics[topic].tout = 0; }
else if (node.loop === true) {
/* istanbul ignore else */
if (node.topics[topic].tout) { clearInterval(node.topics[topic].tout); }
/* istanbul ignore else */
if (node.op1type !== "nul") {
var msg2 = RED.util.cloneMessage(msg);
node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, node.duration);
}
}
else {
if (!node.topics[topic].tout) {
node.topics[topic].tout = setTimeout(function() {
var msg2 = null;
if (node.op2type !== "nul") {
var promise = Promise.resolve();
msg2 = RED.util.cloneMessage(msg);
if (node.op2type === "flow" || node.op2type === "global") {
promise = new Promise((resolve,reject) => {
RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => {
if (err) {
reject(err);
} else {
node.topics[topic].m2 = value;
resolve();
}
});
});
}
promise.then(() => {
msg2.payload = node.topics[topic].m2;
delete node.topics[topic];
node.send(msg2);
node.status({});
}).catch(err => {
node.error(err);
});
} else {
delete node.topics[topic];
node.status({});
}
}, node.duration);
}
}
node.status({fill:"blue",shape:"dot",text:" "});
if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); }
});
});
}
else if ((node.extend === "true" || node.extend === true) && (node.duration > 0)) {
/* istanbul ignore else */
@ -138,25 +213,43 @@ module.exports = function(RED) {
if (node.topics[topic].tout) { clearTimeout(node.topics[topic].tout); }
node.topics[topic].tout = setTimeout(function() {
var msg2 = null;
var promise = Promise.resolve();
if (node.op2type !== "nul") {
if (node.op2type === "flow" || node.op2type === "global") {
node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg);
}
if (node.topics[topic] !== undefined) {
msg2 = RED.util.cloneMessage(msg);
msg2.payload = node.topics[topic].m2;
promise = new Promise((resolve,reject) => {
RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => {
if (err) {
reject(err);
} else {
node.topics[topic].m2 = value;
resolve();
}
});
});
}
}
delete node.topics[topic];
node.status({});
node.send(msg2);
promise.then(() => {
if (node.op2type !== "nul") {
if (node.topics[topic] !== undefined) {
msg2 = RED.util.cloneMessage(msg);
msg2.payload = node.topics[topic].m2;
}
}
delete node.topics[topic];
node.status({});
node.send(msg2);
}).catch(err => {
node.error(err);
});
}, node.duration);
}
else {
if (node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); }
}
}
});
return Promise.resolve();
}
this.on("close", function() {
for (var t in node.topics) {
/* istanbul ignore else */

View File

@ -23,10 +23,6 @@ from time import sleep
bounce = 25;
if sys.version_info >= (3,0):
print("Sorry - currently only configured to work with python 2.x")
sys.exit(1)
if len(sys.argv) > 2:
cmd = sys.argv[1].lower()
pin = int(sys.argv[2])
@ -34,7 +30,7 @@ if len(sys.argv) > 2:
GPIO.setwarnings(False)
if cmd == "pwm":
#print "Initialised pin "+str(pin)+" to PWM"
#print("Initialised pin "+str(pin)+" to PWM")
try:
freq = int(sys.argv[3])
except:
@ -54,10 +50,10 @@ if len(sys.argv) > 2:
GPIO.cleanup(pin)
sys.exit(0)
except Exception as ex:
print "bad data: "+data
print("bad data: "+data)
elif cmd == "buzz":
#print "Initialised pin "+str(pin)+" to Buzz"
#print("Initialised pin "+str(pin)+" to Buzz")
GPIO.setup(pin,GPIO.OUT)
p = GPIO.PWM(pin, 100)
p.stop()
@ -76,10 +72,10 @@ if len(sys.argv) > 2:
GPIO.cleanup(pin)
sys.exit(0)
except Exception as ex:
print "bad data: "+data
print("bad data: "+data)
elif cmd == "out":
#print "Initialised pin "+str(pin)+" to OUT"
#print("Initialised pin "+str(pin)+" to OUT")
GPIO.setup(pin,GPIO.OUT)
if len(sys.argv) == 4:
GPIO.output(pin,int(sys.argv[3]))
@ -103,11 +99,11 @@ if len(sys.argv) > 2:
GPIO.output(pin,data)
elif cmd == "in":
#print "Initialised pin "+str(pin)+" to IN"
#print("Initialised pin "+str(pin)+" to IN")
bounce = float(sys.argv[4])
def handle_callback(chan):
sleep(bounce/1000.0)
print GPIO.input(chan)
print(GPIO.input(chan))
if sys.argv[3].lower() == "up":
GPIO.setup(pin,GPIO.IN,GPIO.PUD_UP)
@ -116,7 +112,7 @@ if len(sys.argv) > 2:
else:
GPIO.setup(pin,GPIO.IN)
print GPIO.input(pin)
print(GPIO.input(pin))
GPIO.add_event_detect(pin, GPIO.BOTH, callback=handle_callback, bouncetime=int(bounce))
while True:
@ -129,7 +125,7 @@ if len(sys.argv) > 2:
sys.exit(0)
elif cmd == "byte":
#print "Initialised BYTE mode - "+str(pin)+
#print("Initialised BYTE mode - "+str(pin)+)
list = [7,11,13,12,15,16,18,22]
GPIO.setup(list,GPIO.OUT)
@ -152,7 +148,7 @@ if len(sys.argv) > 2:
GPIO.output(list[bit], data & mask)
elif cmd == "borg":
#print "Initialised BORG mode - "+str(pin)+
#print("Initialised BORG mode - "+str(pin)+)
GPIO.setup(11,GPIO.OUT)
GPIO.setup(13,GPIO.OUT)
GPIO.setup(15,GPIO.OUT)
@ -190,7 +186,7 @@ if len(sys.argv) > 2:
button = ord( buf[0] ) & pin # mask out just the required button(s)
if button != oldbutt: # only send if changed
oldbutt = button
print button
print(button)
while True:
try:
@ -215,7 +211,7 @@ if len(sys.argv) > 2:
# type,code,value
print("%u,%u" % (code, value))
event = file.read(EVENT_SIZE)
print "0,0"
print("0,0")
file.close()
sys.exit(0)
except:
@ -225,14 +221,14 @@ if len(sys.argv) > 2:
elif len(sys.argv) > 1:
cmd = sys.argv[1].lower()
if cmd == "rev":
print GPIO.RPI_REVISION
print(GPIO.RPI_REVISION)
elif cmd == "ver":
print GPIO.VERSION
print(GPIO.VERSION)
elif cmd == "info":
print GPIO.RPI_INFO
print(GPIO.RPI_INFO)
else:
print "Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}"
print " only ver (gpio version) and info (board information) accept no pin parameter."
print("Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}")
print(" only ver (gpio version) and info (board information) accept no pin parameter.")
else:
print "Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}"
print("Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}")

View File

@ -697,7 +697,9 @@
"errors": {
"dropped-object": "Ignored non-object payload",
"dropped": "Ignored unsupported payload type",
"dropped-error": "Failed to convert payload"
"dropped-error": "Failed to convert payload",
"schema-error": "JSON Schema error",
"schema-error-compile": "JSON Schema error: failed to compile schema"
},
"label": {
"o2j": "Object to JSON options",
@ -924,8 +926,8 @@
"ascending" : "ascending",
"descending" : "descending",
"as-number" : "as number",
"invalid-exp" : "invalid JSONata expression in sort node",
"too-many" : "too many pending messages in sort node",
"invalid-exp" : "Invalid JSONata expression in sort node: __message__",
"too-many" : "Too many pending messages in sort node",
"clear" : "clear pending message in sort node"
},
"batch" : {

View File

@ -99,11 +99,17 @@
}
return v;
}
function prop2name(key) {
var result = RED.utils.parseContextKey(key);
return result.key;
}
function getValueLabel(t,v) {
if (t === 'str') {
return '"'+clipValueLength(v)+'"';
} else if (t === 'msg' || t==='flow' || t==='global') {
} else if (t === 'msg') {
return t+"."+clipValueLength(v);
} else if (t === 'flow' || t === 'global') {
return t+"."+clipValueLength(prop2name(v));
}
return clipValueLength(v);
}

View File

@ -59,21 +59,157 @@ module.exports = function(RED) {
'else': function(a) { return a === true; }
};
var _max_kept_msgs_count = undefined;
var _maxKeptCount;
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
function getMaxKeptCount() {
if (_maxKeptCount === undefined) {
var name = "nodeMessageBufferMaxLength";
if (RED.settings.hasOwnProperty(name)) {
_max_kept_msgs_count = RED.settings[name];
_maxKeptCount = RED.settings[name];
}
else {
_max_kept_msgs_count = 0;
_maxKeptCount = 0;
}
}
return _max_kept_msgs_count;
return _maxKeptCount;
}
function getProperty(node,msg) {
return new Promise((resolve,reject) => {
if (node.propertyType === 'jsonata') {
RED.util.evaluateJSONataExpression(node.property,msg,(err,value) => {
if (err) {
reject(RED._("switch.errors.invalid-expr",{error:err.message}));
} else {
resolve(value);
}
});
} else {
RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg,(err,value) => {
if (err) {
resolve(undefined);
} else {
resolve(value);
}
});
}
});
}
function getV1(node,msg,rule,hasParts) {
return new Promise( (resolve,reject) => {
if (rule.vt === 'prev') {
resolve(node.previousValue);
} else if (rule.vt === 'jsonata') {
var exp = rule.v;
if (rule.t === 'jsonata_exp') {
if (hasParts) {
exp.assign("I", msg.parts.index);
exp.assign("N", msg.parts.count);
}
}
RED.util.evaluateJSONataExpression(exp,msg,(err,value) => {
if (err) {
reject(RED._("switch.errors.invalid-expr",{error:err.message}));
} else {
resolve(value);
}
});
} else if (rule.vt === 'json') {
resolve("json");
} else if (rule.vt === 'null') {
resolve("null");
} else {
RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg, function(err,value) {
if (err) {
resolve(undefined);
} else {
resolve(value);
}
});
}
});
}
function getV2(node,msg,rule) {
return new Promise((resolve,reject) => {
var v2 = rule.v2;
if (rule.v2t === 'prev') {
resolve(node.previousValue);
} else if (rule.v2t === 'jsonata') {
RED.util.evaluateJSONataExpression(rule.v2,msg,(err,value) => {
if (err) {
reject(RED._("switch.errors.invalid-expr",{error:err.message}));
} else {
resolve(value);
}
});
} else if (typeof v2 !== 'undefined') {
RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg, function(err,value) {
if (err) {
resolve(undefined);
} else {
resolve(value);
}
});
} else {
resolve(v2);
}
})
}
function applyRule(node, msg, property, state) {
return new Promise((resolve,reject) => {
var rule = node.rules[state.currentRule];
var v1,v2;
getV1(node,msg,rule,state.hasParts).then(value => {
v1 = value;
}).then(()=>getV2(node,msg,rule)).then(value => {
v2 = value;
}).then(() => {
if (rule.t == "else") {
property = state.elseflag;
state.elseflag = true;
}
if (operators[rule.t](property,v1,v2,rule.case,msg.parts)) {
state.onward.push(msg);
state.elseflag = false;
if (node.checkall == "false") {
return resolve(false);
}
} else {
state.onward.push(null);
}
resolve(state.currentRule < node.rules.length - 1);
});
})
}
function applyRules(node, msg, property,state) {
if (!state) {
state = {
currentRule: 0,
elseflag: true,
onward: [],
hasParts: msg.hasOwnProperty("parts") &&
msg.parts.hasOwnProperty("id") &&
msg.parts.hasOwnProperty("index")
}
}
return applyRule(node,msg,property,state).then(hasMore => {
if (hasMore) {
state.currentRule++;
return applyRules(node,msg,property,state);
} else {
node.previousValue = property;
return state.onward;
}
});
}
function SwitchNode(n) {
RED.nodes.createNode(this, n);
this.rules = n.rules || [];
@ -94,10 +230,10 @@ module.exports = function(RED) {
var node = this;
var valid = true;
var repair = n.repair;
var needs_count = repair;
var needsCount = repair;
for (var i=0; i<this.rules.length; i+=1) {
var rule = this.rules[i];
needs_count = needs_count || ((rule.t === "tail") || (rule.t === "jsonata_exp"));
needsCount = needsCount || ((rule.t === "tail") || (rule.t === "jsonata_exp"));
if (!rule.vt) {
if (!isNaN(Number(rule.v))) {
rule.vt = 'num';
@ -142,26 +278,26 @@ module.exports = function(RED) {
return;
}
var pending_count = 0;
var pending_id = 0;
var pending_in = {};
var pending_out = {};
var pendingCount = 0;
var pendingId = 0;
var pendingIn = {};
var pendingOut = {};
var received = {};
function add2group_in(id, msg, parts) {
if (!(id in pending_in)) {
pending_in[id] = {
function addMessageToGroup(id, msg, parts) {
if (!(id in pendingIn)) {
pendingIn[id] = {
count: undefined,
msgs: [],
seq_no: pending_id++
seq_no: pendingId++
};
}
var group = pending_in[id];
var group = pendingIn[id];
group.msgs.push(msg);
pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
clear_pending();
pendingCount++;
var max_msgs = getMaxKeptCount();
if ((max_msgs > 0) && (pendingCount > max_msgs)) {
clearPending();
node.error(RED._("switch.errors.too-many"), msg);
}
if (parts.hasOwnProperty("count")) {
@ -170,32 +306,29 @@ module.exports = function(RED) {
return group;
}
function del_group_in(id, group) {
pending_count -= group.msgs.length;
delete pending_in[id];
}
function add2pending_in(msg) {
function addMessageToPending(msg) {
var parts = msg.parts;
if (parts.hasOwnProperty("id") &&
parts.hasOwnProperty("index")) {
var group = add2group_in(parts.id, msg, parts);
var msgs = group.msgs;
var count = group.count;
if (count === msgs.length) {
for (var i = 0; i < msgs.length; i++) {
var msg = msgs[i];
// We've already checked the msg.parts has the require bits
var group = addMessageToGroup(parts.id, msg, parts);
var msgs = group.msgs;
var count = group.count;
if (count === msgs.length) {
// We have a complete group - send the individual parts
return msgs.reduce((promise, msg) => {
return promise.then((result) => {
msg.parts.count = count;
process_msg(msg, false);
}
del_group_in(parts.id, group);
}
return true;
return processMessage(msg, false);
})
}, Promise.resolve()).then( () => {
pendingCount -= group.msgs.length;
delete pendingIn[parts.id];
});
}
return false;
return Promise.resolve();
}
function send_group(onwards, port_count) {
function sendGroup(onwards, port_count) {
var counts = new Array(port_count).fill(0);
for (var i = 0; i < onwards.length; i++) {
var onward = onwards[i];
@ -230,141 +363,104 @@ module.exports = function(RED) {
}
}
function send2ports(onward, msg) {
function sendGroupMessages(onward, msg) {
var parts = msg.parts;
var gid = parts.id;
received[gid] = ((gid in received) ? received[gid] : 0) +1;
var send_ok = (received[gid] === parts.count);
if (!(gid in pending_out)) {
pending_out[gid] = {
if (!(gid in pendingOut)) {
pendingOut[gid] = {
onwards: []
};
}
var group = pending_out[gid];
var group = pendingOut[gid];
var onwards = group.onwards;
onwards.push(onward);
pending_count++;
pendingCount++;
if (send_ok) {
send_group(onwards, onward.length, msg);
pending_count -= onward.length;
delete pending_out[gid];
sendGroup(onwards, onward.length, msg);
pendingCount -= onward.length;
delete pendingOut[gid];
delete received[gid];
}
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
clear_pending();
var max_msgs = getMaxKeptCount();
if ((max_msgs > 0) && (pendingCount > max_msgs)) {
clearPending();
node.error(RED._("switch.errors.too-many"), msg);
}
}
function msg_has_parts(msg) {
if (msg.hasOwnProperty("parts")) {
var parts = msg.parts;
return (parts.hasOwnProperty("id") &&
parts.hasOwnProperty("index"));
function processMessage(msg, checkParts) {
var hasParts = msg.hasOwnProperty("parts") &&
msg.parts.hasOwnProperty("id") &&
msg.parts.hasOwnProperty("index");
if (needsCount && checkParts && hasParts) {
return addMessageToPending(msg);
}
return false;
return getProperty(node,msg)
.then(property => applyRules(node,msg,property))
.then(onward => {
if (!repair || !hasParts) {
node.send(onward);
}
else {
sendGroupMessages(onward, msg);
}
}).catch(err => {
node.warn(err);
});
}
function process_msg(msg, check_parts) {
var has_parts = msg_has_parts(msg);
if (needs_count && check_parts && has_parts &&
add2pending_in(msg)) {
return;
}
var onward = [];
try {
var prop;
if (node.propertyType === 'jsonata') {
prop = RED.util.evaluateJSONataExpression(node.property,msg);
} else {
prop = RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg);
}
var elseflag = true;
for (var i=0; i<node.rules.length; i+=1) {
var rule = node.rules[i];
var test = prop;
var v1,v2;
if (rule.vt === 'prev') {
v1 = node.previousValue;
} else if (rule.vt === 'jsonata') {
try {
var exp = rule.v;
if (rule.t === 'jsonata_exp') {
if (has_parts) {
exp.assign("I", msg.parts.index);
exp.assign("N", msg.parts.count);
}
}
v1 = RED.util.evaluateJSONataExpression(exp,msg);
} catch(err) {
node.error(RED._("switch.errors.invalid-expr",{error:err.message}));
return;
}
} else if (rule.vt === 'json') {
v1 = "json";
} else if (rule.vt === 'null') {
v1 = "null";
} else {
try {
v1 = RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg);
} catch(err) {
v1 = undefined;
}
}
v2 = rule.v2;
if (rule.v2t === 'prev') {
v2 = node.previousValue;
} else if (rule.v2t === 'jsonata') {
try {
v2 = RED.util.evaluateJSONataExpression(rule.v2,msg);
} catch(err) {
node.error(RED._("switch.errors.invalid-expr",{error:err.message}));
return;
}
} else if (typeof v2 !== 'undefined') {
try {
v2 = RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg);
} catch(err) {
v2 = undefined;
}
}
if (rule.t == "else") { test = elseflag; elseflag = true; }
if (operators[rule.t](test,v1,v2,rule.case,msg.parts)) {
onward.push(msg);
elseflag = false;
if (node.checkall == "false") { break; }
} else {
onward.push(null);
}
}
node.previousValue = prop;
if (!repair || !has_parts) {
node.send(onward);
}
else {
send2ports(onward, msg);
}
} catch(err) {
node.warn(err);
}
}
function clear_pending() {
pending_count = 0;
pending_id = 0;
pending_in = {};
pending_out = {};
function clearPending() {
pendingCount = 0;
pendingId = 0;
pendingIn = {};
pendingOut = {};
received = {};
}
var pendingMessages = [];
var activeMessagePromise = null;
var processMessageQueue = function(msg) {
if (msg) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
if (activeMessagePromise !== null) {
// The node is currently processing a message, so do nothing
// more with this message
return;
}
}
if (pendingMessages.length === 0) {
// There are no more messages to process, clear the active flag
// and return
activeMessagePromise = null;
return;
}
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
activeMessagePromise = processMessage(nextMsg,true)
.then(processMessageQueue)
.catch((err) => {
node.error(err,nextMsg);
return processMessageQueue();
});
}
this.on('input', function(msg) {
process_msg(msg, true);
processMessageQueue(msg);
});
this.on('close', function() {
clear_pending();
clearPending();
});
}

View File

@ -54,6 +54,10 @@
outputs: 1,
icon: "swap.png",
label: function() {
function prop2name(type, key) {
var result = RED.utils.parseContextKey(key);
return type +"." +result.key;
}
if (this.name) {
return this.name;
}
@ -70,13 +74,13 @@
} else {
if (this.rules.length == 1) {
if (this.rules[0].t === "set") {
return this._("change.label.set",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p});
return this._("change.label.set",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)});
} else if (this.rules[0].t === "change") {
return this._("change.label.change",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p});
return this._("change.label.change",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)});
} else if (this.rules[0].t === "move") {
return this._("change.label.move",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p});
return this._("change.label.move",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)});
} else {
return this._("change.label.delete",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p});
return this._("change.label.delete",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)});
}
} else {
return this._("change.label.changeCount",{count:this.rules.length});

View File

@ -98,44 +98,53 @@ module.exports = function(RED) {
}
}
function applyRule(msg,rule) {
try {
var property = rule.p;
var value = rule.to;
if (rule.tot === 'json') {
value = JSON.parse(rule.to);
} else if (rule.tot === 'bin') {
value = Buffer.from(JSON.parse(rule.to))
}
var current;
var fromValue;
var fromType;
var fromRE;
if (rule.tot === "msg") {
value = RED.util.getMessageProperty(msg,rule.to);
} else if (rule.tot === 'flow') {
value = node.context().flow.get(rule.to);
} else if (rule.tot === 'global') {
value = node.context().global.get(rule.to);
} else if (rule.tot === 'date') {
value = Date.now();
} else if (rule.tot === 'jsonata') {
try{
value = RED.util.evaluateJSONataExpression(rule.to,msg);
} catch(err) {
node.error(RED._("change.errors.invalid-expr",{error:err.message}),msg);
return;
}
}
if (rule.t === 'change') {
if (rule.fromt === 'msg' || rule.fromt === 'flow' || rule.fromt === 'global') {
if (rule.fromt === "msg") {
fromValue = RED.util.getMessageProperty(msg,rule.from);
} else if (rule.fromt === 'flow') {
fromValue = node.context().flow.get(rule.from);
} else if (rule.fromt === 'global') {
fromValue = node.context().global.get(rule.from);
function getToValue(msg,rule) {
var value = rule.to;
if (rule.tot === 'json') {
value = JSON.parse(rule.to);
} else if (rule.tot === 'bin') {
value = Buffer.from(JSON.parse(rule.to))
}
if (rule.tot === "msg") {
value = RED.util.getMessageProperty(msg,rule.to);
} else if (rule.tot === 'flow') {
value = node.context().flow.get(rule.to);
} else if (rule.tot === 'global') {
value = node.context().global.get(rule.to);
} else if (rule.tot === 'date') {
value = Date.now();
} else if (rule.tot === 'jsonata') {
return new Promise((resolve,reject) => {
RED.util.evaluateJSONataExpression(rule.to,msg, (err, value) => {
if (err) {
reject(RED._("change.errors.invalid-expr",{error:err.message}))
} else {
resolve(value);
}
});
});
}
return Promise.resolve(value);
}
function getFromValue(msg,rule) {
var fromValue;
var fromType;
var fromRE;
if (rule.t === 'change') {
if (rule.fromt === 'msg' || rule.fromt === 'flow' || rule.fromt === 'global') {
return new Promise((resolve,reject) => {
if (rule.fromt === "msg") {
resolve(RED.util.getMessageProperty(msg,rule.from));
} else if (rule.fromt === 'flow' || rule.fromt === 'global') {
node.context()[rule.fromt].get(rule.from,(err,fromValue) => {
if (err) {
reject(err);
} else {
resolve(fromValue);
}
});
}
}).then(fromValue => {
if (typeof fromValue === 'number' || fromValue instanceof Number) {
fromType = 'num';
} else if (typeof fromValue === 'boolean') {
@ -149,108 +158,161 @@ module.exports = function(RED) {
try {
fromRE = new RegExp(fromRE, "g");
} catch (e) {
valid = false;
node.error(RED._("change.errors.invalid-from",{error:e.message}),msg);
reject(new Error(RED._("change.errors.invalid-from",{error:e.message})));
return;
}
} else {
node.error(RED._("change.errors.invalid-from",{error:"unsupported type: "+(typeof fromValue)}),msg);
return
reject(new Error(RED._("change.errors.invalid-from",{error:"unsupported type: "+(typeof fromValue)})));
return;
}
return {
fromType,
fromValue,
fromRE
}
});
} else {
fromType = rule.fromt;
fromValue = rule.from;
fromRE = rule.fromRE;
}
}
return Promise.resolve({
fromType,
fromValue,
fromRE
});
}
function applyRule(msg,rule) {
var property = rule.p;
var current;
var fromValue;
var fromType;
var fromRE;
try {
return getToValue(msg,rule).then(value => {
return getFromValue(msg,rule).then(fromParts => {
fromValue = fromParts.fromValue;
fromType = fromParts.fromType;
fromRE = fromParts.fromRE;
if (rule.pt === 'msg') {
try {
if (rule.t === 'delete') {
RED.util.setMessageProperty(msg,property,undefined);
} else if (rule.t === 'set') {
RED.util.setMessageProperty(msg,property,value);
} else if (rule.t === 'change') {
current = RED.util.getMessageProperty(msg,property);
if (typeof current === 'string') {
if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) {
// str representation of exact from number/boolean
// only replace if they match exactly
RED.util.setMessageProperty(msg,property,value);
} else {
current = current.replace(fromRE,value);
RED.util.setMessageProperty(msg,property,current);
}
} else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') {
if (current == Number(fromValue)) {
RED.util.setMessageProperty(msg,property,value);
}
} else if (typeof current === 'boolean' && fromType === 'bool') {
if (current.toString() === fromValue) {
RED.util.setMessageProperty(msg,property,value);
}
}
}
} catch(err) {}
return msg;
} else if (rule.pt === 'flow' || rule.pt === 'global') {
return new Promise((resolve,reject) => {
var target = node.context()[rule.pt];
var callback = err => {
if (err) {
reject(err);
} else {
resolve(msg);
}
}
if (rule.t === 'delete') {
target.set(property,undefined,callback);
} else if (rule.t === 'set') {
target.set(property,value,callback);
} else if (rule.t === 'change') {
target.get(property,(err,current) => {
if (err) {
reject(err);
return;
}
if (typeof current === 'string') {
if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) {
// str representation of exact from number/boolean
// only replace if they match exactly
target.set(property,value,callback);
} else {
current = current.replace(fromRE,value);
target.set(property,current,callback);
}
} else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') {
if (current == Number(fromValue)) {
target.set(property,value,callback);
}
} else if (typeof current === 'boolean' && fromType === 'bool') {
if (current.toString() === fromValue) {
target.set(property,value,callback);
}
}
});
}
});
}
});
}).catch(err => {
node.error(err, msg);
return null;
});
} catch(err) {
return Promise.resolve(msg);
}
}
function applyRules(msg, currentRule) {
var r = node.rules[currentRule];
var rulePromise;
if (r.t === "move") {
if ((r.tot !== r.pt) || (r.p.indexOf(r.to) !== -1)) {
rulePromise = applyRule(msg,{t:"set", p:r.to, pt:r.tot, to:r.p, tot:r.pt}).then(
msg => applyRule(msg,{t:"delete", p:r.p, pt:r.pt})
);
}
else { // 2 step move if we are moving from a child
rulePromise = applyRule(msg,{t:"set", p:"_temp_move", pt:r.tot, to:r.p, tot:r.pt}).then(
msg => applyRule(msg,{t:"delete", p:r.p, pt:r.pt})
).then(
msg => applyRule(msg,{t:"set", p:r.to, pt:r.tot, to:"_temp_move", tot:r.pt})
).then(
msg => applyRule(msg,{t:"delete", p:"_temp_move", pt:r.pt})
)
}
} else {
rulePromise = applyRule(msg,r);
}
return rulePromise.then(
msg => {
if (!msg) {
return
} else if (currentRule === node.rules.length - 1) {
return msg;
} else {
fromType = rule.fromt;
fromValue = rule.from;
fromRE = rule.fromRE;
return applyRules(msg, currentRule+1);
}
}
if (rule.pt === 'msg') {
if (rule.t === 'delete') {
RED.util.setMessageProperty(msg,property,undefined);
} else if (rule.t === 'set') {
RED.util.setMessageProperty(msg,property,value);
} else if (rule.t === 'change') {
current = RED.util.getMessageProperty(msg,property);
if (typeof current === 'string') {
if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) {
// str representation of exact from number/boolean
// only replace if they match exactly
RED.util.setMessageProperty(msg,property,value);
} else {
current = current.replace(fromRE,value);
RED.util.setMessageProperty(msg,property,current);
}
} else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') {
if (current == Number(fromValue)) {
RED.util.setMessageProperty(msg,property,value);
}
} else if (typeof current === 'boolean' && fromType === 'bool') {
if (current.toString() === fromValue) {
RED.util.setMessageProperty(msg,property,value);
}
}
}
}
else {
var target;
if (rule.pt === 'flow') {
target = node.context().flow;
} else if (rule.pt === 'global') {
target = node.context().global;
}
if (target) {
if (rule.t === 'delete') {
target.set(property,undefined);
} else if (rule.t === 'set') {
target.set(property,value);
} else if (rule.t === 'change') {
current = target.get(property);
if (typeof current === 'string') {
if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) {
// str representation of exact from number/boolean
// only replace if they match exactly
target.set(property,value);
} else {
current = current.replace(fromRE,value);
target.set(property,current);
}
} else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') {
if (current == Number(fromValue)) {
target.set(property,value);
}
} else if (typeof current === 'boolean' && fromType === 'bool') {
if (current.toString() === fromValue) {
target.set(property,value);
}
}
}
}
}
} catch(err) {/*console.log(err.stack)*/}
return msg;
);
}
if (valid) {
this.on('input', function(msg) {
for (var i=0; i<this.rules.length; i++) {
if (this.rules[i].t === "move") {
var r = this.rules[i];
if ((r.tot !== r.pt) || (r.p.indexOf(r.to) !== -1)) {
msg = applyRule(msg,{t:"set", p:r.to, pt:r.tot, to:r.p, tot:r.pt});
applyRule(msg,{t:"delete", p:r.p, pt:r.pt});
}
else { // 2 step move if we are moving from a child
msg = applyRule(msg,{t:"set", p:"_temp_move", pt:r.tot, to:r.p, tot:r.pt});
applyRule(msg,{t:"delete", p:r.p, pt:r.pt});
msg = applyRule(msg,{t:"set", p:r.to, pt:r.tot, to:"_temp_move", tot:r.pt});
applyRule(msg,{t:"delete", p:"_temp_move", pt:r.pt});
}
} else {
msg = applyRule(msg,this.rules[i]);
}
if (msg === null) {
return;
}
}
node.send(msg);
applyRules(msg, 0)
.then( msg => { if (msg) { node.send(msg) }} )
.catch( err => node.error(err, msg))
});
}
}

View File

@ -233,7 +233,7 @@ module.exports = function(RED) {
RED.nodes.registerType("split",SplitNode);
var _max_kept_msgs_count = undefined;
var _max_kept_msgs_count;
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
@ -252,7 +252,15 @@ module.exports = function(RED) {
exp.assign("I", index);
exp.assign("N", count);
exp.assign("A", accum);
return RED.util.evaluateJSONataExpression(exp, msg);
return new Promise((resolve,reject) => {
RED.util.evaluateJSONataExpression(exp, msg, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
}
function apply_f(exp, accum, count) {
@ -269,32 +277,37 @@ module.exports = function(RED) {
return exp
}
function reduce_and_send_group(node, group) {
function reduceAndSendGroup(node, group) {
var is_right = node.reduce_right;
var flag = is_right ? -1 : 1;
var msgs = group.msgs;
var accum = eval_exp(node, node.exp_init, node.exp_init_type);
var reduce_exp = node.reduce_exp;
var reduce_fixup = node.reduce_fixup;
var count = group.count;
msgs.sort(function(x,y) {
var ix = x.parts.index;
var iy = y.parts.index;
if (ix < iy) return -flag;
if (ix > iy) return flag;
return 0;
return getInitialReduceValue(node, node.exp_init, node.exp_init_type).then(accum => {
var reduce_exp = node.reduce_exp;
var reduce_fixup = node.reduce_fixup;
var count = group.count;
msgs.sort(function(x,y) {
var ix = x.parts.index;
var iy = y.parts.index;
if (ix < iy) {return -flag;}
if (ix > iy) {return flag;}
return 0;
});
return msgs.reduce((promise, msg) => promise.then(accum => apply_r(reduce_exp, accum, msg, msg.parts.index, count)), Promise.resolve(accum))
.then(accum => {
if(reduce_fixup !== undefined) {
accum = apply_f(reduce_fixup, accum, count);
}
node.send({payload: accum});
});
}).catch(err => {
throw new Error(RED._("join.errors.invalid-expr",{error:e.message}));
});
for(var msg of msgs) {
accum = apply_r(reduce_exp, accum, msg, msg.parts.index, count);
}
if(reduce_fixup !== undefined) {
accum = apply_f(reduce_fixup, accum, count);
}
node.send({payload: accum});
}
function reduce_msg(node, msg) {
if(msg.hasOwnProperty('parts')) {
var promise;
if (msg.hasOwnProperty('parts')) {
var parts = msg.parts;
var pending = node.pending;
var pending_count = node.pending_count;
@ -312,65 +325,82 @@ module.exports = function(RED) {
var group = pending[gid];
var msgs = group.msgs;
if(parts.hasOwnProperty('count') &&
(group.count === undefined)) {
(group.count === undefined)) {
group.count = count;
}
msgs.push(msg);
pending_count++;
var completeProcess = function() {
node.pending_count = pending_count;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
var promise = Promise.reject(RED._("join.too-many"));
promise.catch(()=>{});
return promise;
}
return Promise.resolve();
}
if(msgs.length === group.count) {
delete pending[gid];
try {
pending_count -= msgs.length;
reduce_and_send_group(node, group);
} catch(e) {
node.error(RED._("join.errors.invalid-expr",{error:e.message})); }
pending_count -= msgs.length;
promise = reduceAndSendGroup(node, group).then(completeProcess);
} else {
promise = completeProcess();
}
node.pending_count = pending_count;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
node.error(RED._("join.too-many"), msg);
}
}
else {
} else {
node.send(msg);
}
if (!promise) {
promise = Promise.resolve();
}
return promise;
}
function eval_exp(node, exp, exp_type) {
if(exp_type === "flow") {
return node.context().flow.get(exp);
}
else if(exp_type === "global") {
return node.context().global.get(exp);
}
else if(exp_type === "str") {
return exp;
}
else if(exp_type === "num") {
return Number(exp);
}
else if(exp_type === "bool") {
if (exp === 'true') {
return true;
function getInitialReduceValue(node, exp, exp_type) {
return new Promise((resolve,reject) => {
if(exp_type === "flow" || exp_type === "global") {
node.context()[exp_type].get(exp,(err,value) => {
if (err) {
reject(err);
} else {
resolve(value);
}
});
return;
} else if(exp_type === "jsonata") {
var jexp = RED.util.prepareJSONataExpression(exp, node);
RED.util.evaluateJSONataExpression(jexp, {},(err,value) => {
if (err) {
reject(err);
} else {
resolve(value);
}
});
return;
}
else if (exp === 'false') {
return false;
var result;
if(exp_type === "str") {
result = exp;
} else if(exp_type === "num") {
result = Number(exp);
} else if(exp_type === "bool") {
if (exp === 'true') {
result = true;
} else if (exp === 'false') {
result = false;
}
} else if ((exp_type === "bin") || (exp_type === "json")) {
result = JSON.parse(exp);
} else if(exp_type === "date") {
result = Date.now();
} else {
reject(new Error("unexpected initial value type"));
return;
}
}
else if ((exp_type === "bin") ||
(exp_type === "json")) {
return JSON.parse(exp);
}
else if(exp_type === "date") {
return Date.now();
}
else if(exp_type === "jsonata") {
var jexp = RED.util.prepareJSONataExpression(exp, node);
return RED.util.evaluateJSONataExpression(jexp, {});
}
throw new Error("unexpected initial value type");
resolve(result);
});
}
function JoinNode(n) {
@ -437,7 +467,8 @@ module.exports = function(RED) {
newArray = newArray.concat(n);
})
group.payload = newArray;
} else if (group.type === 'buffer') {
}
else if (group.type === 'buffer') {
var buffers = [];
var bufferLen = 0;
if (group.joinChar !== undefined) {
@ -450,7 +481,8 @@ module.exports = function(RED) {
buffers.push(group.payload[i]);
bufferLen += group.payload[i].length;
}
} else {
}
else {
bufferLen = group.bufferLen;
buffers = group.payload;
}
@ -463,7 +495,8 @@ module.exports = function(RED) {
groupJoinChar = group.joinChar.toString();
}
RED.util.setMessageProperty(group.msg,node.property,group.payload.join(groupJoinChar));
} else {
}
else {
if (node.propertyType === 'full') {
group.msg = RED.util.cloneMessage(group.msg);
}
@ -471,13 +504,48 @@ module.exports = function(RED) {
}
if (group.msg.hasOwnProperty('parts') && group.msg.parts.hasOwnProperty('parts')) {
group.msg.parts = group.msg.parts.parts;
} else {
}
else {
delete group.msg.parts;
}
delete group.msg.complete;
node.send(group.msg);
}
var pendingMessages = [];
var activeMessagePromise = null;
// In reduce mode, we must process messages fully in order otherwise
// groups may overlap and cause unexpected results. The use of JSONata
// means some async processing *might* occur if flow/global context is
// accessed.
var processReduceMessageQueue = function(msg) {
if (msg) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
if (activeMessagePromise !== null) {
// The node is currently processing a message, so do nothing
// more with this message
return;
}
}
if (pendingMessages.length === 0) {
// There are no more messages to process, clear the active flag
// and return
activeMessagePromise = null;
return;
}
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
activeMessagePromise = reduce_msg(node, nextMsg)
.then(processReduceMessageQueue)
.catch((err) => {
node.error(err,nextMsg);
return processReduceMessageQueue();
});
}
this.on("input", function(msg) {
try {
var property;
@ -516,8 +584,7 @@ module.exports = function(RED) {
propertyIndex = msg.parts.index;
}
else if (node.mode === 'reduce') {
reduce_msg(node, msg);
return;
return processReduceMessageQueue(msg);
}
else {
// Use the node configuration to identify all of the group information
@ -525,7 +592,7 @@ module.exports = function(RED) {
payloadType = node.build;
targetCount = node.count;
joinChar = node.joiner;
if (targetCount === 0 && msg.hasOwnProperty('parts')) {
if (n.count === "" && msg.hasOwnProperty('parts')) {
targetCount = msg.parts.count || 0;
}
if (node.build === 'object') {
@ -554,7 +621,7 @@ module.exports = function(RED) {
payload:{},
targetCount:targetCount,
type:"object",
msg:msg
msg:RED.util.cloneMessage(msg)
};
}
else if (node.accumulate === true) {
@ -564,7 +631,7 @@ module.exports = function(RED) {
payload:{},
targetCount:targetCount,
type:payloadType,
msg:msg
msg:RED.util.cloneMessage(msg)
}
if (payloadType === 'string' || payloadType === 'array' || payloadType === 'buffer') {
inflight[partId].payload = [];
@ -576,7 +643,7 @@ module.exports = function(RED) {
payload:[],
targetCount:targetCount,
type:payloadType,
msg:msg
msg:RED.util.cloneMessage(msg)
};
if (payloadType === 'string') {
inflight[partId].joinChar = joinChar;
@ -627,14 +694,14 @@ module.exports = function(RED) {
}
}
}
// TODO: currently reuse the last received - add option to pick first received
group.msg = msg;
group.msg = Object.assign(group.msg, msg);
var tcnt = group.targetCount;
if (msg.hasOwnProperty("parts")) { tcnt = group.targetCount || msg.parts.count; }
if ((tcnt > 0 && group.currentCount >= tcnt) || msg.hasOwnProperty('complete')) {
completeSend(partId);
}
} catch(err) {
}
catch(err) {
console.log(err.stack);
}
});

View File

@ -17,7 +17,7 @@
module.exports = function(RED) {
"use strict";
var _max_kept_msgs_count = undefined;
var _max_kept_msgs_count;
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
@ -32,30 +32,20 @@ module.exports = function(RED) {
return _max_kept_msgs_count;
}
function eval_jsonata(node, code, val) {
try {
return RED.util.evaluateJSONataExpression(code, val);
}
catch (e) {
node.error(RED._("sort.invalid-exp"));
throw e;
}
}
function get_context_val(node, name, dval) {
var context = node.context();
var val = context.get(name);
if (val === undefined) {
context.set(name, dval);
return dval;
}
return val;
}
// function get_context_val(node, name, dval) {
// var context = node.context();
// var val = context.get(name);
// if (val === undefined) {
// context.set(name, dval);
// return dval;
// }
// return val;
// }
function SortNode(n) {
RED.nodes.createNode(this, n);
var node = this;
var pending = get_context_val(node, 'pending', {})
var pending = {};//get_context_val(node, 'pending', {})
var pending_count = 0;
var pending_id = 0;
var order = n.order || "ascending";
@ -71,16 +61,15 @@ module.exports = function(RED) {
key_exp = RED.util.prepareJSONataExpression(key_exp, this);
}
catch (e) {
node.error(RED._("sort.invalid-exp"));
node.error(RED._("sort.invalid-exp",{message:e.toString()}));
return;
}
}
var dir = (order === "ascending") ? 1 : -1;
var conv = as_num
? function(x) { return Number(x); }
: function(x) { return x; };
var conv = as_num ? function(x) { return Number(x); }
: function(x) { return x; };
function gen_comp(key) {
function generateComparisonFunction(key) {
return function(x, y) {
var xp = conv(key(x));
var yp = conv(key(y));
@ -90,74 +79,105 @@ module.exports = function(RED) {
};
}
function send_group(group) {
var key = key_is_exp
? function(msg) {
return eval_jsonata(node, key_exp, msg);
}
: function(msg) {
return RED.util.getMessageProperty(msg, key_prop);
};
var comp = gen_comp(key);
function sortMessageGroup(group) {
var promise;
var msgs = group.msgs;
try {
msgs.sort(comp);
}
catch (e) {
return; // not send when error
}
for (var i = 0; i < msgs.length; i++) {
var msg = msgs[i];
msg.parts.index = i;
node.send(msg);
}
}
function sort_payload(msg) {
var data = RED.util.getMessageProperty(msg, target_prop);
if (Array.isArray(data)) {
var key = key_is_exp
? function(elem) {
return eval_jsonata(node, key_exp, elem);
}
: function(elem) { return elem; };
var comp = gen_comp(key);
if (key_is_exp) {
var evaluatedDataPromises = msgs.map(msg => {
return new Promise((resolve,reject) => {
RED.util.evaluateJSONataExpression(key_exp, msg, (err, result) => {
if (err) {
reject(RED._("sort.invalid-exp",{message:err.toString()}));
} else {
resolve({
item: msg,
sortValue: result
})
}
});
})
});
promise = Promise.all(evaluatedDataPromises).then(evaluatedElements => {
// Once all of the sort keys are evaluated, sort by them
var comp = generateComparisonFunction(elem=>elem.sortValue);
return evaluatedElements.sort(comp).map(elem=>elem.item);
});
} else {
var key = function(msg) {
return ;
}
var comp = generateComparisonFunction(msg => RED.util.getMessageProperty(msg, key_prop));
try {
data.sort(comp);
msgs.sort(comp);
}
catch (e) {
return false;
return; // not send when error
}
return true;
promise = Promise.resolve(msgs);
}
return false;
return promise.then(msgs => {
for (var i = 0; i < msgs.length; i++) {
var msg = msgs[i];
msg.parts.index = i;
node.send(msg);
}
});
}
function check_parts(parts) {
if (parts.hasOwnProperty("id") &&
parts.hasOwnProperty("index")) {
return true;
function sortMessageProperty(msg) {
var data = RED.util.getMessageProperty(msg, target_prop);
if (Array.isArray(data)) {
if (key_is_exp) {
// key is an expression. Evaluated the expression for each item
// to get its sort value. As this could be async, need to do
// it first.
var evaluatedDataPromises = data.map(elem => {
return new Promise((resolve,reject) => {
RED.util.evaluateJSONataExpression(key_exp, elem, (err, result) => {
if (err) {
reject(RED._("sort.invalid-exp",{message:err.toString()}));
} else {
resolve({
item: elem,
sortValue: result
})
}
});
})
})
return Promise.all(evaluatedDataPromises).then(evaluatedElements => {
// Once all of the sort keys are evaluated, sort by them
// and reconstruct the original message item with the newly
// sorted values.
var comp = generateComparisonFunction(elem=>elem.sortValue);
data = evaluatedElements.sort(comp).map(elem=>elem.item);
RED.util.setMessageProperty(msg, target_prop,data);
return true;
})
} else {
var comp = generateComparisonFunction(elem=>elem);
try {
data.sort(comp);
} catch (e) {
return Promise.resolve(false);
}
return Promise.resolve(true);
}
}
return false;
return Promise.resolve(false);
}
function clear_pending() {
function removeOldestPending() {
var oldest;
var oldest_key;
for(var key in pending) {
node.log(RED._("sort.clear"), pending[key].msgs[0]);
delete pending[key];
}
pending_count = 0;
}
function remove_oldest_pending() {
var oldest = undefined;
var oldest_key = undefined;
for(var key in pending) {
var item = pending[key];
if((oldest === undefined) ||
(oldest.seq_no > item.seq_no)) {
oldest = item;
oldest_key = key;
if (pending.hasOwnProperty(key)) {
var item = pending[key];
if((oldest === undefined) ||
(oldest.seq_no > item.seq_no)) {
oldest = item;
oldest_key = key;
}
}
}
if(oldest !== undefined) {
@ -167,15 +187,18 @@ module.exports = function(RED) {
return 0;
}
function process_msg(msg) {
function processMessage(msg) {
if (target_is_prop) {
if (sort_payload(msg)) {
node.send(msg);
}
return;
sortMessageProperty(msg).then(send => {
if (send) {
node.send(msg);
}
}).catch(err => {
node.error(err,msg);
});
}
var parts = msg.parts;
if (!check_parts(parts)) {
if (!parts.hasOwnProperty("id") || !parts.hasOwnProperty("index")) {
return;
}
var gid = parts.id;
@ -195,23 +218,31 @@ module.exports = function(RED) {
pending_count++;
if (group.count === msgs.length) {
delete pending[gid]
send_group(group);
sortMessageGroup(group).catch(err => {
node.error(err,msg);
});
pending_count -= msgs.length;
}
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
pending_count -= remove_oldest_pending();
node.error(RED._("sort.too-many"), msg);
} else {
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
pending_count -= removeOldestPending();
node.error(RED._("sort.too-many"), msg);
}
}
}
this.on("input", function(msg) {
process_msg(msg);
processMessage(msg);
});
this.on("close", function() {
clear_pending();
})
for(var key in pending) {
if (pending.hasOwnProperty(key)) {
node.log(RED._("sort.clear"), pending[key].msgs[0]);
delete pending[key];
}
}
pending_count = 0; })
}
RED.nodes.registerType("sort", SortNode);

View File

@ -31,6 +31,8 @@
<dl class="message-properties">
<dt>payload<span class="property-type">object | string</span></dt>
<dd>A JavaScript object or JSON string.</dd>
<dt>schema<span class="property-type">object</span></dt>
<dd>An optional JSON Schema object to validate the payload against.</dd>
</dl>
<h3>Outputs</h3>
<dl class="message-properties">
@ -41,6 +43,9 @@
<li>If the input is a JavaScript object it creates a JSON string. The string can optionally be well-formatted.</li>
</ul>
</dd>
<dt>schemaError<span class="property-type">array</span></dt>
<dd>If JSON schema validation fails, the catch node will have a <code>schemaError</code> property
containing an array of errors.</dd>
</dl>
<h3>Details</h3>
<p>By default, the node operates on <code>msg.payload</code>, but can be configured
@ -53,6 +58,8 @@
receives a String, no further checks will be made of the property. It will
not check the String is valid JSON nor will it reformat it if the format option
is selected.</p>
<p>For more details about JSON Schema you can consult the specification
<a href="http://json-schema.org/latest/json-schema-validation.html">here</a>.</p>
</script>
<script type="text/javascript">

View File

@ -16,21 +16,52 @@
module.exports = function(RED) {
"use strict";
const Ajv = require('ajv');
const ajv = new Ajv({allErrors: true, schemaId: 'auto'});
ajv.addMetaSchema(require('ajv/lib/refs/json-schema-draft-04.json'));
function JSONNode(n) {
RED.nodes.createNode(this,n);
this.indent = n.pretty ? 4 : 0;
this.action = n.action||"";
this.property = n.property||"payload";
this.schema = null;
this.compiledSchema = null;
var node = this;
this.on("input", function(msg) {
var validate = false;
if (msg.schema) {
// If input schema is different, re-compile it
if (JSON.stringify(this.schema) != JSON.stringify(msg.schema)) {
try {
this.compiledSchema = ajv.compile(msg.schema);
this.schema = msg.schema;
} catch(e) {
this.schema = null;
this.compiledSchema = null;
node.error(RED._("json.errors.schema-error-compile"), msg);
return;
}
}
validate = true;
}
var value = RED.util.getMessageProperty(msg,node.property);
if (value !== undefined) {
if (typeof value === "string") {
if (node.action === "" || node.action === "obj") {
try {
RED.util.setMessageProperty(msg,node.property,JSON.parse(value));
node.send(msg);
if (validate) {
if (this.compiledSchema(msg[node.property])) {
node.send(msg);
} else {
msg.schemaError = this.compiledSchema.errors;
node.error(`${RED._("json.errors.schema-error")}: ${ajv.errorsText(this.compiledSchema.errors)}`, msg);
}
} else {
node.send(msg);
}
}
catch(e) { node.error(e.message,msg); }
} else {
@ -41,8 +72,19 @@ module.exports = function(RED) {
if (node.action === "" || node.action === "str") {
if (!Buffer.isBuffer(value)) {
try {
RED.util.setMessageProperty(msg,node.property,JSON.stringify(value,null,node.indent));
node.send(msg);
if (validate) {
if (this.compiledSchema(value)) {
RED.util.setMessageProperty(msg,node.property,JSON.stringify(value,null,node.indent));
node.send(msg);
} else {
msg.schemaError = this.compiledSchema.errors;
node.error(`${RED._("json.errors.schema-error")}: ${ajv.errorsText(this.compiledSchema.errors)}`, msg);
}
} else {
RED.util.setMessageProperty(msg,node.property,JSON.stringify(value,null,node.indent));
node.send(msg);
}
}
catch(e) { node.error(RED._("json.errors.dropped-error")); }
}

View File

@ -33,6 +33,7 @@
"flow"
],
"dependencies": {
"ajv": "6.5.1",
"basic-auth": "2.0.0",
"bcryptjs": "2.4.3",
"body-parser": "1.18.3",

View File

@ -143,7 +143,13 @@ LocalFileSystem.prototype.open = function(){
self.cache.set(scope,key,data[key]);
})
});
})
}).catch(function(err){
if(err.code == 'ENOENT') {
return fs.mkdir(self.storageBaseDir);
}else{
return Promise.reject(err);
}
});
} else {
return Promise.resolve();
}

View File

@ -335,7 +335,7 @@ var parseContextStore = function(key) {
}
function evaluateNodeProperty(value, type, node, msg, callback) {
var result;
var result = value;
if (type === 'str') {
result = ""+value;
} else if (type === 'num') {
@ -350,7 +350,16 @@ function evaluateNodeProperty(value, type, node, msg, callback) {
var data = JSON.parse(value);
result = Buffer.from(data);
} else if (type === 'msg' && msg) {
result = getMessageProperty(msg,value);
try {
result = getMessageProperty(msg,value);
} catch(err) {
if (callback) {
callback(err);
} else {
throw err;
}
return;
}
} else if ((type === 'flow' || type === 'global') && node) {
var contextKey = parseContextStore(value);
result = node.context()[type].get(contextKey.key,contextKey.store,callback);
@ -366,9 +375,9 @@ function evaluateNodeProperty(value, type, node, msg, callback) {
result = evaluteEnvProperty(value);
}
if (callback) {
callback(result);
callback(null,result);
} else {
return value;
return result;
}
}
@ -385,15 +394,44 @@ function prepareJSONataExpression(value,node) {
})
expr.registerFunction('clone', cloneMessage, '<(oa)-:o>');
expr._legacyMode = /(^|[^a-zA-Z0-9_'"])msg([^a-zA-Z0-9_'"]|$)/.test(value);
expr._node = node;
return expr;
}
function evaluateJSONataExpression(expr,msg) {
function evaluateJSONataExpression(expr,msg,callback) {
var context = msg;
if (expr._legacyMode) {
context = {msg:msg};
}
return expr.evaluate(context);
var bindings = {};
if (callback) {
// If callback provided, need to override the pre-assigned sync
// context functions to be their async variants
bindings.flowContext = function(val) {
return new Promise((resolve,reject) => {
expr._node.context().flow.get(val, function(err,value) {
if (err) {
reject(err);
} else {
resolve(value);
}
})
});
}
bindings.globalContext = function(val) {
return new Promise((resolve,reject) => {
expr._node.context().global.get(val, function(err,value) {
if (err) {
reject(err);
} else {
resolve(value);
}
})
});
}
}
return expr.evaluate(context, bindings, callback);
}

View File

@ -62,10 +62,11 @@ exports.config = {
//
browserName: 'chrome',
chromeOptions: {
// Runs tests without opening a broser.
args: ['--headless', '--disable-gpu', 'window-size=1920,1080'],
// Runs tests with opening a broser.
// args: ['--disable-gpu'],
args: process.env.NODE_RED_NON_HEADLESS
// Runs tests with opening a browser.
? ['--disable-gpu']
// Runs tests without opening a browser.
: ['--headless', '--disable-gpu', 'window-size=1920,1080']
},
}],
//

View File

@ -288,7 +288,7 @@ describe('trigger node', function() {
it('should be able to return things from flow and global context variables', function(done) {
var spy = sinon.stub(RED.util, 'evaluateNodeProperty',
function(arg1, arg2, arg3, arg4) { return arg1; }
function(arg1, arg2, arg3, arg4, arg5) { if (arg5) { arg5(null, arg1) } else { return arg1; } }
);
var flow = [{"id":"n1", "type":"trigger", "name":"triggerNode", op1:"foo", op1type:"flow", op2:"bar", op2type:"global", duration:"20", wires:[["n2"]] },
{id:"n2", type:"helper"} ];
@ -386,7 +386,7 @@ describe('trigger node', function() {
it('should be able to extend the delay', function(done) {
this.timeout(5000); // add extra time for flake
var spy = sinon.stub(RED.util, 'evaluateNodeProperty',
function(arg1, arg2, arg3, arg4) { return arg1; }
function(arg1, arg2, arg3, arg4, arg5) { if (arg5) { arg5(null, arg1) } else { return arg1; } }
);
var flow = [{"id":"n1", "type":"trigger", "name":"triggerNode", extend:"true", op1type:"flow", op1:"foo", op2:"bar", op2type:"global", duration:"100", wires:[["n2"]] },
{id:"n2", type:"helper"} ];
@ -428,12 +428,10 @@ describe('trigger node', function() {
n2.on("input", function(msg) {
try {
if (c === 0) {
console.log(c,Date.now() - ss,msg);
msg.should.have.a.property("payload", "Hello");
c += 1;
}
else {
console.log(c,Date.now() - ss,msg);
msg.should.have.a.property("payload", "World");
(Date.now() - ss).should.be.greaterThan(150);
done();

View File

@ -460,7 +460,7 @@ describe('switch Node', function() {
} catch(err) {
done(err);
}
},100)
},500)
});
});
@ -821,4 +821,24 @@ describe('switch Node', function() {
n1.receive({payload:1, parts:{index:0, count:4, id:222}});
});
});
it('should handle invalid jsonata expression', function(done) {
var flow = [{id:"switchNode1",type:"switch",name:"switchNode",property:"$invalidExpression(payload)",propertyType:"jsonata",rules:[{"t":"btwn","v":"$sqrt(16)","vt":"jsonata","v2":"$sqrt(36)","v2t":"jsonata"}],checkall:true,outputs:1,wires:[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(switchNode, flow, function() {
var n1 = helper.getNode("switchNode1");
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "switch";
});
var evt = logEvents[0][0];
evt.should.have.property('id', "switchNode1");
evt.should.have.property('type', "switch");
done();
}, 150);
n1.receive({payload:1});
});
});
});

View File

@ -15,6 +15,7 @@
**/
var should = require("should");
var sinon = require("sinon");
var changeNode = require("../../../../nodes/core/logic/15-change.js");
var helper = require("node-red-node-test-helper");
@ -454,6 +455,28 @@ describe('change Node', function() {
});
});
it('reports invalid jsonata expression', function(done) {
var flow = [{"id":"changeNode1","type":"change",rules:[{"t":"set","p":"payload","to":"$invalid(payload)","tot":"jsonata"}],"name":"changeNode","wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(changeNode, flow, function() {
var changeNode1 = helper.getNode("changeNode1");
var helperNode1 = helper.getNode("helperNode1");
sinon.spy(changeNode1,"error");
helperNode1.on("input", function(msg) {
done("Invalid jsonata expression passed message through");
});
changeNode1.receive({payload:"Hello World!"});
setTimeout(function() {
try {
changeNode1.error.called.should.be.true();
done();
} catch(err) {
done(err);
}
},50);
});
});
});
describe('#change', function() {
it('changes the value of the message property', function(done) {

View File

@ -247,4 +247,85 @@ describe('JSON node', function() {
});
});
});
it('should pass an object if provided a valid JSON string and schema', function(done) {
var flow = [{id:"jn1",type:"json",wires:[["jn2"]]},
{id:"jn2", type:"helper"}];
helper.load(jsonNode, flow, function() {
var jn1 = helper.getNode("jn1");
var jn2 = helper.getNode("jn2");
jn2.on("input", function(msg) {
should.equal(msg.payload.number, 3);
should.equal(msg.payload.string, "allo");
done();
});
var jsonString = '{"number": 3, "string": "allo"}';
var schema = {title: "testSchema", type: "object", properties: {number: {type: "number"}, string: {type: "string" }}};
jn1.receive({payload:jsonString, schema:schema});
});
});
it('should pass a string if provided a valid object and schema', function(done) {
var flow = [{id:"jn1",type:"json",wires:[["jn2"]]},
{id:"jn2", type:"helper"}];
helper.load(jsonNode, flow, function() {
var jn1 = helper.getNode("jn1");
var jn2 = helper.getNode("jn2");
jn2.on("input", function(msg) {
should.equal(msg.payload, '{"number":3,"string":"allo"}');
done();
});
var obj = {"number": 3, "string": "allo"};
var schema = {title: "testSchema", type: "object", properties: {number: {type: "number"}, string: {type: "string" }}};
jn1.receive({payload:obj, schema:schema});
});
});
it('should log an error if passed an invalid object and valid schema', function(done) {
var flow = [{id:"jn1",type:"json",wires:[["jn2"]]},
{id:"jn2", type:"helper"}];
helper.load(jsonNode, flow, function() {
try {
var jn1 = helper.getNode("jn1");
var jn2 = helper.getNode("jn2");
var schema = {title: "testSchema", type: "object", properties: {number: {type: "number"}, string: {type: "string" }}};
var obj = {"number": "foo", "string": 3};
jn1.receive({payload:obj, schema:schema});
var logEvents = helper.log().args.filter(function(evt) {
return evt[0].type == "json";
});
logEvents.should.have.length(1);
logEvents[0][0].should.have.a.property('msg');
logEvents[0][0].msg.should.equal("json.errors.schema-error: data.number should be number, data.string should be string");
logEvents[0][0].should.have.a.property('level',helper.log().ERROR);
done();
} catch(err) {
done(err);
}
});
});
it('should log an error if passed a valid object and invalid schema', function(done) {
var flow = [{id:"jn1",type:"json",wires:[["jn2"]]},
{id:"jn2", type:"helper"}];
helper.load(jsonNode, flow, function() {
try {
var jn1 = helper.getNode("jn1");
var jn2 = helper.getNode("jn2");
var schema = "garbage";
var obj = {"number": "foo", "string": 3};
jn1.receive({payload:obj, schema:schema});
var logEvents = helper.log().args.filter(function(evt) {
return evt[0].type == "json";
});
logEvents.should.have.length(1);
logEvents[0][0].should.have.a.property('msg');
logEvents[0][0].msg.should.equal("json.errors.schema-error-compile");
logEvents[0][0].should.have.a.property('level',helper.log().ERROR);
done();
} catch(err) {
done(err);
}
});
});
});

View File

@ -307,6 +307,10 @@ describe("red/util", function() {
},{});
result.should.eql("123");
});
it('returns null', function() {
var result = util.evaluateNodeProperty(null,'null');
(result === null).should.be.true();
})
describe('environment variable', function() {
before(function() {
process.env.NR_TEST_A = "foo";
@ -454,6 +458,30 @@ describe("red/util", function() {
var result = util.evaluateJSONataExpression(expr,{payload:"hello"});
should.not.exist(result);
});
it('handles async flow context access', function(done) {
var expr = util.prepareJSONataExpression('$flowContext("foo")',{context:function() { return {flow:{get: function(key,callback) { setTimeout(()=>{callback(null,{'foo':'bar'}[key])},10)}}}}});
util.evaluateJSONataExpression(expr,{payload:"hello"},function(err,value) {
try {
should.not.exist(err);
value.should.eql("bar");
done();
} catch(err2) {
done(err2);
}
});
})
it('handles async global context access', function(done) {
var expr = util.prepareJSONataExpression('$globalContext("foo")',{context:function() { return {global:{get: function(key,callback) { setTimeout(()=>{callback(null,{'foo':'bar'}[key])},10)}}}}});
util.evaluateJSONataExpression(expr,{payload:"hello"},function(err,value) {
try {
should.not.exist(err);
value.should.eql("bar");
done();
} catch(err2) {
done(err2);
}
});
})
});