diff --git a/packages/node_modules/@node-red/nodes/core/logic/10-switch.js b/packages/node_modules/@node-red/nodes/core/logic/10-switch.js index 1d9c52971..f8e92a7a3 100644 --- a/packages/node_modules/@node-red/nodes/core/logic/10-switch.js +++ b/packages/node_modules/@node-red/nodes/core/logic/10-switch.js @@ -91,206 +91,117 @@ module.exports = function(RED) { return _maxKeptCount; } - function getProperty(node,msg) { - if (node.useAsyncRules) { - return new Promise((resolve,reject) => { - if (node.propertyType === 'jsonata') { - RED.util.evaluateJSONataExpression(node.property,msg,(err,value) => { - if (err) { - reject(RED._("switch.errors.invalid-expr",{error:err.message})); - } else { - resolve(value); - } - }); + function getProperty(node,msg,done) { + if (node.propertyType === 'jsonata') { + RED.util.evaluateJSONataExpression(node.property,msg,(err,value) => { + if (err) { + done(RED._("switch.errors.invalid-expr",{error:err.message})); } else { - RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg,(err,value) => { - if (err) { - resolve(undefined); - } else { - resolve(value); - } - }); + done(undefined,value); } }); } else { - if (node.propertyType === 'jsonata') { - try { - return RED.util.evaluateJSONataExpression(node.property,msg); - } catch(err) { - throw new Error(RED._("switch.errors.invalid-expr",{error:err.message})) + RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg,(err,value) => { + if (err) { + done(undefined,undefined); + } else { + done(undefined,value); } - } else { - try { - return RED.util.evaluateNodeProperty(node.property,node.propertyType,node,msg); - } catch(err) { - return undefined; - } - } + }); } } - function getV1(node,msg,rule,hasParts) { - if (node.useAsyncRules) { - return new Promise( (resolve,reject) => { - if (rule.vt === 'prev') { - resolve(node.previousValue); - } else if (rule.vt === 'jsonata') { - var exp = rule.v; - if (rule.t === 'jsonata_exp') { - if (hasParts) { - exp.assign("I", msg.parts.index); - exp.assign("N", msg.parts.count); - } - } - RED.util.evaluateJSONataExpression(exp,msg,(err,value) => { - if (err) { - reject(RED._("switch.errors.invalid-expr",{error:err.message})); - } else { - resolve(value); - } - }); - } else if (rule.vt === 'json') { - resolve("json"); // TODO: ?! invalid case - } else if (rule.vt === 'null') { - resolve("null"); + function getV1(node,msg,rule,hasParts,done) { + if (rule.vt === 'prev') { + return done(undefined,node.previousValue); + } else if (rule.vt === 'jsonata') { + var exp = rule.v; + if (rule.t === 'jsonata_exp') { + if (hasParts) { + exp.assign("I", msg.parts.index); + exp.assign("N", msg.parts.count); + } + } + RED.util.evaluateJSONataExpression(exp,msg,(err,value) => { + if (err) { + done(RED._("switch.errors.invalid-expr",{error:err.message})); } else { - RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg, function(err,value) { - if (err) { - resolve(undefined); - } else { - resolve(value); - } - }); + done(undefined, value); + } + }); + } else if (rule.vt === 'json') { + done(undefined,"json"); // TODO: ?! invalid case + } else if (rule.vt === 'null') { + done(undefined,"null"); + } else { + RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg, function(err,value) { + if (err) { + done(undefined, undefined); + } else { + done(undefined, value); + } + }); + } + } + + function getV2(node,msg,rule,done) { + var v2 = rule.v2; + if (rule.v2t === 'prev') { + return done(undefined,node.previousValue); + } else if (rule.v2t === 'jsonata') { + RED.util.evaluateJSONataExpression(rule.v2,msg,(err,value) => { + if (err) { + done(RED._("switch.errors.invalid-expr",{error:err.message})); + } else { + done(undefined,value); + } + }); + } else if (typeof v2 !== 'undefined') { + RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg, function(err,value) { + if (err) { + done(undefined,undefined); + } else { + done(undefined,value); } }); } else { - if (rule.vt === 'prev') { - return node.previousValue; - } else if (rule.vt === 'jsonata') { - var exp = rule.v; - if (rule.t === 'jsonata_exp') { - if (hasParts) { - exp.assign("I", msg.parts.index); - exp.assign("N", msg.parts.count); - } - } - try { - return RED.util.evaluateJSONataExpression(exp,msg); - } catch(err) { - throw new Error(RED._("switch.errors.invalid-expr",{error:err.message})) - } - } else if (rule.vt === 'json') { - return "json"; // TODO: ?! invalid case - } else if (rule.vt === 'null') { - return "null"; - } else { - try { - return RED.util.evaluateNodeProperty(rule.v,rule.vt,node,msg); - } catch(err) { - return undefined; - } - } + done(undefined,v2); } } - function getV2(node,msg,rule) { - if (node.useAsyncRules) { - return new Promise((resolve,reject) => { - var v2 = rule.v2; - if (rule.v2t === 'prev') { - resolve(node.previousValue); - } else if (rule.v2t === 'jsonata') { - RED.util.evaluateJSONataExpression(rule.v2,msg,(err,value) => { - if (err) { - reject(RED._("switch.errors.invalid-expr",{error:err.message})); - } else { - resolve(value); - } - }); - } else if (typeof v2 !== 'undefined') { - RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg, function(err,value) { - if (err) { - resolve(undefined); - } else { - resolve(value); - } - }); + function applyRule(node, msg, property, state, done) { + var rule = node.rules[state.currentRule]; + var v1,v2; + + getV1(node,msg,rule,state.hasParts, (err,value) => { + if (err) { + return done(err); + } + v1 = value; + getV2(node,msg,rule, (err,value) => { + if (err) { + return done(err); + } + v2 = value; + if (rule.t == "else") { + property = state.elseflag; + state.elseflag = true; + } + if (operators[rule.t](property,v1,v2,rule.case,msg.parts)) { + state.onward.push(msg); + state.elseflag = false; + if (node.checkall == "false") { + return done(undefined,false); + } } else { - resolve(v2); + state.onward.push(null); } - }) - } else { - var v2 = rule.v2; - if (rule.v2t === 'prev') { - return node.previousValue; - } else if (rule.v2t === 'jsonata') { - try { - return RED.util.evaluateJSONataExpression(rule.v2,msg); - } catch(err) { - throw new Error(RED._("switch.errors.invalid-expr",{error:err.message})) - } - } else if (typeof v2 !== 'undefined') { - try { - return RED.util.evaluateNodeProperty(rule.v2,rule.v2t,node,msg); - } catch(err) { - return undefined; - } - } else { - return v2; - } - } + done(undefined, state.currentRule < node.rules.length - 1); + }); + }); } - function applyRule(node, msg, property, state) { - if (node.useAsyncRules) { - return new Promise((resolve,reject) => { - - var rule = node.rules[state.currentRule]; - var v1,v2; - - getV1(node,msg,rule,state.hasParts).then(value => { - v1 = value; - }).then(()=>getV2(node,msg,rule)).then(value => { - v2 = value; - }).then(() => { - if (rule.t == "else") { - property = state.elseflag; - state.elseflag = true; - } - if (operators[rule.t](property,v1,v2,rule.case,msg.parts)) { - state.onward.push(msg); - state.elseflag = false; - if (node.checkall == "false") { - return resolve(false); - } - } else { - state.onward.push(null); - } - resolve(state.currentRule < node.rules.length - 1); - }); - }) - } else { - var rule = node.rules[state.currentRule]; - var v1 = getV1(node,msg,rule,state.hasParts); - var v2 = getV2(node,msg,rule); - if (rule.t == "else") { - property = state.elseflag; - state.elseflag = true; - } - if (operators[rule.t](property,v1,v2,rule.case,msg.parts)) { - state.onward.push(msg); - state.elseflag = false; - if (node.checkall == "false") { - return false; - } - } else { - state.onward.push(null); - } - return state.currentRule < node.rules.length - 1 - } - } - - function applyRules(node, msg, property,state) { + function applyRules(node, msg, property,state,done) { if (!state) { state = { currentRule: 0, @@ -301,26 +212,18 @@ module.exports = function(RED) { msg.parts.hasOwnProperty("index") } } - if (node.useAsyncRules) { - return applyRule(node,msg,property,state).then(hasMore => { - if (hasMore) { - state.currentRule++; - return applyRules(node,msg,property,state); - } else { - node.previousValue = property; - return state.onward; - } - }); - } else { - var hasMore = applyRule(node,msg,property,state); + applyRule(node,msg,property,state,(err,hasMore) => { + if (err) { + return done(err); + } if (hasMore) { state.currentRule++; - return applyRules(node,msg,property,state); + applyRules(node,msg,property,state,done); } else { node.previousValue = property; - return state.onward; + done(undefined,state.onward); } - } + }); } @@ -345,13 +248,6 @@ module.exports = function(RED) { var valid = true; var repair = n.repair; var needsCount = repair; - this.useAsyncRules = ( - this.propertyType === 'flow' || - this.propertyType === 'global' || ( - this.propertyType === 'jsonata' && - /\$(flow|global)Context/.test(this.property) - ) - ); for (var i=0; i { + if (err) { + done(err); + } else { + if (msgs.length === 0) { + done() + } else { + drainMessageGroup(msgs,count,done); + } + } + }) + } + function addMessageToPending(msg,done) { var parts = msg.parts; // We've already checked the msg.parts has the require bits var group = addMessageToGroup(parts.id, msg, parts); var msgs = group.msgs; var count = group.count; - if (count === msgs.length) { + var msgsCount = msgs.length; + if (count === msgsCount) { // We have a complete group - send the individual parts - return msgs.reduce((promise, msg) => { - return promise.then((result) => { - msg.parts.count = count; - return processMessage(msg, false); - }) - }, Promise.resolve()).then( () => { - pendingCount -= group.msgs.length; + drainMessageGroup(msgs,count,err => { + pendingCount -= msgsCount; delete pendingIn[parts.id]; - }); + done(); + }) + return; } - return Promise.resolve(); + done(); } function sendGroup(onwards, port_count) { @@ -529,43 +420,33 @@ module.exports = function(RED) { } } - - - - - function processMessage(msg, checkParts) { + function processMessage(msg, checkParts, done) { var hasParts = msg.hasOwnProperty("parts") && msg.parts.hasOwnProperty("id") && msg.parts.hasOwnProperty("index"); if (needsCount && checkParts && hasParts) { - return addMessageToPending(msg); - } - if (node.useAsyncRules) { - return getProperty(node,msg) - .then(property => applyRules(node,msg,property)) - .then(onward => { - if (!repair || !hasParts) { - node.send(onward); - } - else { - sendGroupMessages(onward, msg); - } - }).catch(err => { - node.warn(err); - }); + addMessageToPending(msg,done); } else { - try { - var property = getProperty(node,msg); - var onward = applyRules(node,msg,property); - if (!repair || !hasParts) { - node.send(onward); + getProperty(node,msg,(err,property) => { + if (err) { + node.warn(err); + done(); } else { - sendGroupMessages(onward, msg); + applyRules(node,msg,property,undefined,(err,onward) => { + if (err) { + node.warn(err); + } else { + if (!repair || !hasParts) { + node.send(onward); + } else { + sendGroupMessages(onward, msg); + } + } + done(); + }); } - } catch(err) { - node.warn(err); - } + }); } } @@ -578,12 +459,13 @@ module.exports = function(RED) { } var pendingMessages = []; - var activeMessagePromise = null; + var handlingMessage = false; var processMessageQueue = function(msg) { if (msg) { + // A new message has arrived - add it to the message queue pendingMessages.push(msg); - if (activeMessagePromise !== null) { + if (handlingMessage) { // The node is currently processing a message, so do nothing // more with this message return; @@ -592,27 +474,24 @@ module.exports = function(RED) { if (pendingMessages.length === 0) { // There are no more messages to process, clear the active flag // and return - activeMessagePromise = null; + handlingMessage = false; return; } // There are more messages to process. Get the next message and // start processing it. Recurse back in to check for any more var nextMsg = pendingMessages.shift(); - activeMessagePromise = processMessage(nextMsg,true) - .then(processMessageQueue) - .catch((err) => { + handlingMessage = true; + processMessage(nextMsg,true,err => { + if (err) { node.error(err,nextMsg); - return processMessageQueue(); - }); + } + processMessageQueue() + }); } this.on('input', function(msg) { - if (node.useAsyncRules) { - processMessageQueue(msg); - } else { - processMessage(msg,true); - } + processMessageQueue(msg); }); this.on('close', function() {