1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Add async jsonata support to join node

This commit is contained in:
Nick O'Leary 2018-07-10 11:24:57 +01:00
parent d8d82e2ba3
commit 57c1524a9a
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9

View File

@ -233,7 +233,7 @@ module.exports = function(RED) {
RED.nodes.registerType("split",SplitNode); RED.nodes.registerType("split",SplitNode);
var _max_kept_msgs_count = undefined; var _max_kept_msgs_count;
function max_kept_msgs_count(node) { function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) { if (_max_kept_msgs_count === undefined) {
@ -252,7 +252,15 @@ module.exports = function(RED) {
exp.assign("I", index); exp.assign("I", index);
exp.assign("N", count); exp.assign("N", count);
exp.assign("A", accum); exp.assign("A", accum);
return RED.util.evaluateJSONataExpression(exp, msg); return new Promise((resolve,reject) => {
RED.util.evaluateJSONataExpression(exp, msg, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
} }
function apply_f(exp, accum, count) { function apply_f(exp, accum, count) {
@ -269,32 +277,37 @@ module.exports = function(RED) {
return exp return exp
} }
function reduce_and_send_group(node, group) { function reduceAndSendGroup(node, group) {
var is_right = node.reduce_right; var is_right = node.reduce_right;
var flag = is_right ? -1 : 1; var flag = is_right ? -1 : 1;
var msgs = group.msgs; var msgs = group.msgs;
var accum = eval_exp(node, node.exp_init, node.exp_init_type); return getInitialReduceValue(node, node.exp_init, node.exp_init_type).then(accum => {
var reduce_exp = node.reduce_exp; var reduce_exp = node.reduce_exp;
var reduce_fixup = node.reduce_fixup; var reduce_fixup = node.reduce_fixup;
var count = group.count; var count = group.count;
msgs.sort(function(x,y) { msgs.sort(function(x,y) {
var ix = x.parts.index; var ix = x.parts.index;
var iy = y.parts.index; var iy = y.parts.index;
if (ix < iy) return -flag; if (ix < iy) {return -flag;}
if (ix > iy) return flag; if (ix > iy) {return flag;}
return 0; return 0;
});
return msgs.reduce((promise, msg) => promise.then(accum => apply_r(reduce_exp, accum, msg, msg.parts.index, count)), Promise.resolve(accum))
.then(accum => {
if(reduce_fixup !== undefined) {
accum = apply_f(reduce_fixup, accum, count);
}
node.send({payload: accum});
});
}).catch(err => {
throw new Error(RED._("join.errors.invalid-expr",{error:e.message}));
}); });
for(var msg of msgs) {
accum = apply_r(reduce_exp, accum, msg, msg.parts.index, count);
}
if(reduce_fixup !== undefined) {
accum = apply_f(reduce_fixup, accum, count);
}
node.send({payload: accum});
} }
function reduce_msg(node, msg) { function reduce_msg(node, msg) {
if(msg.hasOwnProperty('parts')) { var promise;
if (msg.hasOwnProperty('parts')) {
var parts = msg.parts; var parts = msg.parts;
var pending = node.pending; var pending = node.pending;
var pending_count = node.pending_count; var pending_count = node.pending_count;
@ -312,65 +325,82 @@ module.exports = function(RED) {
var group = pending[gid]; var group = pending[gid];
var msgs = group.msgs; var msgs = group.msgs;
if(parts.hasOwnProperty('count') && if(parts.hasOwnProperty('count') &&
(group.count === undefined)) { (group.count === undefined)) {
group.count = count; group.count = count;
} }
msgs.push(msg); msgs.push(msg);
pending_count++; pending_count++;
var completeProcess = function() {
node.pending_count = pending_count;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
var promise = Promise.reject(RED._("join.too-many"));
promise.catch(()=>{});
return promise;
}
return Promise.resolve();
}
if(msgs.length === group.count) { if(msgs.length === group.count) {
delete pending[gid]; delete pending[gid];
try { pending_count -= msgs.length;
pending_count -= msgs.length; promise = reduceAndSendGroup(node, group).then(completeProcess);
reduce_and_send_group(node, group); } else {
} catch(e) { promise = completeProcess();
node.error(RED._("join.errors.invalid-expr",{error:e.message})); }
} }
node.pending_count = pending_count; } else {
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
node.error(RED._("join.too-many"), msg);
}
}
else {
node.send(msg); node.send(msg);
} }
if (!promise) {
promise = Promise.resolve();
}
return promise;
} }
function eval_exp(node, exp, exp_type) { function getInitialReduceValue(node, exp, exp_type) {
if(exp_type === "flow") { return new Promise((resolve,reject) => {
return node.context().flow.get(exp); if(exp_type === "flow" || exp_type === "global") {
} node.context()[exp_type].get(exp,(err,value) => {
else if(exp_type === "global") { if (err) {
return node.context().global.get(exp); reject(err);
} } else {
else if(exp_type === "str") { resolve(value);
return exp; }
} });
else if(exp_type === "num") { return;
return Number(exp); } else if(exp_type === "jsonata") {
} var jexp = RED.util.prepareJSONataExpression(exp, node);
else if(exp_type === "bool") { RED.util.evaluateJSONataExpression(jexp, {},(err,value) => {
if (exp === 'true') { if (err) {
return true; reject(err);
} else {
resolve(value);
}
});
return;
} }
else if (exp === 'false') { var result;
return false; if(exp_type === "str") {
result = exp;
} else if(exp_type === "num") {
result = Number(exp);
} else if(exp_type === "bool") {
if (exp === 'true') {
result = true;
} else if (exp === 'false') {
result = false;
}
} else if ((exp_type === "bin") || (exp_type === "json")) {
result = JSON.parse(exp);
} else if(exp_type === "date") {
result = Date.now();
} else {
reject(new Error("unexpected initial value type"));
return;
} }
} resolve(result);
else if ((exp_type === "bin") || });
(exp_type === "json")) {
return JSON.parse(exp);
}
else if(exp_type === "date") {
return Date.now();
}
else if(exp_type === "jsonata") {
var jexp = RED.util.prepareJSONataExpression(exp, node);
return RED.util.evaluateJSONataExpression(jexp, {});
}
throw new Error("unexpected initial value type");
} }
function JoinNode(n) { function JoinNode(n) {
@ -478,6 +508,40 @@ module.exports = function(RED) {
node.send(group.msg); node.send(group.msg);
} }
var pendingMessages = [];
var activeMessagePromise = null;
// In reduce mode, we must process messages fully in order otherwise
// groups may overlap and cause unexpected results. The use of JSONata
// means some async processing *might* occur if flow/global context is
// accessed.
var processReduceMessageQueue = function(msg) {
if (msg) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
if (activeMessagePromise !== null) {
// The node is currently processing a message, so do nothing
// more with this message
return;
}
}
if (pendingMessages.length === 0) {
// There are no more messages to process, clear the active flag
// and return
activeMessagePromise = null;
return;
}
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
activeMessagePromise = reduce_msg(node, nextMsg)
.then(processReduceMessageQueue)
.catch((err) => {
node.error(err,nextMsg);
return processReduceMessageQueue();
});
}
this.on("input", function(msg) { this.on("input", function(msg) {
try { try {
var property; var property;
@ -516,8 +580,7 @@ module.exports = function(RED) {
propertyIndex = msg.parts.index; propertyIndex = msg.parts.index;
} }
else if (node.mode === 'reduce') { else if (node.mode === 'reduce') {
reduce_msg(node, msg); return processReduceMessageQueue(msg);
return;
} }
else { else {
// Use the node configuration to identify all of the group information // Use the node configuration to identify all of the group information