diff --git a/packages/node_modules/@node-red/nodes/core/logic/17-split.js b/packages/node_modules/@node-red/nodes/core/logic/17-split.js index 2bc3bf888..481c4ee3d 100644 --- a/packages/node_modules/@node-red/nodes/core/logic/17-split.js +++ b/packages/node_modules/@node-red/nodes/core/logic/17-split.js @@ -248,33 +248,11 @@ module.exports = function(RED) { return _maxKeptMsgsCount; } - function applyReduce(exp, accum, msg, index, count) { + function applyReduce(exp, accum, msg, index, count, done) { exp.assign("I", index); exp.assign("N", count); exp.assign("A", accum); - return new Promise((resolve,reject) => { - RED.util.evaluateJSONataExpression(exp, msg, (err, result) => { - if (err) { - reject(err); - } else { - resolve(result); - } - }); - }); - } - - function applyFixup(exp, accum, count) { - exp.assign("N", count); - exp.assign("A", accum); - return new Promise((resolve,reject) => { - return RED.util.evaluateJSONataExpression(exp, {}, (err, result) => { - if (err) { - reject(err); - } else { - resolve(result); - } - }); - }); + RED.util.evaluateJSONataExpression(exp, msg, done); } function exp_or_undefined(exp) { @@ -285,39 +263,68 @@ module.exports = function(RED) { return exp } - function reduceAndSendGroup(node, group) { + + function reduceMessageGroup(node,msgs,exp,fixup,count,accumulator,done) { + var msg = msgs.shift(); + exp.assign("I", msg.parts.index); + exp.assign("N", count); + exp.assign("A", accumulator); + RED.util.evaluateJSONataExpression(exp, msg, (err,result) => { + if (err) { + return done(err); + } + if (msgs.length === 0) { + if (fixup) { + fixup.assign("N", count); + fixup.assign("A", result); + RED.util.evaluateJSONataExpression(fixup, {}, (err, result) => { + if (err) { + return done(err); + } + node.send({payload: result}); + done(); + }); + } else { + node.send({payload: result}); + done(); + } + } else { + reduceMessageGroup(node,msgs,exp,fixup,count,result,done); + } + }); + + } + function reduceAndSendGroup(node, group, done) { var is_right = node.reduce_right; var flag = is_right ? -1 : 1; var msgs = group.msgs; - return getInitialReduceValue(node, node.exp_init, node.exp_init_type).then(accum => { - var reduceExpression = node.reduceExpression; - var fixupExpression = node.fixupExpression; - var count = group.count; - msgs.sort(function(x,y) { - var ix = x.parts.index; - var iy = y.parts.index; - if (ix < iy) {return -flag;} - if (ix > iy) {return flag;} - return 0; - }); - - return msgs.reduce((promise, msg) => promise.then(accum => applyReduce(reduceExpression, accum, msg, msg.parts.index, count)), Promise.resolve(accum)) - .then(accum => { - if(fixupExpression !== undefined) { - return applyFixup(fixupExpression, accum, count).then(accum => { - node.send({payload: accum}); - }); - } else { - node.send({payload: accum}); - } + try { + RED.util.evaluateNodeProperty(node.exp_init, node.exp_init_type, node, {}, (err,accum) => { + var reduceExpression = node.reduceExpression; + var fixupExpression = node.fixupExpression; + var count = group.count; + msgs.sort(function(x,y) { + var ix = x.parts.index; + var iy = y.parts.index; + if (ix < iy) {return -flag;} + if (ix > iy) {return flag;} + return 0; }); - }).catch(err => { - throw new Error(RED._("join.errors.invalid-expr",{error:err.message})); - }); + reduceMessageGroup(node, msgs,reduceExpression,fixupExpression,count,accum,(err,result) => { + if (err) { + done(err); + return; + } else { + done(); + } + }) + }); + } catch(err) { + done(new Error(RED._("join.errors.invalid-expr",{error:err.message}))); + } } - function reduceMessage(node, msg) { - var promise; + function reduceMessage(node, msg, done) { if (msg.hasOwnProperty('parts')) { var parts = msg.parts; var pending = node.pending; @@ -335,51 +342,37 @@ module.exports = function(RED) { } var group = pending[gid]; var msgs = group.msgs; - if(parts.hasOwnProperty('count') && (group.count === undefined)) { + if (parts.hasOwnProperty('count') && (group.count === undefined)) { group.count = parts.count; } msgs.push(msg); pending_count++; - var completeProcess = function() { + var completeProcess = function(err) { + if (err) { + return done(err); + } node.pending_count = pending_count; var max_msgs = maxKeptMsgsCount(node); if ((max_msgs > 0) && (pending_count > max_msgs)) { node.pending = {}; node.pending_count = 0; - var promise = Promise.reject(RED._("join.too-many")); - promise.catch(()=>{}); - return promise; + done(RED._("join.too-many")); + return; } - return Promise.resolve(); + return done(); } - if(msgs.length === group.count) { + + if (msgs.length === group.count) { delete pending[gid]; pending_count -= msgs.length; - promise = reduceAndSendGroup(node, group).then(completeProcess); + reduceAndSendGroup(node, group, completeProcess) } else { - promise = completeProcess(); + completeProcess(); } } else { node.send(msg); + done(); } - if (!promise) { - promise = Promise.resolve(); - } - return promise; - } - - function getInitialReduceValue(node, exp, exp_type) { - return new Promise((resolve, reject) => { - RED.util.evaluateNodeProperty(exp, exp_type, node, {}, - (err, result) => { - if(err) { - return reject(err); - } - else { - return resolve(result); - } - }); - }); } function JoinNode(n) { @@ -490,7 +483,7 @@ module.exports = function(RED) { } var pendingMessages = []; - var activeMessagePromise = null; + var activeMessage = null; // In reduce mode, we must process messages fully in order otherwise // groups may overlap and cause unexpected results. The use of JSONata // means some async processing *might* occur if flow/global context is @@ -499,7 +492,7 @@ module.exports = function(RED) { if (msg) { // A new message has arrived - add it to the message queue pendingMessages.push(msg); - if (activeMessagePromise !== null) { + if (activeMessage !== null) { // The node is currently processing a message, so do nothing // more with this message return; @@ -508,19 +501,21 @@ module.exports = function(RED) { if (pendingMessages.length === 0) { // There are no more messages to process, clear the active flag // and return - activeMessagePromise = null; + activeMessage = null; return; } // There are more messages to process. Get the next message and // start processing it. Recurse back in to check for any more var nextMsg = pendingMessages.shift(); - activeMessagePromise = reduceMessage(node, nextMsg) - .then(processReduceMessageQueue) - .catch((err) => { + activeMessage = true; + reduceMessage(node, nextMsg, err => { + if (err) { node.error(err,nextMsg); - return processReduceMessageQueue(); - }); + } + activeMessage = null; + processReduceMessageQueue(); + }) } this.on("input", function(msg) {