1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Add async property handling to Switch node

This commit is contained in:
Nick O'Leary 2018-07-09 11:30:53 +01:00
parent 9c00492dc2
commit fc9cdb61f2
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9
3 changed files with 149 additions and 109 deletions

View File

@ -84,7 +84,13 @@ module.exports = function(RED) {
reject(err); reject(err);
} }
} else { } else {
resolve(RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg)); RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg,(err,value) => {
if (err) {
resolve(undefined);
} else {
resolve(value);
}
});
} }
}); });
} }
@ -147,6 +153,56 @@ module.exports = function(RED) {
}) })
} }
function applyRule(node, msg, property, state) {
return new Promise((resolve,reject) => {
var rule = node.rules[state.currentRule];
var v1,v2;
getV1(node,msg,rule,state.hasParts).then(value => {
v1 = value;
}).then(()=>getV2(node,msg,rule)).then(value => {
v2 = value;
}).then(() => {
if (rule.t == "else") {
property = state.elseflag;
state.elseflag = true;
}
if (operators[rule.t](property,v1,v2,rule.case,msg.parts)) {
state.onward.push(msg);
state.elseflag = false;
if (node.checkall == "false") {
return resolve(false);
}
} else {
state.onward.push(null);
}
resolve(state.currentRule < node.rules.length - 1);
});
})
}
function applyRules(node, msg, property,state) {
if (!state) {
state = {
currentRule: 0,
elseflag: true,
onward: [],
hasParts: msg.hasOwnProperty("parts") &&
msg.parts.hasOwnProperty("id") &&
msg.parts.hasOwnProperty("index")
}
}
return applyRule(node,msg,property,state).then(hasMore => {
if (hasMore) {
state.currentRule++;
return applyRules(node,msg,property,state);
} else {
node.previousValue = property;
return state.onward;
}
});
}
function SwitchNode(n) { function SwitchNode(n) {
@ -248,23 +304,23 @@ module.exports = function(RED) {
function addMessageToPending(msg) { function addMessageToPending(msg) {
var parts = msg.parts; var parts = msg.parts;
if (parts.hasOwnProperty("id") && // We've already checked the msg.parts has the require bits
parts.hasOwnProperty("index")) { var group = addMessageToGroup(parts.id, msg, parts);
var group = addMessageToGroup(parts.id, msg, parts); var msgs = group.msgs;
var msgs = group.msgs; var count = group.count;
var count = group.count; if (count === msgs.length) {
if (count === msgs.length) { // We have a complete group - send the individual parts
for (var i = 0; i < msgs.length; i++) { return msgs.reduce((promise, msg) => {
var msg = msgs[i]; return promise.then((result) => {
msg.parts.count = count; msg.parts.count = count;
processMessage(msg, false); return processMessage(msg, false);
} })
}, Promise.resolve()).then( () => {
pendingCount -= group.msgs.length; pendingCount -= group.msgs.length;
delete pendingIn[parts.id]; delete pendingIn[parts.id];
} });
return true;
} }
return false; return Promise.resolve();
} }
function sendGroup(onwards, port_count) { function sendGroup(onwards, port_count) {
@ -332,103 +388,28 @@ module.exports = function(RED) {
function processMessage(msg, checkParts) { function processMessage(msg, checkParts) {
var hasParts = msg.hasOwnProperty("parts") && var hasParts = msg.hasOwnProperty("parts") &&
msg.parts.hasOwnProperty("id") && msg.parts.hasOwnProperty("id") &&
msg.parts.hasOwnProperty("index"); msg.parts.hasOwnProperty("index");
if (needsCount && checkParts && hasParts && if (needsCount && checkParts && hasParts) {
addMessageToPending(msg)) { return addMessageToPending(msg);
return;
} }
var onward = []; return getProperty(node,msg)
try { .then(property => applyRules(node,msg,property))
var prop; .then(onward => {
if (!repair || !hasParts) {
// getProperty node.send(onward);
if (node.propertyType === 'jsonata') {
prop = RED.util.evaluateJSONataExpression(node.property,msg);
} else {
prop = RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg);
}
// end getProperty
var elseflag = true;
for (var i=0; i<node.rules.length; i+=1) {
var rule = node.rules[i];
var test = prop;
var v1,v2;
//// getV1
if (rule.vt === 'prev') {
v1 = node.previousValue;
} else if (rule.vt === 'jsonata') {
try {
var exp = rule.v;
if (rule.t === 'jsonata_exp') {
if (hasParts) {
exp.assign("I", msg.parts.index);
exp.assign("N", msg.parts.count);
}
}
v1 = RED.util.evaluateJSONataExpression(exp,msg);
} catch(err) {
node.error(RED._("switch.errors.invalid-expr",{error:err.message}));
return;
} }
} else if (rule.vt === 'json') { else {
v1 = "json"; sendGroupMessages(onward, msg);
} else if (rule.vt === 'null') {
v1 = "null";
} else {
try {
v1 = RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg);
} catch(err) {
v1 = undefined;
} }
} }).catch(err => {
//// end getV1 node.warn(err);
});
//// getV2
v2 = rule.v2;
if (rule.v2t === 'prev') {
v2 = node.previousValue;
} else if (rule.v2t === 'jsonata') {
try {
v2 = RED.util.evaluateJSONataExpression(rule.v2,msg);
} catch(err) {
node.error(RED._("switch.errors.invalid-expr",{error:err.message}));
return;
}
} else if (typeof v2 !== 'undefined') {
try {
v2 = RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg);
} catch(err) {
v2 = undefined;
}
}
//// end getV2
if (rule.t == "else") { test = elseflag; elseflag = true; }
if (operators[rule.t](test,v1,v2,rule.case,msg.parts)) {
onward.push(msg);
elseflag = false;
if (node.checkall == "false") { break; }
} else {
onward.push(null);
}
}
node.previousValue = prop;
if (!repair || !hasParts) {
node.send(onward);
}
else {
sendGroupMessages(onward, msg);
}
} catch(err) {
node.warn(err);
}
} }
function clearPending() { function clearPending() {
@ -439,8 +420,38 @@ module.exports = function(RED) {
received = {}; received = {};
} }
var pendingMessages = [];
var activeMessagePromise = null;
var processMessageQueue = function(msg) {
if (msg) {
// A new message has arrived - add it to the message queue
pendingMessages.push(msg);
if (activeMessagePromise !== null) {
// The node is currently processing a message, so do nothing
// more with this message
return;
}
}
if (pendingMessages.length === 0) {
// There are no more messages to process, clear the active flag
// and return
activeMessagePromise = null;
return;
}
// There are more messages to process. Get the next message and
// start processing it. Recurse back in to check for any more
var nextMsg = pendingMessages.shift();
activeMessagePromise = processMessage(nextMsg,true)
.then(processMessageQueue)
.catch((err) => {
node.error(err,nextMsg);
return processMessageQueue();
});
}
this.on('input', function(msg) { this.on('input', function(msg) {
processMessage(msg, true); processMessageQueue(msg, true);
}); });
this.on('close', function() { this.on('close', function() {

View File

@ -350,7 +350,16 @@ function evaluateNodeProperty(value, type, node, msg, callback) {
var data = JSON.parse(value); var data = JSON.parse(value);
result = Buffer.from(data); result = Buffer.from(data);
} else if (type === 'msg' && msg) { } else if (type === 'msg' && msg) {
result = getMessageProperty(msg,value); try {
result = getMessageProperty(msg,value);
} catch(err) {
if (callback) {
callback(err);
} else {
throw err;
}
return;
}
} else if ((type === 'flow' || type === 'global') && node) { } else if ((type === 'flow' || type === 'global') && node) {
var contextKey = parseContextStore(value); var contextKey = parseContextStore(value);
result = node.context()[type].get(contextKey.key,contextKey.store,callback); result = node.context()[type].get(contextKey.key,contextKey.store,callback);
@ -366,7 +375,7 @@ function evaluateNodeProperty(value, type, node, msg, callback) {
result = evaluteEnvProperty(value); result = evaluteEnvProperty(value);
} }
if (callback) { if (callback) {
callback(result); callback(null,result);
} else { } else {
return result; return result;
} }

View File

@ -460,7 +460,7 @@ describe('switch Node', function() {
} catch(err) { } catch(err) {
done(err); done(err);
} }
},100) },500)
}); });
}); });
@ -821,4 +821,24 @@ describe('switch Node', function() {
n1.receive({payload:1, parts:{index:0, count:4, id:222}}); n1.receive({payload:1, parts:{index:0, count:4, id:222}});
}); });
}); });
it('should handle invalid jsonata expression', function(done) {
var flow = [{id:"switchNode1",type:"switch",name:"switchNode",property:"$invalidExpression(payload)",propertyType:"jsonata",rules:[{"t":"btwn","v":"$sqrt(16)","vt":"jsonata","v2":"$sqrt(36)","v2t":"jsonata"}],checkall:true,outputs:1,wires:[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(switchNode, flow, function() {
var n1 = helper.getNode("switchNode1");
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "switch";
});
var evt = logEvents[0][0];
evt.should.have.property('id', "switchNode1");
evt.should.have.property('type', "switch");
done();
}, 150);
n1.receive({payload:1});
});
});
}); });