Fixup Join node to apply reduce_fixup asynchronously

This commit is contained in:
Nick O'Leary 2018-07-25 11:08:03 +01:00
parent 4609ee75b6
commit 9efd48fe51
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9
2 changed files with 35 additions and 20 deletions

View File

@ -266,7 +266,15 @@ module.exports = function(RED) {
function apply_f(exp, accum, count) { function apply_f(exp, accum, count) {
exp.assign("N", count); exp.assign("N", count);
exp.assign("A", accum); exp.assign("A", accum);
return RED.util.evaluateJSONataExpression(exp, {}); return new Promise((resolve,reject) => {
return RED.util.evaluateJSONataExpression(exp, {}, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
} }
function exp_or_undefined(exp) { function exp_or_undefined(exp) {
@ -296,10 +304,13 @@ module.exports = function(RED) {
return msgs.reduce((promise, msg) => promise.then(accum => apply_r(reduce_exp, accum, msg, msg.parts.index, count)), Promise.resolve(accum)) return msgs.reduce((promise, msg) => promise.then(accum => apply_r(reduce_exp, accum, msg, msg.parts.index, count)), Promise.resolve(accum))
.then(accum => { .then(accum => {
if(reduce_fixup !== undefined) { if(reduce_fixup !== undefined) {
accum = apply_f(reduce_fixup, accum, count); return apply_f(reduce_fixup, accum, count).then(accum => {
}
node.send({payload: accum}); node.send({payload: accum});
}); });
} else {
node.send({payload: accum});
}
});
}).catch(err => { }).catch(err => {
throw new Error(RED._("join.errors.invalid-expr",{error:e.message})); throw new Error(RED._("join.errors.invalid-expr",{error:e.message}));
}); });

View File

@ -1319,6 +1319,7 @@ describe('JOIN node', function() {
initContext(function () { initContext(function () {
var n1 = helper.getNode("n1"); var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2"); var n2 = helper.getNode("n2");
try {
n2.on("input", function(msg) { n2.on("input", function(msg) {
try { try {
msg.should.have.property("payload"); msg.should.have.property("payload");
@ -1337,6 +1338,9 @@ describe('JOIN node', function() {
n1.receive({payload:1, parts:{index:0, count:4, id:222}}); n1.receive({payload:1, parts:{index:0, count:4, id:222}});
} }
}); });
}catch(err) {
done(err);
}
}); });
}); });
}); });