diff --git a/Gruntfile.js b/Gruntfile.js index faf68f100..508fadc58 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -24,6 +24,10 @@ module.exports = function(grunt) { nodemonArgs.push(flowFile); } + var nonHeadless = grunt.option('non-headless'); + if (nonHeadless) { + process.env.NODE_RED_NON_HEADLESS = 'true'; + } grunt.initConfig({ pkg: grunt.file.readJSON('package.json'), paths: { diff --git a/editor/js/ui/common/typedInput.js b/editor/js/ui/common/typedInput.js index 74bbeb9ac..59db274f7 100644 --- a/editor/js/ui/common/typedInput.js +++ b/editor/js/ui/common/typedInput.js @@ -15,16 +15,11 @@ **/ (function($) { var contextParse = function(v) { - var parts = {}; - var m = /^#:\((\S+?)\)::(.*)$/.exec(v); - if (m) { - parts.option = m[1]; - parts.value = m[2]; - } else { - parts.value = v; - parts.option = RED.settings.context.default; + var parts = RED.utils.parseContextKey(v); + return { + option: parts.store, + value: parts.key } - return parts; } var contextExport = function(v,opt) { if (!opt) { diff --git a/editor/js/ui/utils.js b/editor/js/ui/utils.js index e64a33996..7da56ab78 100644 --- a/editor/js/ui/utils.js +++ b/editor/js/ui/utils.js @@ -828,6 +828,20 @@ RED.utils = (function() { return payload; } + function parseContextKey(key) { + var parts = {}; + var m = /^#:\((\S+?)\)::(.*)$/.exec(key); + if (m) { + parts.store = m[1]; + parts.key = m[2]; + } else { + parts.key = key; + if (RED.settings.context) { + parts.store = RED.settings.context.default; + } + } + return parts; + } return { createObjectElement: buildMessageElement, @@ -839,6 +853,7 @@ RED.utils = (function() { getNodeIcon: getNodeIcon, getNodeLabel: getNodeLabel, addSpinnerOverlay: addSpinnerOverlay, - decodeObject: decodeObject + decodeObject: decodeObject, + parseContextKey: parseContextKey } })(); diff --git a/nodes/core/core/20-inject.js b/nodes/core/core/20-inject.js index 9e6eb62a8..c0d9e0c2f 100644 --- a/nodes/core/core/20-inject.js +++ b/nodes/core/core/20-inject.js @@ -63,21 +63,33 @@ module.exports = function(RED) { } this.on("input",function(msg) { - try { - msg.topic = this.topic; - if ( (this.payloadType == null && this.payload === "") || this.payloadType === "date") { - msg.payload = Date.now(); - } else if (this.payloadType == null) { - msg.payload = this.payload; - } else if (this.payloadType === 'none') { - msg.payload = ""; - } else { - msg.payload = RED.util.evaluateNodeProperty(this.payload,this.payloadType,this,msg); + msg.topic = this.topic; + if (this.payloadType !== 'flow' && this.payloadType !== 'global') { + try { + if ( (this.payloadType == null && this.payload === "") || this.payloadType === "date") { + msg.payload = Date.now(); + } else if (this.payloadType == null) { + msg.payload = this.payload; + } else if (this.payloadType === 'none') { + msg.payload = ""; + } else { + msg.payload = RED.util.evaluateNodeProperty(this.payload,this.payloadType,this,msg); + } + this.send(msg); + msg = null; + } catch(err) { + this.error(err,msg); } - this.send(msg); - msg = null; - } catch(err) { - this.error(err,msg); + } else { + RED.util.evaluateNodeProperty(this.payload,this.payloadType,this,msg, function(err,res) { + if (err) { + node.error(err,msg); + } else { + msg.payload = res; + node.send(msg); + } + + }); } }); } diff --git a/nodes/core/core/89-trigger.js b/nodes/core/core/89-trigger.js index f5fd28b7f..48a595ccc 100644 --- a/nodes/core/core/89-trigger.js +++ b/nodes/core/core/89-trigger.js @@ -76,8 +76,43 @@ module.exports = function(RED) { var node = this; node.topics = {}; - this.on("input", function(msg) { + var pendingMessages = []; + var activeMessagePromise = null; + var processMessageQueue = function(msg) { + if (msg) { + // A new message has arrived - add it to the message queue + pendingMessages.push(msg); + if (activeMessagePromise !== null) { + // The node is currently processing a message, so do nothing + // more with this message + return; + } + } + if (pendingMessages.length === 0) { + // There are no more messages to process, clear the active flag + // and return + activeMessagePromise = null; + return; + } + + // There are more messages to process. Get the next message and + // start processing it. Recurse back in to check for any more + var nextMsg = pendingMessages.shift(); + activeMessagePromise = processMessage(nextMsg) + .then(processMessageQueue) + .catch((err) => { + node.error(err,nextMsg); + return processMessageQueue(); + }); + } + + this.on('input', function(msg) { + processMessageQueue(msg); + }); + + var processMessage = function(msg) { var topic = msg.topic || "_none"; + var promise; if (node.bytopic === "all") { topic = "_none"; } node.topics[topic] = node.topics[topic] || {}; if (msg.hasOwnProperty("reset") || ((node.reset !== '') && msg.hasOwnProperty("payload") && (msg.payload !== null) && msg.payload.toString && (msg.payload.toString() == node.reset)) ) { @@ -88,48 +123,88 @@ module.exports = function(RED) { } else { if (((!node.topics[topic].tout) && (node.topics[topic].tout !== 0)) || (node.loop === true)) { + promise = Promise.resolve(); if (node.op2type === "pay" || node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); } else if (node.op2Templated) { node.topics[topic].m2 = mustache.render(node.op2,msg); } else if (node.op2type !== "nul") { - node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg); - } - - if (node.op1type === "pay") { } - else if (node.op1Templated) { msg.payload = mustache.render(node.op1,msg); } - else if (node.op1type !== "nul") { - msg.payload = RED.util.evaluateNodeProperty(node.op1,node.op1type,node,msg); - } - - if (node.duration === 0) { node.topics[topic].tout = 0; } - else if (node.loop === true) { - /* istanbul ignore else */ - if (node.topics[topic].tout) { clearInterval(node.topics[topic].tout); } - /* istanbul ignore else */ - if (node.op1type !== "nul") { - var msg2 = RED.util.cloneMessage(msg); - node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, node.duration); - } - } - else { - if (!node.topics[topic].tout) { - node.topics[topic].tout = setTimeout(function() { - var msg2 = null; - if (node.op2type !== "nul") { - msg2 = RED.util.cloneMessage(msg); - if (node.op2type === "flow" || node.op2type === "global") { - node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg); - } - msg2.payload = node.topics[topic].m2; - delete node.topics[topic]; - node.send(msg2); + promise = new Promise((resolve,reject) => { + RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => { + if (err) { + reject(err); + } else { + node.topics[topic].m2 = value; + resolve(); } - else { delete node.topics[topic]; } - node.status({}); - }, node.duration); - } + }); + }); } - node.status({fill:"blue",shape:"dot",text:" "}); - if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); } + + return promise.then(() => { + promise = Promise.resolve(); + if (node.op1type === "pay") { } + else if (node.op1Templated) { msg.payload = mustache.render(node.op1,msg); } + else if (node.op1type !== "nul") { + promise = new Promise((resolve,reject) => { + RED.util.evaluateNodeProperty(node.op1,node.op1type,node,msg,(err,value) => { + if (err) { + reject(err); + } else { + msg.payload = value; + resolve(); + } + }); + }); + } + return promise.then(() => { + if (node.duration === 0) { node.topics[topic].tout = 0; } + else if (node.loop === true) { + /* istanbul ignore else */ + if (node.topics[topic].tout) { clearInterval(node.topics[topic].tout); } + /* istanbul ignore else */ + if (node.op1type !== "nul") { + var msg2 = RED.util.cloneMessage(msg); + node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, node.duration); + } + } + else { + if (!node.topics[topic].tout) { + node.topics[topic].tout = setTimeout(function() { + var msg2 = null; + if (node.op2type !== "nul") { + var promise = Promise.resolve(); + msg2 = RED.util.cloneMessage(msg); + if (node.op2type === "flow" || node.op2type === "global") { + promise = new Promise((resolve,reject) => { + RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => { + if (err) { + reject(err); + } else { + node.topics[topic].m2 = value; + resolve(); + } + }); + }); + } + promise.then(() => { + msg2.payload = node.topics[topic].m2; + delete node.topics[topic]; + node.send(msg2); + node.status({}); + }).catch(err => { + node.error(err); + }); + } else { + delete node.topics[topic]; + node.status({}); + } + + }, node.duration); + } + } + node.status({fill:"blue",shape:"dot",text:" "}); + if (node.op1type !== "nul") { node.send(RED.util.cloneMessage(msg)); } + }); + }); } else if ((node.extend === "true" || node.extend === true) && (node.duration > 0)) { /* istanbul ignore else */ @@ -138,25 +213,43 @@ module.exports = function(RED) { if (node.topics[topic].tout) { clearTimeout(node.topics[topic].tout); } node.topics[topic].tout = setTimeout(function() { var msg2 = null; + var promise = Promise.resolve(); + if (node.op2type !== "nul") { if (node.op2type === "flow" || node.op2type === "global") { - node.topics[topic].m2 = RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg); - } - if (node.topics[topic] !== undefined) { - msg2 = RED.util.cloneMessage(msg); - msg2.payload = node.topics[topic].m2; + promise = new Promise((resolve,reject) => { + RED.util.evaluateNodeProperty(node.op2,node.op2type,node,msg,(err,value) => { + if (err) { + reject(err); + } else { + node.topics[topic].m2 = value; + resolve(); + } + }); + }); } } - delete node.topics[topic]; - node.status({}); - node.send(msg2); + promise.then(() => { + if (node.op2type !== "nul") { + if (node.topics[topic] !== undefined) { + msg2 = RED.util.cloneMessage(msg); + msg2.payload = node.topics[topic].m2; + } + } + delete node.topics[topic]; + node.status({}); + node.send(msg2); + }).catch(err => { + node.error(err); + }); }, node.duration); } else { if (node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); } } } - }); + return Promise.resolve(); + } this.on("close", function() { for (var t in node.topics) { /* istanbul ignore else */ diff --git a/nodes/core/hardware/nrgpio.py b/nodes/core/hardware/nrgpio.py index 6bbcddbbe..0cde0e4df 100755 --- a/nodes/core/hardware/nrgpio.py +++ b/nodes/core/hardware/nrgpio.py @@ -23,10 +23,6 @@ from time import sleep bounce = 25; -if sys.version_info >= (3,0): - print("Sorry - currently only configured to work with python 2.x") - sys.exit(1) - if len(sys.argv) > 2: cmd = sys.argv[1].lower() pin = int(sys.argv[2]) @@ -34,7 +30,7 @@ if len(sys.argv) > 2: GPIO.setwarnings(False) if cmd == "pwm": - #print "Initialised pin "+str(pin)+" to PWM" + #print("Initialised pin "+str(pin)+" to PWM") try: freq = int(sys.argv[3]) except: @@ -54,10 +50,10 @@ if len(sys.argv) > 2: GPIO.cleanup(pin) sys.exit(0) except Exception as ex: - print "bad data: "+data + print("bad data: "+data) elif cmd == "buzz": - #print "Initialised pin "+str(pin)+" to Buzz" + #print("Initialised pin "+str(pin)+" to Buzz") GPIO.setup(pin,GPIO.OUT) p = GPIO.PWM(pin, 100) p.stop() @@ -76,10 +72,10 @@ if len(sys.argv) > 2: GPIO.cleanup(pin) sys.exit(0) except Exception as ex: - print "bad data: "+data + print("bad data: "+data) elif cmd == "out": - #print "Initialised pin "+str(pin)+" to OUT" + #print("Initialised pin "+str(pin)+" to OUT") GPIO.setup(pin,GPIO.OUT) if len(sys.argv) == 4: GPIO.output(pin,int(sys.argv[3])) @@ -103,11 +99,11 @@ if len(sys.argv) > 2: GPIO.output(pin,data) elif cmd == "in": - #print "Initialised pin "+str(pin)+" to IN" + #print("Initialised pin "+str(pin)+" to IN") bounce = float(sys.argv[4]) def handle_callback(chan): sleep(bounce/1000.0) - print GPIO.input(chan) + print(GPIO.input(chan)) if sys.argv[3].lower() == "up": GPIO.setup(pin,GPIO.IN,GPIO.PUD_UP) @@ -116,7 +112,7 @@ if len(sys.argv) > 2: else: GPIO.setup(pin,GPIO.IN) - print GPIO.input(pin) + print(GPIO.input(pin)) GPIO.add_event_detect(pin, GPIO.BOTH, callback=handle_callback, bouncetime=int(bounce)) while True: @@ -129,7 +125,7 @@ if len(sys.argv) > 2: sys.exit(0) elif cmd == "byte": - #print "Initialised BYTE mode - "+str(pin)+ + #print("Initialised BYTE mode - "+str(pin)+) list = [7,11,13,12,15,16,18,22] GPIO.setup(list,GPIO.OUT) @@ -152,7 +148,7 @@ if len(sys.argv) > 2: GPIO.output(list[bit], data & mask) elif cmd == "borg": - #print "Initialised BORG mode - "+str(pin)+ + #print("Initialised BORG mode - "+str(pin)+) GPIO.setup(11,GPIO.OUT) GPIO.setup(13,GPIO.OUT) GPIO.setup(15,GPIO.OUT) @@ -190,7 +186,7 @@ if len(sys.argv) > 2: button = ord( buf[0] ) & pin # mask out just the required button(s) if button != oldbutt: # only send if changed oldbutt = button - print button + print(button) while True: try: @@ -215,7 +211,7 @@ if len(sys.argv) > 2: # type,code,value print("%u,%u" % (code, value)) event = file.read(EVENT_SIZE) - print "0,0" + print("0,0") file.close() sys.exit(0) except: @@ -225,14 +221,14 @@ if len(sys.argv) > 2: elif len(sys.argv) > 1: cmd = sys.argv[1].lower() if cmd == "rev": - print GPIO.RPI_REVISION + print(GPIO.RPI_REVISION) elif cmd == "ver": - print GPIO.VERSION + print(GPIO.VERSION) elif cmd == "info": - print GPIO.RPI_INFO + print(GPIO.RPI_INFO) else: - print "Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}" - print " only ver (gpio version) and info (board information) accept no pin parameter." + print("Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}") + print(" only ver (gpio version) and info (board information) accept no pin parameter.") else: - print "Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}" + print("Bad parameters - in|out|pwm|buzz|byte|borg|mouse|kbd|ver|info {pin} {value|up|down}") diff --git a/nodes/core/locales/en-US/messages.json b/nodes/core/locales/en-US/messages.json index d59f3d1bf..6e4828439 100644 --- a/nodes/core/locales/en-US/messages.json +++ b/nodes/core/locales/en-US/messages.json @@ -697,7 +697,9 @@ "errors": { "dropped-object": "Ignored non-object payload", "dropped": "Ignored unsupported payload type", - "dropped-error": "Failed to convert payload" + "dropped-error": "Failed to convert payload", + "schema-error": "JSON Schema error", + "schema-error-compile": "JSON Schema error: failed to compile schema" }, "label": { "o2j": "Object to JSON options", @@ -924,8 +926,8 @@ "ascending" : "ascending", "descending" : "descending", "as-number" : "as number", - "invalid-exp" : "invalid JSONata expression in sort node", - "too-many" : "too many pending messages in sort node", + "invalid-exp" : "Invalid JSONata expression in sort node: __message__", + "too-many" : "Too many pending messages in sort node", "clear" : "clear pending message in sort node" }, "batch" : { diff --git a/nodes/core/logic/10-switch.html b/nodes/core/logic/10-switch.html index f4ffc7dea..a57ce2187 100644 --- a/nodes/core/logic/10-switch.html +++ b/nodes/core/logic/10-switch.html @@ -99,11 +99,17 @@ } return v; } + function prop2name(key) { + var result = RED.utils.parseContextKey(key); + return result.key; + } function getValueLabel(t,v) { if (t === 'str') { return '"'+clipValueLength(v)+'"'; - } else if (t === 'msg' || t==='flow' || t==='global') { + } else if (t === 'msg') { return t+"."+clipValueLength(v); + } else if (t === 'flow' || t === 'global') { + return t+"."+clipValueLength(prop2name(v)); } return clipValueLength(v); } diff --git a/nodes/core/logic/10-switch.js b/nodes/core/logic/10-switch.js index 919fe13ac..df5a52023 100644 --- a/nodes/core/logic/10-switch.js +++ b/nodes/core/logic/10-switch.js @@ -59,21 +59,157 @@ module.exports = function(RED) { 'else': function(a) { return a === true; } }; - var _max_kept_msgs_count = undefined; + var _maxKeptCount; - function max_kept_msgs_count(node) { - if (_max_kept_msgs_count === undefined) { + function getMaxKeptCount() { + if (_maxKeptCount === undefined) { var name = "nodeMessageBufferMaxLength"; if (RED.settings.hasOwnProperty(name)) { - _max_kept_msgs_count = RED.settings[name]; + _maxKeptCount = RED.settings[name]; } else { - _max_kept_msgs_count = 0; + _maxKeptCount = 0; } } - return _max_kept_msgs_count; + return _maxKeptCount; } + function getProperty(node,msg) { + return new Promise((resolve,reject) => { + if (node.propertyType === 'jsonata') { + RED.util.evaluateJSONataExpression(node.property,msg,(err,value) => { + if (err) { + reject(RED._("switch.errors.invalid-expr",{error:err.message})); + } else { + resolve(value); + } + }); + } else { + RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg,(err,value) => { + if (err) { + resolve(undefined); + } else { + resolve(value); + } + }); + } + }); + } + + function getV1(node,msg,rule,hasParts) { + return new Promise( (resolve,reject) => { + if (rule.vt === 'prev') { + resolve(node.previousValue); + } else if (rule.vt === 'jsonata') { + var exp = rule.v; + if (rule.t === 'jsonata_exp') { + if (hasParts) { + exp.assign("I", msg.parts.index); + exp.assign("N", msg.parts.count); + } + } + RED.util.evaluateJSONataExpression(exp,msg,(err,value) => { + if (err) { + reject(RED._("switch.errors.invalid-expr",{error:err.message})); + } else { + resolve(value); + } + }); + } else if (rule.vt === 'json') { + resolve("json"); + } else if (rule.vt === 'null') { + resolve("null"); + } else { + RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg, function(err,value) { + if (err) { + resolve(undefined); + } else { + resolve(value); + } + }); + } + }); + } + + function getV2(node,msg,rule) { + return new Promise((resolve,reject) => { + var v2 = rule.v2; + if (rule.v2t === 'prev') { + resolve(node.previousValue); + } else if (rule.v2t === 'jsonata') { + RED.util.evaluateJSONataExpression(rule.v2,msg,(err,value) => { + if (err) { + reject(RED._("switch.errors.invalid-expr",{error:err.message})); + } else { + resolve(value); + } + }); + } else if (typeof v2 !== 'undefined') { + RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg, function(err,value) { + if (err) { + resolve(undefined); + } else { + resolve(value); + } + }); + } else { + resolve(v2); + } + }) + } + + function applyRule(node, msg, property, state) { + return new Promise((resolve,reject) => { + + var rule = node.rules[state.currentRule]; + var v1,v2; + + getV1(node,msg,rule,state.hasParts).then(value => { + v1 = value; + }).then(()=>getV2(node,msg,rule)).then(value => { + v2 = value; + }).then(() => { + if (rule.t == "else") { + property = state.elseflag; + state.elseflag = true; + } + if (operators[rule.t](property,v1,v2,rule.case,msg.parts)) { + state.onward.push(msg); + state.elseflag = false; + if (node.checkall == "false") { + return resolve(false); + } + } else { + state.onward.push(null); + } + resolve(state.currentRule < node.rules.length - 1); + }); + }) + } + + function applyRules(node, msg, property,state) { + if (!state) { + state = { + currentRule: 0, + elseflag: true, + onward: [], + hasParts: msg.hasOwnProperty("parts") && + msg.parts.hasOwnProperty("id") && + msg.parts.hasOwnProperty("index") + } + } + return applyRule(node,msg,property,state).then(hasMore => { + if (hasMore) { + state.currentRule++; + return applyRules(node,msg,property,state); + } else { + node.previousValue = property; + return state.onward; + } + }); + } + + function SwitchNode(n) { RED.nodes.createNode(this, n); this.rules = n.rules || []; @@ -94,10 +230,10 @@ module.exports = function(RED) { var node = this; var valid = true; var repair = n.repair; - var needs_count = repair; + var needsCount = repair; for (var i=0; i 0) && (pending_count > max_msgs)) { - clear_pending(); + pendingCount++; + var max_msgs = getMaxKeptCount(); + if ((max_msgs > 0) && (pendingCount > max_msgs)) { + clearPending(); node.error(RED._("switch.errors.too-many"), msg); } if (parts.hasOwnProperty("count")) { @@ -170,32 +306,29 @@ module.exports = function(RED) { return group; } - function del_group_in(id, group) { - pending_count -= group.msgs.length; - delete pending_in[id]; - } - function add2pending_in(msg) { + function addMessageToPending(msg) { var parts = msg.parts; - if (parts.hasOwnProperty("id") && - parts.hasOwnProperty("index")) { - var group = add2group_in(parts.id, msg, parts); - var msgs = group.msgs; - var count = group.count; - if (count === msgs.length) { - for (var i = 0; i < msgs.length; i++) { - var msg = msgs[i]; + // We've already checked the msg.parts has the require bits + var group = addMessageToGroup(parts.id, msg, parts); + var msgs = group.msgs; + var count = group.count; + if (count === msgs.length) { + // We have a complete group - send the individual parts + return msgs.reduce((promise, msg) => { + return promise.then((result) => { msg.parts.count = count; - process_msg(msg, false); - } - del_group_in(parts.id, group); - } - return true; + return processMessage(msg, false); + }) + }, Promise.resolve()).then( () => { + pendingCount -= group.msgs.length; + delete pendingIn[parts.id]; + }); } - return false; + return Promise.resolve(); } - function send_group(onwards, port_count) { + function sendGroup(onwards, port_count) { var counts = new Array(port_count).fill(0); for (var i = 0; i < onwards.length; i++) { var onward = onwards[i]; @@ -230,141 +363,104 @@ module.exports = function(RED) { } } - function send2ports(onward, msg) { + function sendGroupMessages(onward, msg) { var parts = msg.parts; var gid = parts.id; received[gid] = ((gid in received) ? received[gid] : 0) +1; var send_ok = (received[gid] === parts.count); - if (!(gid in pending_out)) { - pending_out[gid] = { + if (!(gid in pendingOut)) { + pendingOut[gid] = { onwards: [] }; } - var group = pending_out[gid]; + var group = pendingOut[gid]; var onwards = group.onwards; onwards.push(onward); - pending_count++; + pendingCount++; if (send_ok) { - send_group(onwards, onward.length, msg); - pending_count -= onward.length; - delete pending_out[gid]; + sendGroup(onwards, onward.length, msg); + pendingCount -= onward.length; + delete pendingOut[gid]; delete received[gid]; } - var max_msgs = max_kept_msgs_count(node); - if ((max_msgs > 0) && (pending_count > max_msgs)) { - clear_pending(); + var max_msgs = getMaxKeptCount(); + if ((max_msgs > 0) && (pendingCount > max_msgs)) { + clearPending(); node.error(RED._("switch.errors.too-many"), msg); } } - function msg_has_parts(msg) { - if (msg.hasOwnProperty("parts")) { - var parts = msg.parts; - return (parts.hasOwnProperty("id") && - parts.hasOwnProperty("index")); + + + + + function processMessage(msg, checkParts) { + var hasParts = msg.hasOwnProperty("parts") && + msg.parts.hasOwnProperty("id") && + msg.parts.hasOwnProperty("index"); + + if (needsCount && checkParts && hasParts) { + return addMessageToPending(msg); } - return false; + return getProperty(node,msg) + .then(property => applyRules(node,msg,property)) + .then(onward => { + if (!repair || !hasParts) { + node.send(onward); + } + else { + sendGroupMessages(onward, msg); + } + }).catch(err => { + node.warn(err); + }); } - function process_msg(msg, check_parts) { - var has_parts = msg_has_parts(msg); - if (needs_count && check_parts && has_parts && - add2pending_in(msg)) { - return; - } - var onward = []; - try { - var prop; - if (node.propertyType === 'jsonata') { - prop = RED.util.evaluateJSONataExpression(node.property,msg); - } else { - prop = RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg); - } - var elseflag = true; - for (var i=0; i { + node.error(err,nextMsg); + return processMessageQueue(); + }); + } + this.on('input', function(msg) { - process_msg(msg, true); + processMessageQueue(msg); }); this.on('close', function() { - clear_pending(); + clearPending(); }); } diff --git a/nodes/core/logic/15-change.html b/nodes/core/logic/15-change.html index 83bdecf9e..359b43fdd 100644 --- a/nodes/core/logic/15-change.html +++ b/nodes/core/logic/15-change.html @@ -54,6 +54,10 @@ outputs: 1, icon: "swap.png", label: function() { + function prop2name(type, key) { + var result = RED.utils.parseContextKey(key); + return type +"." +result.key; + } if (this.name) { return this.name; } @@ -70,13 +74,13 @@ } else { if (this.rules.length == 1) { if (this.rules[0].t === "set") { - return this._("change.label.set",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p}); + return this._("change.label.set",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)}); } else if (this.rules[0].t === "change") { - return this._("change.label.change",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p}); + return this._("change.label.change",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)}); } else if (this.rules[0].t === "move") { - return this._("change.label.move",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p}); + return this._("change.label.move",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)}); } else { - return this._("change.label.delete",{property:(this.rules[0].pt||"msg")+"."+this.rules[0].p}); + return this._("change.label.delete",{property:prop2name((this.rules[0].pt||"msg"), this.rules[0].p)}); } } else { return this._("change.label.changeCount",{count:this.rules.length}); diff --git a/nodes/core/logic/15-change.js b/nodes/core/logic/15-change.js index ab4f59cc3..d98bf6bde 100644 --- a/nodes/core/logic/15-change.js +++ b/nodes/core/logic/15-change.js @@ -98,44 +98,53 @@ module.exports = function(RED) { } } - function applyRule(msg,rule) { - try { - var property = rule.p; - var value = rule.to; - if (rule.tot === 'json') { - value = JSON.parse(rule.to); - } else if (rule.tot === 'bin') { - value = Buffer.from(JSON.parse(rule.to)) - } - var current; - var fromValue; - var fromType; - var fromRE; - if (rule.tot === "msg") { - value = RED.util.getMessageProperty(msg,rule.to); - } else if (rule.tot === 'flow') { - value = node.context().flow.get(rule.to); - } else if (rule.tot === 'global') { - value = node.context().global.get(rule.to); - } else if (rule.tot === 'date') { - value = Date.now(); - } else if (rule.tot === 'jsonata') { - try{ - value = RED.util.evaluateJSONataExpression(rule.to,msg); - } catch(err) { - node.error(RED._("change.errors.invalid-expr",{error:err.message}),msg); - return; - } - } - if (rule.t === 'change') { - if (rule.fromt === 'msg' || rule.fromt === 'flow' || rule.fromt === 'global') { - if (rule.fromt === "msg") { - fromValue = RED.util.getMessageProperty(msg,rule.from); - } else if (rule.fromt === 'flow') { - fromValue = node.context().flow.get(rule.from); - } else if (rule.fromt === 'global') { - fromValue = node.context().global.get(rule.from); + function getToValue(msg,rule) { + var value = rule.to; + if (rule.tot === 'json') { + value = JSON.parse(rule.to); + } else if (rule.tot === 'bin') { + value = Buffer.from(JSON.parse(rule.to)) + } + if (rule.tot === "msg") { + value = RED.util.getMessageProperty(msg,rule.to); + } else if (rule.tot === 'flow') { + value = node.context().flow.get(rule.to); + } else if (rule.tot === 'global') { + value = node.context().global.get(rule.to); + } else if (rule.tot === 'date') { + value = Date.now(); + } else if (rule.tot === 'jsonata') { + return new Promise((resolve,reject) => { + RED.util.evaluateJSONataExpression(rule.to,msg, (err, value) => { + if (err) { + reject(RED._("change.errors.invalid-expr",{error:err.message})) + } else { + resolve(value); } + }); + }); + } + return Promise.resolve(value); + } + function getFromValue(msg,rule) { + var fromValue; + var fromType; + var fromRE; + if (rule.t === 'change') { + if (rule.fromt === 'msg' || rule.fromt === 'flow' || rule.fromt === 'global') { + return new Promise((resolve,reject) => { + if (rule.fromt === "msg") { + resolve(RED.util.getMessageProperty(msg,rule.from)); + } else if (rule.fromt === 'flow' || rule.fromt === 'global') { + node.context()[rule.fromt].get(rule.from,(err,fromValue) => { + if (err) { + reject(err); + } else { + resolve(fromValue); + } + }); + } + }).then(fromValue => { if (typeof fromValue === 'number' || fromValue instanceof Number) { fromType = 'num'; } else if (typeof fromValue === 'boolean') { @@ -149,108 +158,161 @@ module.exports = function(RED) { try { fromRE = new RegExp(fromRE, "g"); } catch (e) { - valid = false; - node.error(RED._("change.errors.invalid-from",{error:e.message}),msg); + reject(new Error(RED._("change.errors.invalid-from",{error:e.message}))); return; } } else { - node.error(RED._("change.errors.invalid-from",{error:"unsupported type: "+(typeof fromValue)}),msg); - return + reject(new Error(RED._("change.errors.invalid-from",{error:"unsupported type: "+(typeof fromValue)}))); + return; } + return { + fromType, + fromValue, + fromRE + } + }); + } else { + fromType = rule.fromt; + fromValue = rule.from; + fromRE = rule.fromRE; + } + } + return Promise.resolve({ + fromType, + fromValue, + fromRE + }); + } + function applyRule(msg,rule) { + var property = rule.p; + var current; + var fromValue; + var fromType; + var fromRE; + try { + return getToValue(msg,rule).then(value => { + return getFromValue(msg,rule).then(fromParts => { + fromValue = fromParts.fromValue; + fromType = fromParts.fromType; + fromRE = fromParts.fromRE; + if (rule.pt === 'msg') { + try { + if (rule.t === 'delete') { + RED.util.setMessageProperty(msg,property,undefined); + } else if (rule.t === 'set') { + RED.util.setMessageProperty(msg,property,value); + } else if (rule.t === 'change') { + current = RED.util.getMessageProperty(msg,property); + if (typeof current === 'string') { + if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) { + // str representation of exact from number/boolean + // only replace if they match exactly + RED.util.setMessageProperty(msg,property,value); + } else { + current = current.replace(fromRE,value); + RED.util.setMessageProperty(msg,property,current); + } + } else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') { + if (current == Number(fromValue)) { + RED.util.setMessageProperty(msg,property,value); + } + } else if (typeof current === 'boolean' && fromType === 'bool') { + if (current.toString() === fromValue) { + RED.util.setMessageProperty(msg,property,value); + } + } + } + } catch(err) {} + return msg; + } else if (rule.pt === 'flow' || rule.pt === 'global') { + return new Promise((resolve,reject) => { + var target = node.context()[rule.pt]; + var callback = err => { + if (err) { + reject(err); + } else { + resolve(msg); + } + } + if (rule.t === 'delete') { + target.set(property,undefined,callback); + } else if (rule.t === 'set') { + target.set(property,value,callback); + } else if (rule.t === 'change') { + target.get(property,(err,current) => { + if (err) { + reject(err); + return; + } + if (typeof current === 'string') { + if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) { + // str representation of exact from number/boolean + // only replace if they match exactly + target.set(property,value,callback); + } else { + current = current.replace(fromRE,value); + target.set(property,current,callback); + } + } else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') { + if (current == Number(fromValue)) { + target.set(property,value,callback); + } + } else if (typeof current === 'boolean' && fromType === 'bool') { + if (current.toString() === fromValue) { + target.set(property,value,callback); + } + } + }); + } + }); + } + }); + }).catch(err => { + node.error(err, msg); + return null; + }); + } catch(err) { + return Promise.resolve(msg); + } + } + function applyRules(msg, currentRule) { + var r = node.rules[currentRule]; + var rulePromise; + if (r.t === "move") { + if ((r.tot !== r.pt) || (r.p.indexOf(r.to) !== -1)) { + rulePromise = applyRule(msg,{t:"set", p:r.to, pt:r.tot, to:r.p, tot:r.pt}).then( + msg => applyRule(msg,{t:"delete", p:r.p, pt:r.pt}) + ); + } + else { // 2 step move if we are moving from a child + rulePromise = applyRule(msg,{t:"set", p:"_temp_move", pt:r.tot, to:r.p, tot:r.pt}).then( + msg => applyRule(msg,{t:"delete", p:r.p, pt:r.pt}) + ).then( + msg => applyRule(msg,{t:"set", p:r.to, pt:r.tot, to:"_temp_move", tot:r.pt}) + ).then( + msg => applyRule(msg,{t:"delete", p:"_temp_move", pt:r.pt}) + ) + } + } else { + rulePromise = applyRule(msg,r); + } + return rulePromise.then( + msg => { + if (!msg) { + return + } else if (currentRule === node.rules.length - 1) { + return msg; } else { - fromType = rule.fromt; - fromValue = rule.from; - fromRE = rule.fromRE; + return applyRules(msg, currentRule+1); } } - if (rule.pt === 'msg') { - if (rule.t === 'delete') { - RED.util.setMessageProperty(msg,property,undefined); - } else if (rule.t === 'set') { - RED.util.setMessageProperty(msg,property,value); - } else if (rule.t === 'change') { - current = RED.util.getMessageProperty(msg,property); - if (typeof current === 'string') { - if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) { - // str representation of exact from number/boolean - // only replace if they match exactly - RED.util.setMessageProperty(msg,property,value); - } else { - current = current.replace(fromRE,value); - RED.util.setMessageProperty(msg,property,current); - } - } else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') { - if (current == Number(fromValue)) { - RED.util.setMessageProperty(msg,property,value); - } - } else if (typeof current === 'boolean' && fromType === 'bool') { - if (current.toString() === fromValue) { - RED.util.setMessageProperty(msg,property,value); - } - } - } - } - else { - var target; - if (rule.pt === 'flow') { - target = node.context().flow; - } else if (rule.pt === 'global') { - target = node.context().global; - } - if (target) { - if (rule.t === 'delete') { - target.set(property,undefined); - } else if (rule.t === 'set') { - target.set(property,value); - } else if (rule.t === 'change') { - current = target.get(property); - if (typeof current === 'string') { - if ((fromType === 'num' || fromType === 'bool' || fromType === 'str') && current === fromValue) { - // str representation of exact from number/boolean - // only replace if they match exactly - target.set(property,value); - } else { - current = current.replace(fromRE,value); - target.set(property,current); - } - } else if ((typeof current === 'number' || current instanceof Number) && fromType === 'num') { - if (current == Number(fromValue)) { - target.set(property,value); - } - } else if (typeof current === 'boolean' && fromType === 'bool') { - if (current.toString() === fromValue) { - target.set(property,value); - } - } - } - } - } - } catch(err) {/*console.log(err.stack)*/} - return msg; + ); } if (valid) { this.on('input', function(msg) { - for (var i=0; i { if (msg) { node.send(msg) }} ) + .catch( err => node.error(err, msg)) }); } } diff --git a/nodes/core/logic/17-split.js b/nodes/core/logic/17-split.js index 75ac7a4ce..09743f40c 100644 --- a/nodes/core/logic/17-split.js +++ b/nodes/core/logic/17-split.js @@ -233,7 +233,7 @@ module.exports = function(RED) { RED.nodes.registerType("split",SplitNode); - var _max_kept_msgs_count = undefined; + var _max_kept_msgs_count; function max_kept_msgs_count(node) { if (_max_kept_msgs_count === undefined) { @@ -252,7 +252,15 @@ module.exports = function(RED) { exp.assign("I", index); exp.assign("N", count); exp.assign("A", accum); - return RED.util.evaluateJSONataExpression(exp, msg); + return new Promise((resolve,reject) => { + RED.util.evaluateJSONataExpression(exp, msg, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); } function apply_f(exp, accum, count) { @@ -269,32 +277,37 @@ module.exports = function(RED) { return exp } - function reduce_and_send_group(node, group) { + function reduceAndSendGroup(node, group) { var is_right = node.reduce_right; var flag = is_right ? -1 : 1; var msgs = group.msgs; - var accum = eval_exp(node, node.exp_init, node.exp_init_type); - var reduce_exp = node.reduce_exp; - var reduce_fixup = node.reduce_fixup; - var count = group.count; - msgs.sort(function(x,y) { - var ix = x.parts.index; - var iy = y.parts.index; - if (ix < iy) return -flag; - if (ix > iy) return flag; - return 0; + return getInitialReduceValue(node, node.exp_init, node.exp_init_type).then(accum => { + var reduce_exp = node.reduce_exp; + var reduce_fixup = node.reduce_fixup; + var count = group.count; + msgs.sort(function(x,y) { + var ix = x.parts.index; + var iy = y.parts.index; + if (ix < iy) {return -flag;} + if (ix > iy) {return flag;} + return 0; + }); + + return msgs.reduce((promise, msg) => promise.then(accum => apply_r(reduce_exp, accum, msg, msg.parts.index, count)), Promise.resolve(accum)) + .then(accum => { + if(reduce_fixup !== undefined) { + accum = apply_f(reduce_fixup, accum, count); + } + node.send({payload: accum}); + }); + }).catch(err => { + throw new Error(RED._("join.errors.invalid-expr",{error:e.message})); }); - for(var msg of msgs) { - accum = apply_r(reduce_exp, accum, msg, msg.parts.index, count); - } - if(reduce_fixup !== undefined) { - accum = apply_f(reduce_fixup, accum, count); - } - node.send({payload: accum}); } function reduce_msg(node, msg) { - if(msg.hasOwnProperty('parts')) { + var promise; + if (msg.hasOwnProperty('parts')) { var parts = msg.parts; var pending = node.pending; var pending_count = node.pending_count; @@ -312,65 +325,82 @@ module.exports = function(RED) { var group = pending[gid]; var msgs = group.msgs; if(parts.hasOwnProperty('count') && - (group.count === undefined)) { + (group.count === undefined)) { group.count = count; } msgs.push(msg); pending_count++; + var completeProcess = function() { + node.pending_count = pending_count; + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (pending_count > max_msgs)) { + node.pending = {}; + node.pending_count = 0; + var promise = Promise.reject(RED._("join.too-many")); + promise.catch(()=>{}); + return promise; + } + return Promise.resolve(); + } if(msgs.length === group.count) { delete pending[gid]; - try { - pending_count -= msgs.length; - reduce_and_send_group(node, group); - } catch(e) { - node.error(RED._("join.errors.invalid-expr",{error:e.message})); } + pending_count -= msgs.length; + promise = reduceAndSendGroup(node, group).then(completeProcess); + } else { + promise = completeProcess(); } - node.pending_count = pending_count; - var max_msgs = max_kept_msgs_count(node); - if ((max_msgs > 0) && (pending_count > max_msgs)) { - node.pending = {}; - node.pending_count = 0; - node.error(RED._("join.too-many"), msg); - } - } - else { + } else { node.send(msg); } + if (!promise) { + promise = Promise.resolve(); + } + return promise; } - function eval_exp(node, exp, exp_type) { - if(exp_type === "flow") { - return node.context().flow.get(exp); - } - else if(exp_type === "global") { - return node.context().global.get(exp); - } - else if(exp_type === "str") { - return exp; - } - else if(exp_type === "num") { - return Number(exp); - } - else if(exp_type === "bool") { - if (exp === 'true') { - return true; + function getInitialReduceValue(node, exp, exp_type) { + return new Promise((resolve,reject) => { + if(exp_type === "flow" || exp_type === "global") { + node.context()[exp_type].get(exp,(err,value) => { + if (err) { + reject(err); + } else { + resolve(value); + } + }); + return; + } else if(exp_type === "jsonata") { + var jexp = RED.util.prepareJSONataExpression(exp, node); + RED.util.evaluateJSONataExpression(jexp, {},(err,value) => { + if (err) { + reject(err); + } else { + resolve(value); + } + }); + return; } - else if (exp === 'false') { - return false; + var result; + if(exp_type === "str") { + result = exp; + } else if(exp_type === "num") { + result = Number(exp); + } else if(exp_type === "bool") { + if (exp === 'true') { + result = true; + } else if (exp === 'false') { + result = false; + } + } else if ((exp_type === "bin") || (exp_type === "json")) { + result = JSON.parse(exp); + } else if(exp_type === "date") { + result = Date.now(); + } else { + reject(new Error("unexpected initial value type")); + return; } - } - else if ((exp_type === "bin") || - (exp_type === "json")) { - return JSON.parse(exp); - } - else if(exp_type === "date") { - return Date.now(); - } - else if(exp_type === "jsonata") { - var jexp = RED.util.prepareJSONataExpression(exp, node); - return RED.util.evaluateJSONataExpression(jexp, {}); - } - throw new Error("unexpected initial value type"); + resolve(result); + }); } function JoinNode(n) { @@ -437,7 +467,8 @@ module.exports = function(RED) { newArray = newArray.concat(n); }) group.payload = newArray; - } else if (group.type === 'buffer') { + } + else if (group.type === 'buffer') { var buffers = []; var bufferLen = 0; if (group.joinChar !== undefined) { @@ -450,7 +481,8 @@ module.exports = function(RED) { buffers.push(group.payload[i]); bufferLen += group.payload[i].length; } - } else { + } + else { bufferLen = group.bufferLen; buffers = group.payload; } @@ -463,7 +495,8 @@ module.exports = function(RED) { groupJoinChar = group.joinChar.toString(); } RED.util.setMessageProperty(group.msg,node.property,group.payload.join(groupJoinChar)); - } else { + } + else { if (node.propertyType === 'full') { group.msg = RED.util.cloneMessage(group.msg); } @@ -471,13 +504,48 @@ module.exports = function(RED) { } if (group.msg.hasOwnProperty('parts') && group.msg.parts.hasOwnProperty('parts')) { group.msg.parts = group.msg.parts.parts; - } else { + } + else { delete group.msg.parts; } delete group.msg.complete; node.send(group.msg); } + var pendingMessages = []; + var activeMessagePromise = null; + // In reduce mode, we must process messages fully in order otherwise + // groups may overlap and cause unexpected results. The use of JSONata + // means some async processing *might* occur if flow/global context is + // accessed. + var processReduceMessageQueue = function(msg) { + if (msg) { + // A new message has arrived - add it to the message queue + pendingMessages.push(msg); + if (activeMessagePromise !== null) { + // The node is currently processing a message, so do nothing + // more with this message + return; + } + } + if (pendingMessages.length === 0) { + // There are no more messages to process, clear the active flag + // and return + activeMessagePromise = null; + return; + } + + // There are more messages to process. Get the next message and + // start processing it. Recurse back in to check for any more + var nextMsg = pendingMessages.shift(); + activeMessagePromise = reduce_msg(node, nextMsg) + .then(processReduceMessageQueue) + .catch((err) => { + node.error(err,nextMsg); + return processReduceMessageQueue(); + }); + } + this.on("input", function(msg) { try { var property; @@ -516,8 +584,7 @@ module.exports = function(RED) { propertyIndex = msg.parts.index; } else if (node.mode === 'reduce') { - reduce_msg(node, msg); - return; + return processReduceMessageQueue(msg); } else { // Use the node configuration to identify all of the group information @@ -525,7 +592,7 @@ module.exports = function(RED) { payloadType = node.build; targetCount = node.count; joinChar = node.joiner; - if (targetCount === 0 && msg.hasOwnProperty('parts')) { + if (n.count === "" && msg.hasOwnProperty('parts')) { targetCount = msg.parts.count || 0; } if (node.build === 'object') { @@ -554,7 +621,7 @@ module.exports = function(RED) { payload:{}, targetCount:targetCount, type:"object", - msg:msg + msg:RED.util.cloneMessage(msg) }; } else if (node.accumulate === true) { @@ -564,7 +631,7 @@ module.exports = function(RED) { payload:{}, targetCount:targetCount, type:payloadType, - msg:msg + msg:RED.util.cloneMessage(msg) } if (payloadType === 'string' || payloadType === 'array' || payloadType === 'buffer') { inflight[partId].payload = []; @@ -576,7 +643,7 @@ module.exports = function(RED) { payload:[], targetCount:targetCount, type:payloadType, - msg:msg + msg:RED.util.cloneMessage(msg) }; if (payloadType === 'string') { inflight[partId].joinChar = joinChar; @@ -627,14 +694,14 @@ module.exports = function(RED) { } } } - // TODO: currently reuse the last received - add option to pick first received - group.msg = msg; + group.msg = Object.assign(group.msg, msg); var tcnt = group.targetCount; if (msg.hasOwnProperty("parts")) { tcnt = group.targetCount || msg.parts.count; } if ((tcnt > 0 && group.currentCount >= tcnt) || msg.hasOwnProperty('complete')) { completeSend(partId); } - } catch(err) { + } + catch(err) { console.log(err.stack); } }); diff --git a/nodes/core/logic/18-sort.js b/nodes/core/logic/18-sort.js index e30535dc1..124392014 100644 --- a/nodes/core/logic/18-sort.js +++ b/nodes/core/logic/18-sort.js @@ -17,7 +17,7 @@ module.exports = function(RED) { "use strict"; - var _max_kept_msgs_count = undefined; + var _max_kept_msgs_count; function max_kept_msgs_count(node) { if (_max_kept_msgs_count === undefined) { @@ -32,30 +32,20 @@ module.exports = function(RED) { return _max_kept_msgs_count; } - function eval_jsonata(node, code, val) { - try { - return RED.util.evaluateJSONataExpression(code, val); - } - catch (e) { - node.error(RED._("sort.invalid-exp")); - throw e; - } - } - - function get_context_val(node, name, dval) { - var context = node.context(); - var val = context.get(name); - if (val === undefined) { - context.set(name, dval); - return dval; - } - return val; - } + // function get_context_val(node, name, dval) { + // var context = node.context(); + // var val = context.get(name); + // if (val === undefined) { + // context.set(name, dval); + // return dval; + // } + // return val; + // } function SortNode(n) { RED.nodes.createNode(this, n); var node = this; - var pending = get_context_val(node, 'pending', {}) + var pending = {};//get_context_val(node, 'pending', {}) var pending_count = 0; var pending_id = 0; var order = n.order || "ascending"; @@ -71,16 +61,15 @@ module.exports = function(RED) { key_exp = RED.util.prepareJSONataExpression(key_exp, this); } catch (e) { - node.error(RED._("sort.invalid-exp")); + node.error(RED._("sort.invalid-exp",{message:e.toString()})); return; } } var dir = (order === "ascending") ? 1 : -1; - var conv = as_num - ? function(x) { return Number(x); } - : function(x) { return x; }; + var conv = as_num ? function(x) { return Number(x); } + : function(x) { return x; }; - function gen_comp(key) { + function generateComparisonFunction(key) { return function(x, y) { var xp = conv(key(x)); var yp = conv(key(y)); @@ -90,74 +79,105 @@ module.exports = function(RED) { }; } - function send_group(group) { - var key = key_is_exp - ? function(msg) { - return eval_jsonata(node, key_exp, msg); - } - : function(msg) { - return RED.util.getMessageProperty(msg, key_prop); - }; - var comp = gen_comp(key); + function sortMessageGroup(group) { + var promise; var msgs = group.msgs; - try { - msgs.sort(comp); - } - catch (e) { - return; // not send when error - } - for (var i = 0; i < msgs.length; i++) { - var msg = msgs[i]; - msg.parts.index = i; - node.send(msg); - } - } - - function sort_payload(msg) { - var data = RED.util.getMessageProperty(msg, target_prop); - if (Array.isArray(data)) { - var key = key_is_exp - ? function(elem) { - return eval_jsonata(node, key_exp, elem); - } - : function(elem) { return elem; }; - var comp = gen_comp(key); + if (key_is_exp) { + var evaluatedDataPromises = msgs.map(msg => { + return new Promise((resolve,reject) => { + RED.util.evaluateJSONataExpression(key_exp, msg, (err, result) => { + if (err) { + reject(RED._("sort.invalid-exp",{message:err.toString()})); + } else { + resolve({ + item: msg, + sortValue: result + }) + } + }); + }) + }); + promise = Promise.all(evaluatedDataPromises).then(evaluatedElements => { + // Once all of the sort keys are evaluated, sort by them + var comp = generateComparisonFunction(elem=>elem.sortValue); + return evaluatedElements.sort(comp).map(elem=>elem.item); + }); + } else { + var key = function(msg) { + return ; + } + var comp = generateComparisonFunction(msg => RED.util.getMessageProperty(msg, key_prop)); try { - data.sort(comp); + msgs.sort(comp); } catch (e) { - return false; + return; // not send when error } - return true; + promise = Promise.resolve(msgs); } - return false; + return promise.then(msgs => { + for (var i = 0; i < msgs.length; i++) { + var msg = msgs[i]; + msg.parts.index = i; + node.send(msg); + } + }); } - function check_parts(parts) { - if (parts.hasOwnProperty("id") && - parts.hasOwnProperty("index")) { - return true; + function sortMessageProperty(msg) { + var data = RED.util.getMessageProperty(msg, target_prop); + if (Array.isArray(data)) { + if (key_is_exp) { + // key is an expression. Evaluated the expression for each item + // to get its sort value. As this could be async, need to do + // it first. + var evaluatedDataPromises = data.map(elem => { + return new Promise((resolve,reject) => { + RED.util.evaluateJSONataExpression(key_exp, elem, (err, result) => { + if (err) { + reject(RED._("sort.invalid-exp",{message:err.toString()})); + } else { + resolve({ + item: elem, + sortValue: result + }) + } + }); + }) + }) + return Promise.all(evaluatedDataPromises).then(evaluatedElements => { + // Once all of the sort keys are evaluated, sort by them + // and reconstruct the original message item with the newly + // sorted values. + var comp = generateComparisonFunction(elem=>elem.sortValue); + data = evaluatedElements.sort(comp).map(elem=>elem.item); + RED.util.setMessageProperty(msg, target_prop,data); + return true; + }) + } else { + var comp = generateComparisonFunction(elem=>elem); + try { + data.sort(comp); + } catch (e) { + return Promise.resolve(false); + } + return Promise.resolve(true); + } } - return false; + return Promise.resolve(false); } - function clear_pending() { + function removeOldestPending() { + var oldest; + var oldest_key; for(var key in pending) { - node.log(RED._("sort.clear"), pending[key].msgs[0]); - delete pending[key]; - } - pending_count = 0; - } - - function remove_oldest_pending() { - var oldest = undefined; - var oldest_key = undefined; - for(var key in pending) { - var item = pending[key]; - if((oldest === undefined) || - (oldest.seq_no > item.seq_no)) { - oldest = item; - oldest_key = key; + if (pending.hasOwnProperty(key)) { + var item = pending[key]; + if((oldest === undefined) || + (oldest.seq_no > item.seq_no)) { + oldest = item; + oldest_key = key; + } } } if(oldest !== undefined) { @@ -166,16 +186,19 @@ module.exports = function(RED) { } return 0; } - - function process_msg(msg) { + + function processMessage(msg) { if (target_is_prop) { - if (sort_payload(msg)) { - node.send(msg); - } - return; + sortMessageProperty(msg).then(send => { + if (send) { + node.send(msg); + } + }).catch(err => { + node.error(err,msg); + }); } var parts = msg.parts; - if (!check_parts(parts)) { + if (!parts.hasOwnProperty("id") || !parts.hasOwnProperty("index")) { return; } var gid = parts.id; @@ -195,23 +218,31 @@ module.exports = function(RED) { pending_count++; if (group.count === msgs.length) { delete pending[gid] - send_group(group); + sortMessageGroup(group).catch(err => { + node.error(err,msg); + }); pending_count -= msgs.length; - } - var max_msgs = max_kept_msgs_count(node); - if ((max_msgs > 0) && (pending_count > max_msgs)) { - pending_count -= remove_oldest_pending(); - node.error(RED._("sort.too-many"), msg); + } else { + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (pending_count > max_msgs)) { + pending_count -= removeOldestPending(); + node.error(RED._("sort.too-many"), msg); + } } } - + this.on("input", function(msg) { - process_msg(msg); + processMessage(msg); }); this.on("close", function() { - clear_pending(); - }) + for(var key in pending) { + if (pending.hasOwnProperty(key)) { + node.log(RED._("sort.clear"), pending[key].msgs[0]); + delete pending[key]; + } + } + pending_count = 0; }) } RED.nodes.registerType("sort", SortNode); diff --git a/nodes/core/parsers/70-JSON.html b/nodes/core/parsers/70-JSON.html index 565bae9a0..32b5a332a 100644 --- a/nodes/core/parsers/70-JSON.html +++ b/nodes/core/parsers/70-JSON.html @@ -31,6 +31,8 @@
payloadobject | string
A JavaScript object or JSON string.
+
schemaobject
+
An optional JSON Schema object to validate the payload against.

Outputs

@@ -41,6 +43,9 @@
  • If the input is a JavaScript object it creates a JSON string. The string can optionally be well-formatted.
  • +
    schemaErrorarray
    +
    If JSON schema validation fails, the catch node will have a schemaError property + containing an array of errors.

    Details

    By default, the node operates on msg.payload, but can be configured @@ -53,6 +58,8 @@ receives a String, no further checks will be made of the property. It will not check the String is valid JSON nor will it reformat it if the format option is selected.

    +

    For more details about JSON Schema you can consult the specification + here.