Remove all Promises from Switch node

Promises are expensive and should not be used in the main
message handling path. The Switch node used them a lot if
the node references context - with a lot of duplicate code
to handle async and sync code paths.

This change modifies the code to use callbacks throughout
that are just as performant in either case.
This commit is contained in:
Nick O'Leary 2018-12-20 22:57:47 +00:00
parent 7f5d47f39d
commit 473a2ae275
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9
1 changed files with 153 additions and 274 deletions

View File

@ -91,206 +91,117 @@ module.exports = function(RED) {
return _maxKeptCount;
}
function getProperty(node,msg) {
if (node.useAsyncRules) {
return new Promise((resolve,reject) => {
if (node.propertyType === 'jsonata') {
RED.util.evaluateJSONataExpression(node.property,msg,(err,value) => {
if (err) {
reject(RED._("switch.errors.invalid-expr",{error:err.message}));
} else {
resolve(value);
}
});
function getProperty(node,msg,done) {
if (node.propertyType === 'jsonata') {
RED.util.evaluateJSONataExpression(node.property,msg,(err,value) => {
if (err) {
done(RED._("switch.errors.invalid-expr",{error:err.message}));
} else {
RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg,(err,value) => {
if (err) {
resolve(undefined);
} else {
resolve(value);
}
});
done(undefined,value);
}
});
} else {
if (node.propertyType === 'jsonata') {
try {
return RED.util.evaluateJSONataExpression(node.property,msg);
} catch(err) {
throw new Error(RED._("switch.errors.invalid-expr",{error:err.message}))
RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg,(err,value) => {
if (err) {
done(undefined,undefined);
} else {
done(undefined,value);
}
} else {
try {
return RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg);
} catch(err) {
return undefined;
}
}
});
}
}
function getV1(node,msg,rule,hasParts) {
if (node.useAsyncRules) {
return new Promise( (resolve,reject) => {
if (rule.vt === 'prev') {
resolve(node.previousValue);
} else if (rule.vt === 'jsonata') {
var exp = rule.v;
if (rule.t === 'jsonata_exp') {
if (hasParts) {
exp.assign("I", msg.parts.index);
exp.assign("N", msg.parts.count);
}
}
RED.util.evaluateJSONataExpression(exp,msg,(err,value) => {
if (err) {
reject(RED._("switch.errors.invalid-expr",{error:err.message}));
} else {
resolve(value);
}
});
} else if (rule.vt === 'json') {
resolve("json"); // TODO: ?! invalid case
} else if (rule.vt === 'null') {
resolve("null");
function getV1(node,msg,rule,hasParts,done) {
if (rule.vt === 'prev') {
return done(undefined,node.previousValue);
} else if (rule.vt === 'jsonata') {
var exp = rule.v;
if (rule.t === 'jsonata_exp') {
if (hasParts) {
exp.assign("I", msg.parts.index);
exp.assign("N", msg.parts.count);
}
}
RED.util.evaluateJSONataExpression(exp,msg,(err,value) => {
if (err) {
done(RED._("switch.errors.invalid-expr",{error:err.message}));
} else {
RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg, function(err,value) {
if (err) {
resolve(undefined);
} else {
resolve(value);
}
});
done(undefined, value);
}
});
} else if (rule.vt === 'json') {
done(undefined,"json"); // TODO: ?! invalid case
} else if (rule.vt === 'null') {
done(undefined,"null");
} else {
RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg, function(err,value) {
if (err) {
done(undefined, undefined);
} else {
done(undefined, value);
}
});
}
}
function getV2(node,msg,rule,done) {
var v2 = rule.v2;
if (rule.v2t === 'prev') {
return done(undefined,node.previousValue);
} else if (rule.v2t === 'jsonata') {
RED.util.evaluateJSONataExpression(rule.v2,msg,(err,value) => {
if (err) {
done(RED._("switch.errors.invalid-expr",{error:err.message}));
} else {
done(undefined,value);
}
});
} else if (typeof v2 !== 'undefined') {
RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg, function(err,value) {
if (err) {
done(undefined,undefined);
} else {
done(undefined,value);
}
});
} else {
if (rule.vt === 'prev') {
return node.previousValue;
} else if (rule.vt === 'jsonata') {
var exp = rule.v;
if (rule.t === 'jsonata_exp') {
if (hasParts) {
exp.assign("I", msg.parts.index);
exp.assign("N", msg.parts.count);
}
}
try {
return RED.util.evaluateJSONataExpression(exp,msg);
} catch(err) {
throw new Error(RED._("switch.errors.invalid-expr",{error:err.message}))
}
} else if (rule.vt === 'json') {
return "json"; // TODO: ?! invalid case
} else if (rule.vt === 'null') {
return "null";
} else {
try {
return RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg);
} catch(err) {
return undefined;
}
}
done(undefined,v2);
}
}
function getV2(node,msg,rule) {
if (node.useAsyncRules) {
return new Promise((resolve,reject) => {
var v2 = rule.v2;
if (rule.v2t === 'prev') {
resolve(node.previousValue);
} else if (rule.v2t === 'jsonata') {
RED.util.evaluateJSONataExpression(rule.v2,msg,(err,value) => {
if (err) {
reject(RED._("switch.errors.invalid-expr",{error:err.message}));
} else {
resolve(value);
}
});
} else if (typeof v2 !== 'undefined') {
RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg, function(err,value) {
if (err) {
resolve(undefined);
} else {
resolve(value);
}
});
function applyRule(node, msg, property, state, done) {
var rule = node.rules[state.currentRule];
var v1,v2;
getV1(node,msg,rule,state.hasParts, (err,value) => {
if (err) {
return done(err);
}
v1 = value;
getV2(node,msg,rule, (err,value) => {
if (err) {
return done(err);
}
v2 = value;
if (rule.t == "else") {
property = state.elseflag;
state.elseflag = true;
}
if (operators[rule.t](property,v1,v2,rule.case,msg.parts)) {
state.onward.push(msg);
state.elseflag = false;
if (node.checkall == "false") {
return done(undefined,false);
}
} else {
resolve(v2);
state.onward.push(null);
}
})
} else {
var v2 = rule.v2;
if (rule.v2t === 'prev') {
return node.previousValue;
} else if (rule.v2t === 'jsonata') {
try {
return RED.util.evaluateJSONataExpression(rule.v2,msg);
} catch(err) {
throw new Error(RED._("switch.errors.invalid-expr",{error:err.message}))
}
} else if (typeof v2 !== 'undefined') {
try {
return RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg);
} catch(err) {
return undefined;
}
} else {
return v2;
}
}
done(undefined, state.currentRule < node.rules.length - 1);
});
});
}
function applyRule(node, msg, property, state) {
if (node.useAsyncRules) {
return new Promise((resolve,reject) => {
var rule = node.rules[state.currentRule];
var v1,v2;
getV1(node,msg,rule,state.hasParts).then(value => {
v1 = value;
}).then(()=>getV2(node,msg,rule)).then(value => {
v2 = value;
}).then(() => {
if (rule.t == "else") {
property = state.elseflag;
state.elseflag = true;
}
if (operators[rule.t](property,v1,v2,rule.case,msg.parts)) {
state.onward.push(msg);
state.elseflag = false;
if (node.checkall == "false") {
return resolve(false);
}
} else {
state.onward.push(null);
}
resolve(state.currentRule < node.rules.length - 1);
});
})
} else {
var rule = node.rules[state.currentRule];
var v1 = getV1(node,msg,rule,state.hasParts);
var v2 = getV2(node,msg,rule);
if (rule.t == "else") {
property = state.elseflag;
state.elseflag = true;
}
if (operators[rule.t](property,v1,v2,rule.case,msg.parts)) {
state.onward.push(msg);
state.elseflag = false;
if (node.checkall == "false") {
return false;
}
} else {
state.onward.push(null);
}
return state.currentRule < node.rules.length - 1
}
}
function applyRules(node, msg, property,state) {
function applyRules(node, msg, property,state,done) {
if (!state) {
state = {
currentRule: 0,
@ -301,26 +212,18 @@ module.exports = function(RED) {
msg.parts.hasOwnProperty("index")
}
}
if (node.useAsyncRules) {
return applyRule(node,msg,property,state).then(hasMore => {
if (hasMore) {
state.currentRule++;
return applyRules(node,msg,property,state);
} else {
node.previousValue = property;
return state.onward;
}
});
} else {
var hasMore = applyRule(node,msg,property,state);
applyRule(node,msg,property,state,(err,hasMore) => {
if (err) {
return done(err);
}
if (hasMore) {
state.currentRule++;
return applyRules(node,msg,property,state);
applyRules(node,msg,property,state,done);
} else {
node.previousValue = property;
return state.onward;
done(undefined,state.onward);
}
}
});
}
@ -345,13 +248,6 @@ module.exports = function(RED) {
var valid = true;
var repair = n.repair;
var needsCount = repair;
this.useAsyncRules = (
this.propertyType === 'flow' ||
this.propertyType === 'global' || (
this.propertyType === 'jsonata' &&
/\$(flow|global)Context/.test(this.property)
)
);
for (var i=0; i<this.rules.length; i+=1) {
var rule = this.rules[i];
@ -363,13 +259,6 @@ module.exports = function(RED) {
rule.vt = 'str';
}
}
this.useAsyncRules = this.useAsyncRules || (
rule.vt === 'flow' ||
rule.vt === 'global' || (
rule.vt === 'jsonata' &&
/\$(flow|global)Context/.test(rule.v)
)
);
if (rule.vt === 'num') {
if (!isNaN(Number(rule.v))) {
rule.v = Number(rule.v);
@ -382,9 +271,6 @@ module.exports = function(RED) {
valid = false;
}
}
if (rule.vt === 'flow' || rule.vt === 'global' || rule.vt === 'jsonata') {
this.useAsyncRules = true;
}
if (typeof rule.v2 !== 'undefined') {
if (!rule.v2t) {
if (!isNaN(Number(rule.v2))) {
@ -393,13 +279,6 @@ module.exports = function(RED) {
rule.v2t = 'str';
}
}
this.useAsyncRules = this.useAsyncRules || (
rule.v2t === 'flow' ||
rule.v2t === 'global' || (
rule.v2t === 'jsonata' &&
/\$(flow|global)Context/.test(rule.v2)
)
);
if (rule.v2t === 'num') {
rule.v2 = Number(rule.v2);
} else if (rule.v2t === 'jsonata') {
@ -444,26 +323,38 @@ module.exports = function(RED) {
return group;
}
function addMessageToPending(msg) {
function drainMessageGroup(msgs,count,done) {
var msg = msgs.shift();
msg.parts.count = count;
processMessage(msg,false, err => {
if (err) {
done(err);
} else {
if (msgs.length === 0) {
done()
} else {
drainMessageGroup(msgs,count,done);
}
}
})
}
function addMessageToPending(msg,done) {
var parts = msg.parts;
// We've already checked the msg.parts has the require bits
var group = addMessageToGroup(parts.id, msg, parts);
var msgs = group.msgs;
var count = group.count;
if (count === msgs.length) {
var msgsCount = msgs.length;
if (count === msgsCount) {
// We have a complete group - send the individual parts
return msgs.reduce((promise, msg) => {
return promise.then((result) => {
msg.parts.count = count;
return processMessage(msg, false);
})
}, Promise.resolve()).then( () => {
pendingCount -= group.msgs.length;
drainMessageGroup(msgs,count,err => {
pendingCount -= msgsCount;
delete pendingIn[parts.id];
});
done();
})
return;
}
return Promise.resolve();
done();
}
function sendGroup(onwards, port_count) {
@ -529,43 +420,33 @@ module.exports = function(RED) {
}
}
function processMessage(msg, checkParts) {
function processMessage(msg, checkParts, done) {
var hasParts = msg.hasOwnProperty("parts") &&
msg.parts.hasOwnProperty("id") &&
msg.parts.hasOwnProperty("index");
if (needsCount && checkParts && hasParts) {
return addMessageToPending(msg);
}
if (node.useAsyncRules) {
return getProperty(node,msg)
.then(property => applyRules(node,msg,property))
.then(onward => {
if (!repair || !hasParts) {
node.send(onward);
}
else {
sendGroupMessages(onward, msg);
}
}).catch(err => {
node.warn(err);
});
addMessageToPending(msg,done);
} else {
try {
var property = getProperty(node,msg);
var onward = applyRules(node,msg,property);
if (!repair || !hasParts) {
node.send(onward);
getProperty(node,msg,(err,property) => {
if (err) {
node.warn(err);
done();
} else {
sendGroupMessages(onward, msg);
applyRules(node,msg,property,undefined,(err,onward) => {
if (err) {
node.warn(err);
} else {
if (!repair || !hasParts) {
node.send(onward);
} else {
sendGroupMessages(onward, msg);
}
}
done();
});
}
} catch(err) {
node.warn(err);
}
});
}
}
@ -578,12 +459,13 @@ module.exports = function(RED) {
}
var pendingMessages = [];
var activeMessagePromise = null;
var handlingMessage = false;
var processMessageQueue = function(msg) {
if (msg) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
if (activeMessagePromise !== null) {
if (handlingMessage) {
// The node is currently processing a message, so do nothing
// more with this message
return;
@ -592,27 +474,24 @@ module.exports = function(RED) {
if (pendingMessages.length === 0) {
// There are no more messages to process, clear the active flag
// and return
activeMessagePromise = null;
handlingMessage = false;
return;
}
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
activeMessagePromise = processMessage(nextMsg,true)
.then(processMessageQueue)
.catch((err) => {
handlingMessage = true;
processMessage(nextMsg,true,err => {
if (err) {
node.error(err,nextMsg);
return processMessageQueue();
});
}
processMessageQueue()
});
}
this.on('input', function(msg) {
if (node.useAsyncRules) {
processMessageQueue(msg);
} else {
processMessage(msg,true);
}
processMessageQueue(msg);
});
this.on('close', function() {