Remove promises from Join node

This commit is contained in:
Nick O'Leary 2019-01-02 22:37:06 +00:00
parent 747af44fc1
commit 43b7aa40c3
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9
1 changed files with 81 additions and 86 deletions

View File

@ -248,33 +248,11 @@ module.exports = function(RED) {
return _maxKeptMsgsCount;
}
function applyReduce(exp, accum, msg, index, count) {
function applyReduce(exp, accum, msg, index, count, done) {
exp.assign("I", index);
exp.assign("N", count);
exp.assign("A", accum);
return new Promise((resolve,reject) => {
RED.util.evaluateJSONataExpression(exp, msg, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
}
function applyFixup(exp, accum, count) {
exp.assign("N", count);
exp.assign("A", accum);
return new Promise((resolve,reject) => {
return RED.util.evaluateJSONataExpression(exp, {}, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
RED.util.evaluateJSONataExpression(exp, msg, done);
}
function exp_or_undefined(exp) {
@ -285,39 +263,68 @@ module.exports = function(RED) {
return exp
}
function reduceAndSendGroup(node, group) {
function reduceMessageGroup(node,msgs,exp,fixup,count,accumulator,done) {
var msg = msgs.shift();
exp.assign("I", msg.parts.index);
exp.assign("N", count);
exp.assign("A", accumulator);
RED.util.evaluateJSONataExpression(exp, msg, (err,result) => {
if (err) {
return done(err);
}
if (msgs.length === 0) {
if (fixup) {
fixup.assign("N", count);
fixup.assign("A", result);
RED.util.evaluateJSONataExpression(fixup, {}, (err, result) => {
if (err) {
return done(err);
}
node.send({payload: result});
done();
});
} else {
node.send({payload: result});
done();
}
} else {
reduceMessageGroup(node,msgs,exp,fixup,count,result,done);
}
});
}
function reduceAndSendGroup(node, group, done) {
var is_right = node.reduce_right;
var flag = is_right ? -1 : 1;
var msgs = group.msgs;
return getInitialReduceValue(node, node.exp_init, node.exp_init_type).then(accum => {
var reduceExpression = node.reduceExpression;
var fixupExpression = node.fixupExpression;
var count = group.count;
msgs.sort(function(x,y) {
var ix = x.parts.index;
var iy = y.parts.index;
if (ix < iy) {return -flag;}
if (ix > iy) {return flag;}
return 0;
});
return msgs.reduce((promise, msg) => promise.then(accum => applyReduce(reduceExpression, accum, msg, msg.parts.index, count)), Promise.resolve(accum))
.then(accum => {
if(fixupExpression !== undefined) {
return applyFixup(fixupExpression, accum, count).then(accum => {
node.send({payload: accum});
});
} else {
node.send({payload: accum});
}
try {
RED.util.evaluateNodeProperty(node.exp_init, node.exp_init_type, node, {}, (err,accum) => {
var reduceExpression = node.reduceExpression;
var fixupExpression = node.fixupExpression;
var count = group.count;
msgs.sort(function(x,y) {
var ix = x.parts.index;
var iy = y.parts.index;
if (ix < iy) {return -flag;}
if (ix > iy) {return flag;}
return 0;
});
}).catch(err => {
throw new Error(RED._("join.errors.invalid-expr",{error:err.message}));
});
reduceMessageGroup(node, msgs,reduceExpression,fixupExpression,count,accum,(err,result) => {
if (err) {
done(err);
return;
} else {
done();
}
})
});
} catch(err) {
done(new Error(RED._("join.errors.invalid-expr",{error:err.message})));
}
}
function reduceMessage(node, msg) {
var promise;
function reduceMessage(node, msg, done) {
if (msg.hasOwnProperty('parts')) {
var parts = msg.parts;
var pending = node.pending;
@ -335,51 +342,37 @@ module.exports = function(RED) {
}
var group = pending[gid];
var msgs = group.msgs;
if(parts.hasOwnProperty('count') && (group.count === undefined)) {
if (parts.hasOwnProperty('count') && (group.count === undefined)) {
group.count = parts.count;
}
msgs.push(msg);
pending_count++;
var completeProcess = function() {
var completeProcess = function(err) {
if (err) {
return done(err);
}
node.pending_count = pending_count;
var max_msgs = maxKeptMsgsCount(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
var promise = Promise.reject(RED._("join.too-many"));
promise.catch(()=>{});
return promise;
done(RED._("join.too-many"));
return;
}
return Promise.resolve();
return done();
}
if(msgs.length === group.count) {
if (msgs.length === group.count) {
delete pending[gid];
pending_count -= msgs.length;
promise = reduceAndSendGroup(node, group).then(completeProcess);
reduceAndSendGroup(node, group, completeProcess)
} else {
promise = completeProcess();
completeProcess();
}
} else {
node.send(msg);
done();
}
if (!promise) {
promise = Promise.resolve();
}
return promise;
}
function getInitialReduceValue(node, exp, exp_type) {
return new Promise((resolve, reject) => {
RED.util.evaluateNodeProperty(exp, exp_type, node, {},
(err, result) => {
if(err) {
return reject(err);
}
else {
return resolve(result);
}
});
});
}
function JoinNode(n) {
@ -490,7 +483,7 @@ module.exports = function(RED) {
}
var pendingMessages = [];
var activeMessagePromise = null;
var activeMessage = null;
// In reduce mode, we must process messages fully in order otherwise
// groups may overlap and cause unexpected results. The use of JSONata
// means some async processing *might* occur if flow/global context is
@ -499,7 +492,7 @@ module.exports = function(RED) {
if (msg) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
if (activeMessagePromise !== null) {
if (activeMessage !== null) {
// The node is currently processing a message, so do nothing
// more with this message
return;
@ -508,19 +501,21 @@ module.exports = function(RED) {
if (pendingMessages.length === 0) {
// There are no more messages to process, clear the active flag
// and return
activeMessagePromise = null;
activeMessage = null;
return;
}
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
activeMessagePromise = reduceMessage(node, nextMsg)
.then(processReduceMessageQueue)
.catch((err) => {
activeMessage = true;
reduceMessage(node, nextMsg, err => {
if (err) {
node.error(err,nextMsg);
return processReduceMessageQueue();
});
}
activeMessage = null;
processReduceMessageQueue();
})
}
this.on("input", function(msg) {