From fc9cdb61f2a46e0fa462e26f2821cd1521d8b79e Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Mon, 9 Jul 2018 11:30:53 +0100 Subject: [PATCH] Add async property handling to Switch node --- nodes/core/logic/10-switch.js | 219 +++++++++++++----------- red/runtime/util.js | 13 +- test/nodes/core/logic/10-switch_spec.js | 26 ++- 3 files changed, 149 insertions(+), 109 deletions(-) diff --git a/nodes/core/logic/10-switch.js b/nodes/core/logic/10-switch.js index a58ac91ca..d15be39de 100644 --- a/nodes/core/logic/10-switch.js +++ b/nodes/core/logic/10-switch.js @@ -84,7 +84,13 @@ module.exports = function(RED) { reject(err); } } else { - resolve(RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg)); + RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg,(err,value) => { + if (err) { + resolve(undefined); + } else { + resolve(value); + } + }); } }); } @@ -147,6 +153,56 @@ module.exports = function(RED) { }) } + function applyRule(node, msg, property, state) { + return new Promise((resolve,reject) => { + + var rule = node.rules[state.currentRule]; + var v1,v2; + + getV1(node,msg,rule,state.hasParts).then(value => { + v1 = value; + }).then(()=>getV2(node,msg,rule)).then(value => { + v2 = value; + }).then(() => { + if (rule.t == "else") { + property = state.elseflag; + state.elseflag = true; + } + if (operators[rule.t](property,v1,v2,rule.case,msg.parts)) { + state.onward.push(msg); + state.elseflag = false; + if (node.checkall == "false") { + return resolve(false); + } + } else { + state.onward.push(null); + } + resolve(state.currentRule < node.rules.length - 1); + }); + }) + } + + function applyRules(node, msg, property,state) { + if (!state) { + state = { + currentRule: 0, + elseflag: true, + onward: [], + hasParts: msg.hasOwnProperty("parts") && + msg.parts.hasOwnProperty("id") && + msg.parts.hasOwnProperty("index") + } + } + return applyRule(node,msg,property,state).then(hasMore => { + if (hasMore) { + state.currentRule++; + return applyRules(node,msg,property,state); + } else { + node.previousValue = property; + return state.onward; + } + }); + } function SwitchNode(n) { @@ -248,23 +304,23 @@ module.exports = function(RED) { function addMessageToPending(msg) { var parts = msg.parts; - if (parts.hasOwnProperty("id") && - parts.hasOwnProperty("index")) { - var group = addMessageToGroup(parts.id, msg, parts); - var msgs = group.msgs; - var count = group.count; - if (count === msgs.length) { - for (var i = 0; i < msgs.length; i++) { - var msg = msgs[i]; + // We've already checked the msg.parts has the require bits + var group = addMessageToGroup(parts.id, msg, parts); + var msgs = group.msgs; + var count = group.count; + if (count === msgs.length) { + // We have a complete group - send the individual parts + return msgs.reduce((promise, msg) => { + return promise.then((result) => { msg.parts.count = count; - processMessage(msg, false); - } + return processMessage(msg, false); + }) + }, Promise.resolve()).then( () => { pendingCount -= group.msgs.length; delete pendingIn[parts.id]; - } - return true; + }); } - return false; + return Promise.resolve(); } function sendGroup(onwards, port_count) { @@ -332,103 +388,28 @@ module.exports = function(RED) { + + function processMessage(msg, checkParts) { var hasParts = msg.hasOwnProperty("parts") && msg.parts.hasOwnProperty("id") && msg.parts.hasOwnProperty("index"); - if (needsCount && checkParts && hasParts && - addMessageToPending(msg)) { - return; + if (needsCount && checkParts && hasParts) { + return addMessageToPending(msg); } - var onward = []; - try { - var prop; - - // getProperty - if (node.propertyType === 'jsonata') { - prop = RED.util.evaluateJSONataExpression(node.property,msg); - } else { - prop = RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg); - } - // end getProperty - - var elseflag = true; - for (var i=0; i applyRules(node,msg,property)) + .then(onward => { + if (!repair || !hasParts) { + node.send(onward); } - } else if (rule.vt === 'json') { - v1 = "json"; - } else if (rule.vt === 'null') { - v1 = "null"; - } else { - try { - v1 = RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg); - } catch(err) { - v1 = undefined; + else { + sendGroupMessages(onward, msg); } - } - //// end getV1 - - //// getV2 - v2 = rule.v2; - if (rule.v2t === 'prev') { - v2 = node.previousValue; - } else if (rule.v2t === 'jsonata') { - try { - v2 = RED.util.evaluateJSONataExpression(rule.v2,msg); - } catch(err) { - node.error(RED._("switch.errors.invalid-expr",{error:err.message})); - return; - } - } else if (typeof v2 !== 'undefined') { - try { - v2 = RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg); - } catch(err) { - v2 = undefined; - } - } - //// end getV2 - - - if (rule.t == "else") { test = elseflag; elseflag = true; } - if (operators[rule.t](test,v1,v2,rule.case,msg.parts)) { - onward.push(msg); - elseflag = false; - if (node.checkall == "false") { break; } - } else { - onward.push(null); - } - } - node.previousValue = prop; - if (!repair || !hasParts) { - node.send(onward); - } - else { - sendGroupMessages(onward, msg); - } - } catch(err) { - node.warn(err); - } + }).catch(err => { + node.warn(err); + }); } function clearPending() { @@ -439,8 +420,38 @@ module.exports = function(RED) { received = {}; } + var pendingMessages = []; + var activeMessagePromise = null; + var processMessageQueue = function(msg) { + if (msg) { + // A new message has arrived - add it to the message queue + pendingMessages.push(msg); + if (activeMessagePromise !== null) { + // The node is currently processing a message, so do nothing + // more with this message + return; + } + } + if (pendingMessages.length === 0) { + // There are no more messages to process, clear the active flag + // and return + activeMessagePromise = null; + return; + } + + // There are more messages to process. Get the next message and + // start processing it. Recurse back in to check for any more + var nextMsg = pendingMessages.shift(); + activeMessagePromise = processMessage(nextMsg,true) + .then(processMessageQueue) + .catch((err) => { + node.error(err,nextMsg); + return processMessageQueue(); + }); + } + this.on('input', function(msg) { - processMessage(msg, true); + processMessageQueue(msg, true); }); this.on('close', function() { diff --git a/red/runtime/util.js b/red/runtime/util.js index 6279c416c..7ffe737b3 100644 --- a/red/runtime/util.js +++ b/red/runtime/util.js @@ -350,7 +350,16 @@ function evaluateNodeProperty(value, type, node, msg, callback) { var data = JSON.parse(value); result = Buffer.from(data); } else if (type === 'msg' && msg) { - result = getMessageProperty(msg,value); + try { + result = getMessageProperty(msg,value); + } catch(err) { + if (callback) { + callback(err); + } else { + throw err; + } + return; + } } else if ((type === 'flow' || type === 'global') && node) { var contextKey = parseContextStore(value); result = node.context()[type].get(contextKey.key,contextKey.store,callback); @@ -366,7 +375,7 @@ function evaluateNodeProperty(value, type, node, msg, callback) { result = evaluteEnvProperty(value); } if (callback) { - callback(result); + callback(null,result); } else { return result; } diff --git a/test/nodes/core/logic/10-switch_spec.js b/test/nodes/core/logic/10-switch_spec.js index e655a3fba..5159bf42e 100644 --- a/test/nodes/core/logic/10-switch_spec.js +++ b/test/nodes/core/logic/10-switch_spec.js @@ -460,7 +460,7 @@ describe('switch Node', function() { } catch(err) { done(err); } - },100) + },500) }); }); @@ -599,7 +599,7 @@ describe('switch Node', function() { it('should take head of message sequence (w. context)', function(done) { var flow = [{id:"switchNode1",type:"switch",name:"switchNode",property:"payload",rules:[{"t":"head","v":"count",vt:"global"}],checkall:false,repair:true,outputs:1,wires:[["helperNode1"]]}, {id:"helperNode1", type:"helper", wires:[]}]; - customFlowSequenceSwitchTest(flow, [0, 1, 2, 3, 4], [0, 1, 2], true, + customFlowSequenceSwitchTest(flow, [0, 1, 2, 3, 4], [0, 1, 2], true, function(node) { node.context().global.set("count", 3); }, done); @@ -642,7 +642,7 @@ describe('switch Node', function() { {id:"helperNode1", type:"helper", wires:[]}]; customFlowSwitchTest(flow, true, 9, done); }); - + it('should be able to use $I in JSONata expression', function(done) { var flow = [{id:"switchNode1",type:"switch",name:"switchNode",property:"payload",rules:[{"t":"jsonata_exp","v":"$I % 2 = 1",vt:"jsonata"}],checkall:true,repair:true,outputs:1,wires:[["helperNode1"]]}, {id:"helperNode1", type:"helper", wires:[]}]; @@ -821,4 +821,24 @@ describe('switch Node', function() { n1.receive({payload:1, parts:{index:0, count:4, id:222}}); }); }); + + it('should handle invalid jsonata expression', function(done) { + + var flow = [{id:"switchNode1",type:"switch",name:"switchNode",property:"$invalidExpression(payload)",propertyType:"jsonata",rules:[{"t":"btwn","v":"$sqrt(16)","vt":"jsonata","v2":"$sqrt(36)","v2t":"jsonata"}],checkall:true,outputs:1,wires:[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(switchNode, flow, function() { + var n1 = helper.getNode("switchNode1"); + setTimeout(function() { + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "switch"; + }); + var evt = logEvents[0][0]; + evt.should.have.property('id', "switchNode1"); + evt.should.have.property('type', "switch"); + done(); + }, 150); + n1.receive({payload:1}); + }); + }); + });