From 57c1524a9a5de597c5d94e8c67d4390229c4c61f Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Tue, 10 Jul 2018 11:24:57 +0100 Subject: [PATCH] Add async jsonata support to join node --- nodes/core/logic/17-split.js | 201 +++++++++++++++++++++++------------ 1 file changed, 132 insertions(+), 69 deletions(-) diff --git a/nodes/core/logic/17-split.js b/nodes/core/logic/17-split.js index 92b527d01..73231335f 100644 --- a/nodes/core/logic/17-split.js +++ b/nodes/core/logic/17-split.js @@ -233,7 +233,7 @@ module.exports = function(RED) { RED.nodes.registerType("split",SplitNode); - var _max_kept_msgs_count = undefined; + var _max_kept_msgs_count; function max_kept_msgs_count(node) { if (_max_kept_msgs_count === undefined) { @@ -252,7 +252,15 @@ module.exports = function(RED) { exp.assign("I", index); exp.assign("N", count); exp.assign("A", accum); - return RED.util.evaluateJSONataExpression(exp, msg); + return new Promise((resolve,reject) => { + RED.util.evaluateJSONataExpression(exp, msg, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); } function apply_f(exp, accum, count) { @@ -269,32 +277,37 @@ module.exports = function(RED) { return exp } - function reduce_and_send_group(node, group) { + function reduceAndSendGroup(node, group) { var is_right = node.reduce_right; var flag = is_right ? -1 : 1; var msgs = group.msgs; - var accum = eval_exp(node, node.exp_init, node.exp_init_type); - var reduce_exp = node.reduce_exp; - var reduce_fixup = node.reduce_fixup; - var count = group.count; - msgs.sort(function(x,y) { - var ix = x.parts.index; - var iy = y.parts.index; - if (ix < iy) return -flag; - if (ix > iy) return flag; - return 0; + return getInitialReduceValue(node, node.exp_init, node.exp_init_type).then(accum => { + var reduce_exp = node.reduce_exp; + var reduce_fixup = node.reduce_fixup; + var count = group.count; + msgs.sort(function(x,y) { + var ix = x.parts.index; + var iy = y.parts.index; + if (ix < iy) {return -flag;} + if (ix > iy) {return flag;} + return 0; + }); + + return msgs.reduce((promise, msg) => promise.then(accum => apply_r(reduce_exp, accum, msg, msg.parts.index, count)), Promise.resolve(accum)) + .then(accum => { + if(reduce_fixup !== undefined) { + accum = apply_f(reduce_fixup, accum, count); + } + node.send({payload: accum}); + }); + }).catch(err => { + throw new Error(RED._("join.errors.invalid-expr",{error:e.message})); }); - for(var msg of msgs) { - accum = apply_r(reduce_exp, accum, msg, msg.parts.index, count); - } - if(reduce_fixup !== undefined) { - accum = apply_f(reduce_fixup, accum, count); - } - node.send({payload: accum}); } function reduce_msg(node, msg) { - if(msg.hasOwnProperty('parts')) { + var promise; + if (msg.hasOwnProperty('parts')) { var parts = msg.parts; var pending = node.pending; var pending_count = node.pending_count; @@ -312,65 +325,82 @@ module.exports = function(RED) { var group = pending[gid]; var msgs = group.msgs; if(parts.hasOwnProperty('count') && - (group.count === undefined)) { + (group.count === undefined)) { group.count = count; } msgs.push(msg); pending_count++; + var completeProcess = function() { + node.pending_count = pending_count; + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (pending_count > max_msgs)) { + node.pending = {}; + node.pending_count = 0; + var promise = Promise.reject(RED._("join.too-many")); + promise.catch(()=>{}); + return promise; + } + return Promise.resolve(); + } if(msgs.length === group.count) { delete pending[gid]; - try { - pending_count -= msgs.length; - reduce_and_send_group(node, group); - } catch(e) { - node.error(RED._("join.errors.invalid-expr",{error:e.message})); } + pending_count -= msgs.length; + promise = reduceAndSendGroup(node, group).then(completeProcess); + } else { + promise = completeProcess(); } - node.pending_count = pending_count; - var max_msgs = max_kept_msgs_count(node); - if ((max_msgs > 0) && (pending_count > max_msgs)) { - node.pending = {}; - node.pending_count = 0; - node.error(RED._("join.too-many"), msg); - } - } - else { + } else { node.send(msg); } + if (!promise) { + promise = Promise.resolve(); + } + return promise; } - function eval_exp(node, exp, exp_type) { - if(exp_type === "flow") { - return node.context().flow.get(exp); - } - else if(exp_type === "global") { - return node.context().global.get(exp); - } - else if(exp_type === "str") { - return exp; - } - else if(exp_type === "num") { - return Number(exp); - } - else if(exp_type === "bool") { - if (exp === 'true') { - return true; + function getInitialReduceValue(node, exp, exp_type) { + return new Promise((resolve,reject) => { + if(exp_type === "flow" || exp_type === "global") { + node.context()[exp_type].get(exp,(err,value) => { + if (err) { + reject(err); + } else { + resolve(value); + } + }); + return; + } else if(exp_type === "jsonata") { + var jexp = RED.util.prepareJSONataExpression(exp, node); + RED.util.evaluateJSONataExpression(jexp, {},(err,value) => { + if (err) { + reject(err); + } else { + resolve(value); + } + }); + return; } - else if (exp === 'false') { - return false; + var result; + if(exp_type === "str") { + result = exp; + } else if(exp_type === "num") { + result = Number(exp); + } else if(exp_type === "bool") { + if (exp === 'true') { + result = true; + } else if (exp === 'false') { + result = false; + } + } else if ((exp_type === "bin") || (exp_type === "json")) { + result = JSON.parse(exp); + } else if(exp_type === "date") { + result = Date.now(); + } else { + reject(new Error("unexpected initial value type")); + return; } - } - else if ((exp_type === "bin") || - (exp_type === "json")) { - return JSON.parse(exp); - } - else if(exp_type === "date") { - return Date.now(); - } - else if(exp_type === "jsonata") { - var jexp = RED.util.prepareJSONataExpression(exp, node); - return RED.util.evaluateJSONataExpression(jexp, {}); - } - throw new Error("unexpected initial value type"); + resolve(result); + }); } function JoinNode(n) { @@ -478,6 +508,40 @@ module.exports = function(RED) { node.send(group.msg); } + var pendingMessages = []; + var activeMessagePromise = null; + // In reduce mode, we must process messages fully in order otherwise + // groups may overlap and cause unexpected results. The use of JSONata + // means some async processing *might* occur if flow/global context is + // accessed. + var processReduceMessageQueue = function(msg) { + if (msg) { + // A new message has arrived - add it to the message queue + pendingMessages.push(msg); + if (activeMessagePromise !== null) { + // The node is currently processing a message, so do nothing + // more with this message + return; + } + } + if (pendingMessages.length === 0) { + // There are no more messages to process, clear the active flag + // and return + activeMessagePromise = null; + return; + } + + // There are more messages to process. Get the next message and + // start processing it. Recurse back in to check for any more + var nextMsg = pendingMessages.shift(); + activeMessagePromise = reduce_msg(node, nextMsg) + .then(processReduceMessageQueue) + .catch((err) => { + node.error(err,nextMsg); + return processReduceMessageQueue(); + }); + } + this.on("input", function(msg) { try { var property; @@ -516,8 +580,7 @@ module.exports = function(RED) { propertyIndex = msg.parts.index; } else if (node.mode === 'reduce') { - reduce_msg(node, msg); - return; + return processReduceMessageQueue(msg); } else { // Use the node configuration to identify all of the group information