From d57ec0cd5328e834f686a58b685d7edf067b691b Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Mon, 20 Jul 2020 16:48:47 +0100 Subject: [PATCH 01/11] Refactor lib/flows code to include initial router component --- .../@node-red/runtime/lib/api/flows.js | 14 ++--- .../runtime/lib/{nodes => }/flows/Flow.js | 10 +++- .../runtime/lib/{nodes => }/flows/Subflow.js | 6 +- .../runtime/lib/{nodes => }/flows/index.js | 10 ++-- .../runtime/lib/flows/router/index.js | 50 +++++++++++++++++ .../runtime/lib/flows/router/localRouter.js | 17 ++++++ .../runtime/lib/{nodes => }/flows/util.js | 0 .../@node-red/runtime/lib/index.js | 2 + .../@node-red/runtime/lib/nodes/Node.js | 55 ++++++------------- .../runtime/lib/nodes/context/index.js | 35 ------------ .../@node-red/runtime/lib/nodes/index.js | 4 +- .../@node-red/runtime/lib/api/flows_spec.js | 12 ++-- .../lib/{nodes => }/flows/Flow_spec.js | 6 +- .../lib/{nodes => }/flows/Subflow_spec.js | 8 +-- .../lib/{nodes => }/flows/index_spec.js | 4 +- .../lib/{nodes => }/flows/util_spec.js | 2 +- .../@node-red/runtime/lib/nodes/Node_spec.js | 25 ++++++--- .../@node-red/runtime/lib/nodes/index_spec.js | 2 +- 18 files changed, 147 insertions(+), 115 deletions(-) rename packages/node_modules/@node-red/runtime/lib/{nodes => }/flows/Flow.js (99%) rename packages/node_modules/@node-red/runtime/lib/{nodes => }/flows/Subflow.js (99%) rename packages/node_modules/@node-red/runtime/lib/{nodes => }/flows/index.js (99%) create mode 100644 packages/node_modules/@node-red/runtime/lib/flows/router/index.js create mode 100644 packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js rename packages/node_modules/@node-red/runtime/lib/{nodes => }/flows/util.js (100%) rename test/unit/@node-red/runtime/lib/{nodes => }/flows/Flow_spec.js (99%) rename test/unit/@node-red/runtime/lib/{nodes => }/flows/Subflow_spec.js (99%) rename test/unit/@node-red/runtime/lib/{nodes => }/flows/index_spec.js (99%) rename test/unit/@node-red/runtime/lib/{nodes => }/flows/util_spec.js (99%) diff --git a/packages/node_modules/@node-red/runtime/lib/api/flows.js b/packages/node_modules/@node-red/runtime/lib/api/flows.js index 4207c83ce..f97e75390 100644 --- a/packages/node_modules/@node-red/runtime/lib/api/flows.js +++ b/packages/node_modules/@node-red/runtime/lib/api/flows.js @@ -52,7 +52,7 @@ var api = module.exports = { getFlows: function(opts) { return new Promise(function(resolve,reject) { runtime.log.audit({event: "flows.get"}, opts.req); - return resolve(runtime.nodes.getFlows()); + return resolve(runtime.flows.getFlows()); }); }, /** @@ -75,10 +75,10 @@ var api = module.exports = { var apiPromise; if (deploymentType === 'reload') { - apiPromise = runtime.nodes.loadFlows(true); + apiPromise = runtime.flows.loadFlows(true); } else { if (flows.hasOwnProperty('rev')) { - var currentVersion = runtime.nodes.getFlows().rev; + var currentVersion = runtime.flows.getFlows().rev; if (currentVersion !== flows.rev) { var err; err = new Error(); @@ -114,7 +114,7 @@ var api = module.exports = { return mutex.runExclusive(function() { return new Promise(function (resolve, reject) { var flow = opts.flow; - runtime.nodes.addFlow(flow,opts.user).then(function (id) { + runtime.flows.addFlow(flow, opts.user).then(function (id) { runtime.log.audit({event: "flow.add", id: id}, opts.req); return resolve(id); }).catch(function (err) { @@ -141,7 +141,7 @@ var api = module.exports = { */ getFlow: function(opts) { return new Promise(function (resolve,reject) { - var flow = runtime.nodes.getFlow(opts.id); + var flow = runtime.flows.getFlow(opts.id); if (flow) { runtime.log.audit({event: "flow.get",id:opts.id}, opts.req); return resolve(flow); @@ -170,7 +170,7 @@ var api = module.exports = { var flow = opts.flow; var id = opts.id; try { - runtime.nodes.updateFlow(id, flow, opts.user).then(function () { + runtime.flows.updateFlow(id, flow, opts.user).then(function () { runtime.log.audit({event: "flow.update", id: id}, opts.req); return resolve(id); }).catch(function (err) { @@ -216,7 +216,7 @@ var api = module.exports = { return new Promise(function (resolve, reject) { var id = opts.id; try { - runtime.nodes.removeFlow(id, opts.user).then(function () { + runtime.flows.removeFlow(id, opts.user).then(function () { runtime.log.audit({event: "flow.remove", id: id}, opts.req); return resolve(); }).catch(function (err) { diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js similarity index 99% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js rename to packages/node_modules/@node-red/runtime/lib/flows/Flow.js index df4e810fa..aa5abab98 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -17,8 +17,10 @@ var clone = require("clone"); var redUtil = require("@node-red/util").util; var flowUtil = require("./util"); -var events = require("../../events"); -const context = require('../context'); +var events = require("../events"); +const context = require('../nodes/context'); +const router = require("./router"); + var Subflow; var Log; @@ -543,6 +545,10 @@ class Flow { return asyncMessageDelivery } + send(src,destinationId,msg) { + router.send(src,destinationId,msg); + } + dump() { console.log("==================") console.log(this.TYPE, this.id); diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Subflow.js b/packages/node_modules/@node-red/runtime/lib/flows/Subflow.js similarity index 99% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/Subflow.js rename to packages/node_modules/@node-red/runtime/lib/flows/Subflow.js index cca230d79..c651e77aa 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Subflow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Subflow.js @@ -16,14 +16,14 @@ const clone = require("clone"); const Flow = require('./Flow').Flow; -const context = require('../context'); +const context = require('../nodes/context'); const util = require("util"); const redUtil = require("@node-red/util").util; const flowUtil = require("./util"); -const credentials = require("../credentials"); +const credentials = require("../nodes/credentials"); var Log; @@ -159,7 +159,7 @@ class Subflow extends Flow { start(diff) { var self = this; // Create a subflow node to accept inbound messages and route appropriately - var Node = require("../Node"); + var Node = require("../nodes/Node"); if (this.subflowDef.status) { var subflowStatusConfig = { diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js b/packages/node_modules/@node-red/runtime/lib/flows/index.js similarity index 99% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js rename to packages/node_modules/@node-red/runtime/lib/flows/index.js index 38742daf3..157f384c8 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/index.js @@ -22,13 +22,12 @@ var Flow = require('./Flow'); var typeRegistry = require("@node-red/registry"); var deprecated = typeRegistry.deprecated; - -var context = require("../context") -var credentials = require("../credentials"); - +var context = require("../nodes/context") +var credentials = require("../nodes/credentials"); +var router = require("./router"); var flowUtil = require("./util"); var log; -var events = require("../../events"); +var events = require("../events"); var redUtil = require("@node-red/util").util; var storage = null; @@ -71,6 +70,7 @@ function init(runtime) { } Flow.init(runtime); flowUtil.init(runtime); + router.init(runtime); } function loadFlows() { diff --git a/packages/node_modules/@node-red/runtime/lib/flows/router/index.js b/packages/node_modules/@node-red/runtime/lib/flows/router/index.js new file mode 100644 index 000000000..116019635 --- /dev/null +++ b/packages/node_modules/@node-red/runtime/lib/flows/router/index.js @@ -0,0 +1,50 @@ + +var settings; + +const LocalRouter = require("./localRouter"); +var defaultRouter; + + +class Router { + constructor(stack) { + this.stack = stack || []; + } + send(source,destinationId,msg) { + var pos = 0; + var next = () => { + var router = this.stack[pos++]; + if (router) { + router.send(source,destinationId,msg,next); + } + } + next(); + } +} + +function init(runtime) { + settings = runtime.settings; + + defaultRouter = new Router([ + new LocalRouter(), + new PostMessageLogger() + ]) + +} + +function send(source,destinationId,msg) { + defaultRouter.send(source,destinationId,msg); +} + +module.exports = { + init:init, + send: send +} + + + +class PostMessageLogger { + constructor() {} + send(source,destinationId,msg,next) { + console.log(source.id.padEnd(16),"->",destinationId.padEnd(16),JSON.stringify(msg)); + } +} diff --git a/packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js b/packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js new file mode 100644 index 000000000..96dcc3489 --- /dev/null +++ b/packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js @@ -0,0 +1,17 @@ + +class LocalRouter { + constructor() {} + send(source,destinationId,msg,next) { + var node = source._flow.getNode(destinationId); + if (node) { + setImmediate(function() { + node.receive(msg); + next(); + }); + } else if (next) { + next() + } + } +} + +module.exports = LocalRouter \ No newline at end of file diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/util.js b/packages/node_modules/@node-red/runtime/lib/flows/util.js similarity index 100% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/util.js rename to packages/node_modules/@node-red/runtime/lib/flows/util.js diff --git a/packages/node_modules/@node-red/runtime/lib/index.js b/packages/node_modules/@node-red/runtime/lib/index.js index 66bff6ff2..21e775393 100644 --- a/packages/node_modules/@node-red/runtime/lib/index.js +++ b/packages/node_modules/@node-red/runtime/lib/index.js @@ -19,6 +19,7 @@ var when = require('when'); var externalAPI = require("./api"); var redNodes = require("./nodes"); +var flows = require("./flows"); var storage = require("./storage"); var library = require("./library"); var events = require("./events"); @@ -273,6 +274,7 @@ var runtime = { storage: storage, events: events, nodes: redNodes, + flows: flows, library: library, exec: exec, util: require("@node-red/util").util, diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index 60b106426..e0bba6a65 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -20,7 +20,7 @@ var EventEmitter = require("events").EventEmitter; var redUtil = require("@node-red/util").util; var Log = require("@node-red/util").log; var context = require("./context"); -var flows = require("./flows"); +var flows = require("../flows"); const NOOP_SEND = function() {} @@ -55,10 +55,6 @@ function Node(n) { // as part of its constructor - config._flow will overwrite this._flow // which we can tolerate as they are the same object. Object.defineProperty(this,'_flow', {value: n._flow, enumerable: false, writable: true }) - this._asyncDelivery = n._flow.asyncMessageDelivery; - } - if (this._asyncDelivery === undefined) { - this._asyncDelivery = true; } this.updateWires(n.wires); } @@ -173,15 +169,7 @@ Node.prototype._emit = Node.prototype.emit; Node.prototype.emit = function(event, ...args) { var node = this; if (event === "input") { - // When Pluggable Message Routing arrives, this will be called from - // that and will already be sync/async depending on the router. - if (this._asyncDelivery) { - setImmediate(function() { - node._emitInput.apply(node,args); - }); - } else { - this._emitInput.apply(this,args); - } + this._emitInput.apply(this,args); } else { this._emit.apply(this,arguments); } @@ -366,11 +354,7 @@ Node.prototype.send = function(msg) { msg._msgid = redUtil.generateId(); } this.metric("send",msg); - node = this._flow.getNode(this._wire); - /* istanbul ignore else */ - if (node) { - node.receive(msg); - } + this._flow.send(this,this._wire,msg); return; } else { msg = [msg]; @@ -398,23 +382,20 @@ Node.prototype.send = function(msg) { var k = 0; // for each recipent node of that output for (var j = 0; j < wires.length; j++) { - node = this._flow.getNode(wires[j]); // node at end of wire j - if (node) { - // for each msg to send eg. [[m1, m2, ...], ...] - for (k = 0; k < msgs.length; k++) { - var m = msgs[k]; - if (m !== null && m !== undefined) { - /* istanbul ignore else */ - if (!sentMessageId) { - sentMessageId = m._msgid; - } - if (msgSent) { - var clonedmsg = redUtil.cloneMessage(m); - sendEvents.push({n:node,m:clonedmsg}); - } else { - sendEvents.push({n:node,m:m}); - msgSent = true; - } + // for each msg to send eg. [[m1, m2, ...], ...] + for (k = 0; k < msgs.length; k++) { + var m = msgs[k]; + if (m !== null && m !== undefined) { + /* istanbul ignore else */ + if (!sentMessageId) { + sentMessageId = m._msgid; + } + if (msgSent) { + var clonedmsg = redUtil.cloneMessage(m); + sendEvents.push({n:wires[j],m:clonedmsg}); + } else { + sendEvents.push({n:wires[j],m:m}); + msgSent = true; } } } @@ -434,7 +415,7 @@ Node.prototype.send = function(msg) { if (!ev.m._msgid) { ev.m._msgid = sentMessageId; } - ev.n.receive(ev.m); + this._flow.send(this,ev.n,ev.m); } }; diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/context/index.js b/packages/node_modules/@node-red/runtime/lib/nodes/context/index.js index c395c0615..fda3852ff 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/context/index.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/context/index.js @@ -18,7 +18,6 @@ var clone = require("clone"); var log = require("@node-red/util").log; var util = require("@node-red/util").util; var memory = require("./memory"); -var flows; var settings; @@ -48,7 +47,6 @@ function logUnknownStore(name) { } function init(_settings) { - flows = require("../flows"); settings = _settings; contexts = {}; stores = {}; @@ -513,39 +511,6 @@ function getContext(nodeId, flowId) { return newContext; } -// -// function getContext(localId,flowId,parent) { -// var contextId = localId; -// if (flowId) { -// contextId = localId+":"+flowId; -// } -// console.log("getContext",localId,flowId,"known?",contexts.hasOwnProperty(contextId)); -// if (contexts.hasOwnProperty(contextId)) { -// return contexts[contextId]; -// } -// var newContext = createContext(contextId,undefined,parent); -// if (flowId) { -// var node = flows.get(flowId); -// console.log("flows,get",flowId,node&&node.type) -// var parent = undefined; -// if (node && node.type.startsWith("subflow:")) { -// parent = node.context().flow; -// } -// else { -// parent = createRootContext(); -// } -// var flowContext = getContext(flowId,undefined,parent); -// Object.defineProperty(newContext, 'flow', { -// value: flowContext -// }); -// } -// Object.defineProperty(newContext, 'global', { -// value: contexts['global'] -// }) -// contexts[contextId] = newContext; -// return newContext; -// } - function deleteContext(id,flowId) { if(!hasConfiguredStore){ // only delete context if there's no configured storage. diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/index.js b/packages/node_modules/@node-red/runtime/lib/nodes/index.js index 705f10f5d..42fcd72f6 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/index.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/index.js @@ -23,8 +23,8 @@ var util = require("util"); var registry = require("@node-red/registry"); var credentials = require("./credentials"); -var flows = require("./flows"); -var flowUtil = require("./flows/util") +var flows = require("../flows"); +var flowUtil = require("../flows/util") var context = require("./context"); var Node = require("./Node"); var log; diff --git a/test/unit/@node-red/runtime/lib/api/flows_spec.js b/test/unit/@node-red/runtime/lib/api/flows_spec.js index dafbbc69b..c0b7d1cb3 100644 --- a/test/unit/@node-red/runtime/lib/api/flows_spec.js +++ b/test/unit/@node-red/runtime/lib/api/flows_spec.js @@ -37,7 +37,7 @@ describe("runtime-api/flows", function() { it("returns the current flow configuration", function(done) { flows.init({ log: mockLog(), - nodes: { + flows: { getFlows: function() { return [1,2,3] } } }); @@ -76,7 +76,7 @@ describe("runtime-api/flows", function() { }) flows.init({ log: mockLog(), - nodes: { + flows: { getFlows: function() { return {rev:"currentRev",flows:[]} }, setFlows: setFlows, loadFlows: loadFlows @@ -192,7 +192,7 @@ describe("runtime-api/flows", function() { }); flows.init({ log: mockLog(), - nodes: { + flows: { addFlow: addFlow } }); @@ -225,7 +225,7 @@ describe("runtime-api/flows", function() { }); flows.init({ log: mockLog(), - nodes: { + flows: { getFlow: getFlow } }); @@ -268,7 +268,7 @@ describe("runtime-api/flows", function() { }); flows.init({ log: mockLog(), - nodes: { + flows: { updateFlow: updateFlow } }); @@ -324,7 +324,7 @@ describe("runtime-api/flows", function() { }); flows.init({ log: mockLog(), - nodes: { + flows: { removeFlow: removeFlow } }); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js b/test/unit/@node-red/runtime/lib/flows/Flow_spec.js similarity index 99% rename from test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js rename to test/unit/@node-red/runtime/lib/flows/Flow_spec.js index e1315dc50..9ce9aa30b 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/Flow_spec.js @@ -22,9 +22,9 @@ var util = require("util"); var NR_TEST_UTILS = require("nr-test-utils"); -var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util"); -var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util"); +var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry"); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js b/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js similarity index 99% rename from test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js rename to test/unit/@node-red/runtime/lib/flows/Subflow_spec.js index 71bd9ad19..a55771391 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js @@ -22,11 +22,11 @@ var util = require("util"); var NR_TEST_UTILS = require("nr-test-utils"); -var Subflow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Subflow"); -var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow"); +var Subflow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Subflow"); +var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow"); -var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry"); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/index_spec.js b/test/unit/@node-red/runtime/lib/flows/index_spec.js similarity index 99% rename from test/unit/@node-red/runtime/lib/nodes/flows/index_spec.js rename to test/unit/@node-red/runtime/lib/flows/index_spec.js index 8865c5d3f..b8f3a25aa 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/index_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/index_spec.js @@ -20,13 +20,13 @@ var when = require("when"); var clone = require("clone"); var NR_TEST_UTILS = require("nr-test-utils"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var RedNode = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var RED = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes"); var events = NR_TEST_UTILS.require("@node-red/runtime/lib/events"); var credentials = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/credentials"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry") -var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow"); +var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow"); describe('flows/index', function() { diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/util_spec.js b/test/unit/@node-red/runtime/lib/flows/util_spec.js similarity index 99% rename from test/unit/@node-red/runtime/lib/nodes/flows/util_spec.js rename to test/unit/@node-red/runtime/lib/flows/util_spec.js index b83a37660..8e1f15754 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/util_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/util_spec.js @@ -19,7 +19,7 @@ var sinon = require("sinon"); var when = require("when"); var clone = require("clone"); var NR_TEST_UTILS = require("nr-test-utils"); -var flowUtil = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util"); +var flowUtil = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry"); var redUtil = NR_TEST_UTILS.require("@node-red/util").util; diff --git a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js index 61b4446f7..3354c4215 100644 --- a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js +++ b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js @@ -19,7 +19,7 @@ var sinon = require('sinon'); var NR_TEST_UTILS = require("nr-test-utils"); var RedNode = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var Log = NR_TEST_UTILS.require("@node-red/util").log; -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); describe('Node', function() { describe('#constructor',function() { @@ -208,6 +208,7 @@ describe('Node', function() { it('emits a single message', function(done) { var flow = { getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); @@ -255,7 +256,9 @@ describe('Node', function() { it('emits a single message - synchronous mode', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + send: (node,dst,msg) => { n2.receive(msg) ;}, asyncMessageDelivery: false }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); @@ -284,8 +287,10 @@ describe('Node', function() { it('emits a message with callback provided send', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - handleComplete: (node,msg) => {} + handleComplete: (node,msg) => {}, + send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); @@ -308,7 +313,9 @@ describe('Node', function() { it('emits multiple messages on a single output', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); @@ -338,14 +345,15 @@ describe('Node', function() { it('emits messages to multiple outputs', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3,'n4':n4,'n5':n5}[id]}, + send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} }; var n1 = new RedNode({_flow:flow, id:'n1',type:'abc',wires:[['n2'],['n3'],['n4','n5']]}); var n2 = new RedNode({_flow:flow, id:'n2',type:'abc'}); var n3 = new RedNode({_flow:flow, id:'n3',type:'abc'}); var n4 = new RedNode({_flow:flow, id:'n4',type:'abc'}); var n5 = new RedNode({_flow:flow, id:'n5',type:'abc'}); - var messages = [ {payload:"hello world"}, null, @@ -396,6 +404,7 @@ describe('Node', function() { it('emits no messages', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); @@ -414,20 +423,20 @@ describe('Node', function() { it('emits messages ignoring non-existent nodes', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + send: (node,dst,msg) => { setImmediate(function() { var n = flow.getNode(dst); n && n.receive(msg) })} }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n9'],['n2']]}); var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var messages = [ - {payload:"hello world"}, - {payload:"hello world again"} + {_msgid:"123", payload:"hello world"}, + {_msgid:"234", payload:"hello world again"} ]; - // only one message sent, so no copy needed n2.on('input',function(msg) { should.deepEqual(msg,messages[1]); - should.strictEqual(msg,messages[1]); done(); }); @@ -437,6 +446,7 @@ describe('Node', function() { it('emits messages without cloning req or res', function(done) { var flow = { getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3}[id]}, + send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[[['n2'],['n3']]]}); var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); @@ -493,6 +503,7 @@ describe('Node', function() { Log.addHandler(logHandler); var flow = { getNode: (id) => { return {'n1':sender,'n2':receiver1,'n3':receiver2}[id]}, + send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} }; var sender = new RedNode({_flow:flow,id:'n1',type:'abc', wires:[['n2', 'n3']]}); diff --git a/test/unit/@node-red/runtime/lib/nodes/index_spec.js b/test/unit/@node-red/runtime/lib/nodes/index_spec.js index 66958196a..7a4992624 100644 --- a/test/unit/@node-red/runtime/lib/nodes/index_spec.js +++ b/test/unit/@node-red/runtime/lib/nodes/index_spec.js @@ -23,7 +23,7 @@ var inherits = require("util").inherits; var NR_TEST_UTILS = require("nr-test-utils"); var index = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/index"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var registry = NR_TEST_UTILS.require("@node-red/registry") var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); From bdd736315affa3602ee53d95df53cebea6b06f26 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Thu, 30 Jul 2020 17:52:11 +0100 Subject: [PATCH 02/11] Add RED.hooks engine --- Gruntfile.js | 1 + .../@node-red/registry/lib/util.js | 1 + .../@node-red/runtime/lib/flows/index.js | 2 - .../@node-red/runtime/lib/hooks.js | 165 +++++++++++++ .../@node-red/runtime/lib/index.js | 3 + packages/node_modules/node-red/lib/red.js | 8 + test/unit/@node-red/runtime/lib/hooks_spec.js | 221 ++++++++++++++++++ 7 files changed, 399 insertions(+), 2 deletions(-) create mode 100644 packages/node_modules/@node-red/runtime/lib/hooks.js create mode 100644 test/unit/@node-red/runtime/lib/hooks_spec.js diff --git a/Gruntfile.js b/Gruntfile.js index 1eaf3f6a9..ff711e654 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -457,6 +457,7 @@ module.exports = function(grunt) { 'packages/node_modules/@node-red/runtime/lib/index.js', 'packages/node_modules/@node-red/runtime/lib/api/*.js', 'packages/node_modules/@node-red/runtime/lib/events.js', + 'packages/node_modules/@node-red/runtime/lib/hooks.js', 'packages/node_modules/@node-red/util/**/*.js', 'packages/node_modules/@node-red/editor-api/lib/index.js', 'packages/node_modules/@node-red/editor-api/lib/auth/index.js' diff --git a/packages/node_modules/@node-red/registry/lib/util.js b/packages/node_modules/@node-red/registry/lib/util.js index 4e68caf51..5a6d3da38 100644 --- a/packages/node_modules/@node-red/registry/lib/util.js +++ b/packages/node_modules/@node-red/registry/lib/util.js @@ -57,6 +57,7 @@ function createNodeApi(node) { log: {}, settings: {}, events: runtime.events, + hooks: runtime.hooks, util: runtime.util, version: runtime.version, require: requireModule, diff --git a/packages/node_modules/@node-red/runtime/lib/flows/index.js b/packages/node_modules/@node-red/runtime/lib/flows/index.js index 157f384c8..ec46d8374 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/index.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/index.js @@ -24,7 +24,6 @@ var deprecated = typeRegistry.deprecated; var context = require("../nodes/context") var credentials = require("../nodes/credentials"); -var router = require("./router"); var flowUtil = require("./util"); var log; var events = require("../events"); @@ -70,7 +69,6 @@ function init(runtime) { } Flow.init(runtime); flowUtil.init(runtime); - router.init(runtime); } function loadFlows() { diff --git a/packages/node_modules/@node-red/runtime/lib/hooks.js b/packages/node_modules/@node-red/runtime/lib/hooks.js new file mode 100644 index 000000000..b8333302f --- /dev/null +++ b/packages/node_modules/@node-red/runtime/lib/hooks.js @@ -0,0 +1,165 @@ +const Log = require("@node-red/util").log; + +// Flags for what hooks have handlers registered +let states = { } + +// Hooks by id +let hooks = { } + +// Hooks by label +let labelledHooks = { } + +/** + * Runtime hooks engine + * @mixin @node-red/runtime_hooks + */ + +/** + * Register a handler to a named hook + * @memberof @node-red/runtime_hooks + * @param {String} hookId - the name of the hook to attach to + * @param {Function} callback - the callback function for the hook + */ +function add(hookId, callback) { + let [id, label] = hookId.split("."); + if (label) { + if (labelledHooks[label] && labelledHooks[label][id]) { + throw new Error("Hook "+hookId+" already registered") + } + labelledHooks[label] = labelledHooks[label]||{}; + labelledHooks[label][id] = callback; + } + // Get location of calling code + const stack = new Error().stack; + const callModule = stack.split("\n")[2].split("(")[1].slice(0,-1); + Log.debug(`Adding hook '${hookId}' from ${callModule}`); + + hooks[id] = hooks[id] || []; + hooks[id].push({cb:callback,location:callModule}); + states[id] = true; +} + +/** + * Remove a handled from a named hook + * @memberof @node-red/runtime_hooks + * @param {String} hookId - the name of the hook event to remove - must be `name.label` + */ +function remove(hookId) { + let [id,label] = hookId.split("."); + if ( !label) { + throw new Error("Cannot remove hook without label: ",hookId) + } + Log.debug(`Removing hook '${hookId}'`); + if (labelledHooks[label]) { + if (id === "*") { + // Remove all hooks for this label + let hookList = Object.keys(labelledHooks[label]); + for (let i=0;i hook.cb === callback); + if (i !== -1) { + hooks[id].splice(i,1); + if (hooks[id].length === 0) { + delete hooks[id]; + delete states[id]; + } + } +} + + +function trigger(hookId, payload, done) { + const hookStack = hooks[hookId]; + if (!hookStack || hookStack.length === 0) { + done(); + return; + } + let i = 0; + + function callNextHook(err) { + if (i === hookStack.length || err) { + done(err); + return; + } + const hook = hookStack[i++]; + const callback = hook.cb; + if (callback.length === 1) { + try { + let result = callback(payload); + if (result === false) { + // Halting the flow + done(false); + return + } + if (result && typeof result.then === 'function') { + result.then(handleResolve, callNextHook) + return; + } + callNextHook(); + } catch(err) { + done(err); + return; + } + } else { + try { + callback(payload,handleResolve) + } catch(err) { + done(err); + return; + } + } + } + callNextHook(); + + function handleResolve(result) { + if (result === undefined) { + callNextHook(); + } else { + done(result); + } + } +} + +// add("preSend", function(sendEvents) { +// console.log("preSend",JSON.stringify(sendEvents)); +// }) +// add("preRoute", function(sendEvent) { +// console.log("preRoute",JSON.stringify(sendEvent.msg)); +// }) +// add("onSend", function(sendEvent) { +// console.log("onSend",JSON.stringify(sendEvent.msg)); +// }) +// add("postSend", function(sendEvent) { +// console.log("postSend",JSON.stringify(sendEvent.msg)); +// }) +// add("onReceive", function(recEvent) { +// console.log("onReceive",recEvent.destination.id,JSON.stringify(recEvent.msg)) +// }) +// add("postReceive", function(recEvent) { +// console.log("postReceive",recEvent.destination.id,JSON.stringify(recEvent.msg)) +// }) + +function clear() { + hooks = {} + labelledHooks = {} + states = {} +} +module.exports = { + get states() { return states }, + clear, + add, + remove, + trigger +} \ No newline at end of file diff --git a/packages/node_modules/@node-red/runtime/lib/index.js b/packages/node_modules/@node-red/runtime/lib/index.js index 21e775393..f9a772ea3 100644 --- a/packages/node_modules/@node-red/runtime/lib/index.js +++ b/packages/node_modules/@node-red/runtime/lib/index.js @@ -23,6 +23,7 @@ var flows = require("./flows"); var storage = require("./storage"); var library = require("./library"); var events = require("./events"); +var hooks = require("./hooks"); var settings = require("./settings"); var exec = require("./exec"); @@ -273,6 +274,7 @@ var runtime = { settings: settings, storage: storage, events: events, + hooks: hooks, nodes: redNodes, flows: flows, library: library, @@ -357,6 +359,7 @@ module.exports = { storage: storage, events: events, + hooks: hooks, util: require("@node-red/util").util, get httpNode() { return nodeApp }, get httpAdmin() { return adminApp }, diff --git a/packages/node_modules/node-red/lib/red.js b/packages/node_modules/node-red/lib/red.js index 9a4d77862..684ff6d8a 100644 --- a/packages/node_modules/node-red/lib/red.js +++ b/packages/node_modules/node-red/lib/red.js @@ -145,6 +145,14 @@ module.exports = { */ events: runtime.events, + /** + * Runtime hooks engine + * @see @node-red/runtime_hooks + * @memberof node-red + */ + hooks: runtime.hooks, + + /** * This provides access to the internal settings module of the * runtime. diff --git a/test/unit/@node-red/runtime/lib/hooks_spec.js b/test/unit/@node-red/runtime/lib/hooks_spec.js new file mode 100644 index 000000000..79c8c6bce --- /dev/null +++ b/test/unit/@node-red/runtime/lib/hooks_spec.js @@ -0,0 +1,221 @@ +const should = require("should"); +const NR_TEST_UTILS = require("nr-test-utils"); + +const hooks = NR_TEST_UTILS.require("@node-red/runtime/lib/hooks"); + +describe("runtime/hooks", function() { + afterEach(function() { + hooks.clear(); + }) + it("allows a hook to be registered", function(done) { + let calledWith = null; + should.not.exist(hooks.states.foo); + hooks.add("foo", function(payload) { calledWith = payload } ) + hooks.states.foo.should.be.true(); + let data = { a: 1 }; + hooks.trigger("foo",data,err => { + calledWith.should.equal(data); + done(err); + }) + }) + + it("calls hooks in the order they were registered", function(done) { + hooks.add("foo", function(payload) { payload.order.push("A") } ) + hooks.add("foo", function(payload) { payload.order.push("B") } ) + let data = { order:[] }; + hooks.trigger("foo",data,err => { + data.order.should.eql(["A","B"]) + done(err); + }) + }) + + it("does not allow multiple hooks with same id.label", function() { + hooks.add("foo.one", function(payload) { payload.order.push("A") } ); + (function() { + hooks.add("foo.one", function(payload) { payload.order.push("B") } ) + }).should.throw(); + }) + + it("removes labelled hook", function(done) { + hooks.add("foo.A", function(payload) { payload.order.push("A") } ) + hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + + hooks.remove("foo.A"); + hooks.states.foo.should.be.true(); + + let data = { order:[] }; + hooks.trigger("foo",data,err => { + try { + data.order.should.eql(["B"]) + + hooks.remove("foo.B"); + should.not.exist(hooks.states.foo); + + done(err); + } catch(err2) { + done(err2); + } + }) + }) + + it("cannot remove unlabelled hook", function() { + hooks.add("foo", function(payload) { payload.order.push("A") } ); + (function() { + hooks.remove("foo") + }).should.throw(); + }) + it("removes all hooks with same label", function(done) { + hooks.add("foo.A", function(payload) { payload.order.push("A") } ) + hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + hooks.add("bar.A", function(payload) { payload.order.push("C") } ) + hooks.add("bar.B", function(payload) { payload.order.push("D") } ) + + let data = { order:[] }; + hooks.trigger("foo",data,err => { + data.order.should.eql(["A","B"]) + hooks.trigger("bar", data, err => { + data.order.should.eql(["A","B","C","D"]) + + data.order = []; + + hooks.remove("*.A"); + + hooks.trigger("foo",data,err => { + data.order.should.eql(["B"]) + hooks.trigger("bar", data, err => { + data.order.should.eql(["B","D"]) + }) + done(err); + }) + }) + }) + }) + + + it("halts execution on return false", function(done) { + hooks.add("foo.A", function(payload) { payload.order.push("A"); return false } ) + hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("foo",data,err => { + data.order.should.eql(["A"]) + err.should.be.false(); + done(); + }) + }) + it("halts execution on thrown error", function(done) { + hooks.add("foo.A", function(payload) { payload.order.push("A"); throw new Error("error") } ) + hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("foo",data,err => { + data.order.should.eql(["A"]) + should.exist(err); + err.should.not.be.false() + done(); + }) + }) + + it("handler can use callback function", function(done) { + hooks.add("foo.A", function(payload, done) { + setTimeout(function() { + payload.order.push("A") + done() + },30) + }) + hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("foo",data,err => { + data.order.should.eql(["A","B"]) + done(err); + }) + }) + + it("handler can use callback function - halt execution", function(done) { + hooks.add("foo.A", function(payload, done) { + setTimeout(function() { + payload.order.push("A") + done(false) + },30) + }) + hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("foo",data,err => { + data.order.should.eql(["A"]) + err.should.be.false() + done(); + }) + }) + it("handler can use callback function - halt on error", function(done) { + hooks.add("foo.A", function(payload, done) { + setTimeout(function() { + done(new Error("test error")) + },30) + }) + hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("foo",data,err => { + data.order.should.eql([]) + should.exist(err); + err.should.not.be.false() + done(); + }) + }) + + it("handler be an async function", function(done) { + hooks.add("foo.A", async function(payload) { + return new Promise(resolve => { + setTimeout(function() { + payload.order.push("A") + resolve() + },30) + }); + }) + hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("foo",data,err => { + data.order.should.eql(["A","B"]) + done(err); + }) + }) + + it("handler be an async function - halt execution", function(done) { + hooks.add("foo.A", async function(payload) { + return new Promise(resolve => { + setTimeout(function() { + payload.order.push("A") + resolve(false) + },30) + }); + }) + hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("foo",data,err => { + data.order.should.eql(["A"]) + done(err); + }) + }) + it("handler be an async function - halt on error", function(done) { + hooks.add("foo.A", async function(payload) { + return new Promise((resolve,reject) => { + setTimeout(function() { + reject(new Error("test error")) + },30) + }); + }) + hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("foo",data,err => { + data.order.should.eql([]) + should.exist(err); + err.should.not.be.false() + done(); + }) + }) +}); From 27c0e45940f2ac07594cd0853c6557c6cbe1199c Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Thu, 30 Jul 2020 17:52:28 +0100 Subject: [PATCH 03/11] Remove unused router component --- .../runtime/lib/flows/router/index.js | 50 ------------------- .../runtime/lib/flows/router/localRouter.js | 17 ------- 2 files changed, 67 deletions(-) delete mode 100644 packages/node_modules/@node-red/runtime/lib/flows/router/index.js delete mode 100644 packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js diff --git a/packages/node_modules/@node-red/runtime/lib/flows/router/index.js b/packages/node_modules/@node-red/runtime/lib/flows/router/index.js deleted file mode 100644 index 116019635..000000000 --- a/packages/node_modules/@node-red/runtime/lib/flows/router/index.js +++ /dev/null @@ -1,50 +0,0 @@ - -var settings; - -const LocalRouter = require("./localRouter"); -var defaultRouter; - - -class Router { - constructor(stack) { - this.stack = stack || []; - } - send(source,destinationId,msg) { - var pos = 0; - var next = () => { - var router = this.stack[pos++]; - if (router) { - router.send(source,destinationId,msg,next); - } - } - next(); - } -} - -function init(runtime) { - settings = runtime.settings; - - defaultRouter = new Router([ - new LocalRouter(), - new PostMessageLogger() - ]) - -} - -function send(source,destinationId,msg) { - defaultRouter.send(source,destinationId,msg); -} - -module.exports = { - init:init, - send: send -} - - - -class PostMessageLogger { - constructor() {} - send(source,destinationId,msg,next) { - console.log(source.id.padEnd(16),"->",destinationId.padEnd(16),JSON.stringify(msg)); - } -} diff --git a/packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js b/packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js deleted file mode 100644 index 96dcc3489..000000000 --- a/packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js +++ /dev/null @@ -1,17 +0,0 @@ - -class LocalRouter { - constructor() {} - send(source,destinationId,msg,next) { - var node = source._flow.getNode(destinationId); - if (node) { - setImmediate(function() { - node.receive(msg); - next(); - }); - } else if (next) { - next() - } - } -} - -module.exports = LocalRouter \ No newline at end of file From 08148a07b21433dab01a06e7c1d43b134530f7e3 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Thu, 30 Jul 2020 17:52:39 +0100 Subject: [PATCH 04/11] Update Node/Flow to trigger msg routing hooks --- .../@node-red/runtime/lib/flows/Flow.js | 79 ++- .../@node-red/runtime/lib/nodes/Node.js | 139 ++-- .../@node-red/runtime/lib/flows/Flow_spec.js | 368 +++++++++++ .../@node-red/runtime/lib/nodes/Node_spec.js | 608 +++++++++++------- 4 files changed, 900 insertions(+), 294 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index aa5abab98..9d97cea21 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -19,8 +19,7 @@ var redUtil = require("@node-red/util").util; var flowUtil = require("./util"); var events = require("../events"); const context = require('../nodes/context'); -const router = require("./router"); - +const hooks = require("../hooks"); var Subflow; var Log; @@ -545,8 +544,25 @@ class Flow { return asyncMessageDelivery } - send(src,destinationId,msg) { - router.send(src,destinationId,msg); + send(sendEvents) { + // onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object. + // preRoute - called once for each SendEvent object in turn + // preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed. + // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery) + // onReceive - a node is about to receive a message + // postReceive - the message has been passed to the node's input handler + // onDone, onError - the node has completed with a message or logged an error + handleOnSend(this,sendEvents, (err, eventData) => { + if (err) { + let srcNode; + if (Array.isArray(eventData)) { + srcNode = eventData[0].source.node; + } else { + srcNode = eventData.source.node; + } + srcNode.error(err); + } + }); } dump() { @@ -595,6 +611,61 @@ function stopNode(node,removed) { } +function handleOnSend(flow,sendEvents, reportError) { + // onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object. + hooks.trigger("onSend",sendEvents,(err) => { + if (err) { + reportError(err,sendEvents); + return + } else if (err !== false) { + for (var i=0;i { + if (err) { + reportError(err,sendEvent); + return; + } else if (err !== false) { + sendEvent.destination.node = flow.getNode(sendEvent.destination.id); + if (sendEvent.destination.node) { + if (sendEvent.cloneMessage) { + sendEvent.msg = redUtil.cloneMessage(sendEvent.msg); + } + handlePreDeliver(flow,sendEvent,reportError); + } + } + }) +} + +function handlePreDeliver(flow,sendEvent, reportError) { + // preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed. + hooks.trigger("preDeliver",sendEvent,(err) => { + if (err) { + reportError(err,sendEvent); + return; + } else if (err !== false) { + setImmediate(function() { + if (sendEvent.destination.node) { + sendEvent.destination.node.receive(sendEvent.msg); + + } + }) + // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery) + hooks.trigger("postDeliver", sendEvent, function(err) { + if (err) { + reportError(err,sendEvent); + } + }) + } + }) +} + module.exports = { init: function(runtime) { nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000; diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index e0bba6a65..dfc8b5ed1 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -21,6 +21,8 @@ var redUtil = require("@node-red/util").util; var Log = require("@node-red/util").log; var context = require("./context"); var flows = require("../flows"); +const hooks = require("../hooks"); + const NOOP_SEND = function() {} @@ -121,6 +123,11 @@ Node.prototype.context = function() { */ Node.prototype._complete = function(msg,error) { this.metric("done",msg); + hooks.trigger("onComplete",{ msg: msg, error: error, node: { id: this.id, node: this }}, err => { + if (err) { + this.error(err); + } + }) if (error) { // For now, delegate this to this.error // But at some point, the timeout handling will need to know about @@ -183,42 +190,53 @@ Node.prototype.emit = function(event, ...args) { Node.prototype._emitInput = function(arg) { var node = this; this.metric("receive", arg); - if (node._inputCallback) { - // Just one callback registered. - try { - node._inputCallback( - arg, - function() { node.send.apply(node,arguments) }, - function(err) { node._complete(arg,err); } - ); - } catch(err) { - node.error(err,arg); - } - } else if (node._inputCallbacks) { - // Multiple callbacks registered. Call each one, tracking eventual completion - var c = node._inputCallbacks.length; - for (var i=0;i { + if (err) { + node.error(err); + return + } else if (err !== false) { + if (node._inputCallback) { + // Just one callback registered. + try { + node._inputCallback( + arg, + function() { node.send.apply(node,arguments) }, + function(err) { node._complete(arg,err); } + ); + } catch(err) { + node.error(err,arg); + } + } else if (node._inputCallbacks) { + // Multiple callbacks registered. Call each one, tracking eventual completion + var c = node._inputCallbacks.length; + for (var i=0;i {if (err) { node.error(err) }}); } - } + }); } /** @@ -354,7 +372,19 @@ Node.prototype.send = function(msg) { msg._msgid = redUtil.generateId(); } this.metric("send",msg); - this._flow.send(this,this._wire,msg); + this._flow.send([{ + msg: msg, + source: { + id: this.id, + node: this, + port: 0 + }, + destination: { + id: this._wire, + node: undefined + }, + cloneMessage: false + }]); return; } else { msg = [msg]; @@ -368,7 +398,7 @@ Node.prototype.send = function(msg) { var sendEvents = []; var sentMessageId = null; - + var hasMissingIds = false; // for each output of node eg. [msgs to output 0, msgs to output 1, ...] for (var i = 0; i < numOutputs; i++) { var wires = this.wires[i]; // wires leaving output i @@ -386,17 +416,27 @@ Node.prototype.send = function(msg) { for (k = 0; k < msgs.length; k++) { var m = msgs[k]; if (m !== null && m !== undefined) { + if (!m._msgid) { + hasMissingIds = true; + } /* istanbul ignore else */ if (!sentMessageId) { sentMessageId = m._msgid; } - if (msgSent) { - var clonedmsg = redUtil.cloneMessage(m); - sendEvents.push({n:wires[j],m:clonedmsg}); - } else { - sendEvents.push({n:wires[j],m:m}); - msgSent = true; - } + sendEvents.push({ + msg: m, + source: { + id: this.id, + node: this, + port: i + }, + destination: { + id: wires[j], + node: undefined + }, + cloneMessage: msgSent + }); + msgSent = true; } } } @@ -409,21 +449,22 @@ Node.prototype.send = function(msg) { } this.metric("send",{_msgid:sentMessageId}); - for (i=0;i { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + var messageReceived = false; + n2.receive = function(msg) { + messageReceived = true; + try { + msg.should.be.exactly(message); + shutdownTest(); + } catch(err) { + shutdownTest(err); + } + } + + var message = {payload:"hello"} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: false + }]) + messageReceived.should.be.false() + }) + it("sends a message - cloning", function(done) { + var shutdownTest = function(err) { + hooks.clear(); + flow.stop().then(() => { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + + n2.receive = function(msg) { + try { + // Message should be cloned + msg.should.be.eql(message); + msg.should.not.be.exactly(message); + shutdownTest(); + } catch(err) { + shutdownTest(err); + } + } + + var message = {payload:"hello"} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + }) + it("sends multiple messages", function(done) { + var shutdownTest = function(err) { + hooks.clear(); + flow.stop().then(() => { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + + var messageCount = 0; + n2.receive = function(msg) { + try { + msg.should.be.exactly(messages[messageCount++]); + if (messageCount === 2) { + shutdownTest(); + } + } catch(err) { + shutdownTest(err); + } + } + + var messages = [{payload:"hello"},{payload:"world"}]; + + flow.send([{ + msg: messages[0], + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined } + },{ + msg: messages[1], + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined } + }]) + }) + it("sends a message - triggers hooks", function(done) { + var hookErrors = []; + var messageReceived = false; + var hooksCalled = []; + hooks.add("onSend", function(sendEvents) { + hooksCalled.push("onSend") + try { + messageReceived.should.be.false() + sendEvents.should.have.length(1); + sendEvents[0].msg.should.be.exactly(message); + } catch(err) { + hookErrors.push(err); + } + }) + hooks.add("preRoute", function(sendEvent) { + hooksCalled.push("preRoute") + try { + messageReceived.should.be.false() + sendEvent.msg.should.be.exactly(message); + should.not.exist(sendEvent.destination.node) + } catch(err) { + hookErrors.push(err); + } + + }) + hooks.add("preDeliver", function(sendEvent) { + hooksCalled.push("preDeliver") + try { + messageReceived.should.be.false() + // Cloning should have happened + sendEvent.msg.should.not.be.exactly(message); + // Destinatino node populated + should.exist(sendEvent.destination.node) + } catch(err) { + hookErrors.push(err); + } + + }) + hooks.add("postDeliver", function(sendEvent) { + hooksCalled.push("postDeliver") + try { + messageReceived.should.be.false() + + } catch(err) { + hookErrors.push(err); + } + + }) + var shutdownTest = function(err) { + hooks.clear(); + flow.stop().then(() => { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + n2.receive = function(msg) { + messageReceived = true; + try { + msg.should.be.eql(message); + msg.should.not.be.exactly(message); + hooksCalled.should.eql(["onSend","preRoute","preDeliver","postDeliver"]) + if (hookErrors.length > 0) { + shutdownTest(hookErrors[0]) + } else { + shutdownTest(); + } + } catch(err) { + shutdownTest(err); + } + } + + var message = {payload:"hello"} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + }) + + describe("errors thrown by hooks are reported to the sending node", function() { + var flow; + var n1,n2; + var messageReceived = false; + var errorReceived = null; + before(function() { + hooks.add("onSend", function(sendEvents) { + if (sendEvents[0].msg.payload === "trigger-onSend") { + throw new Error("onSend Error"); + } + }) + hooks.add("preRoute", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preRoute") { + throw new Error("preRoute Error"); + } + }) + hooks.add("preDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preDeliver") { + throw new Error("preDeliver Error"); + } + }) + hooks.add("postDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-postDeliver") { + throw new Error("postDeliver Error"); + } + }) + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + n1 = flow.getNode('1'); + n2 = flow.getNode('2'); + n2.receive = function(msg) { + messageReceived = true; + } + n1.error = function(err) { + errorReceived = err; + } + + }) + after(function(done) { + hooks.clear(); + flow.stop().then(() => { done() }); + }) + beforeEach(function() { + messageReceived = false; + errorReceived = null; + }) + function testHook(hook, msgExpected, done) { + var message = {payload:"trigger-"+hook} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + setTimeout(function() { + try { + messageReceived.should.equal(msgExpected); + should.exist(errorReceived) + errorReceived.toString().should.containEql(hook); + done(); + } catch(err) { + done(err); + } + },10) + } + + it("onSend", function(done) { testHook("onSend", false, done) }) + it("preRoute", function(done) { testHook("preRoute", false, done) }) + it("preDeliver", function(done) { testHook("preDeliver", false, done) }) + it("postDeliver", function(done) { testHook("postDeliver", true, done) }) + }) + + describe("hooks can stop the sending of messages", function() { + var flow; + var n1,n2; + var messageReceived = false; + var errorReceived = false; + before(function() { + hooks.add("onSend", function(sendEvents) { + if (sendEvents[0].msg.payload === "trigger-onSend") { + return false + } + }) + hooks.add("preRoute", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preRoute") { + return false + } + }) + hooks.add("preDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preDeliver") { + return false + } + }) + hooks.add("postDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-postDeliver") { + return false + } + }) + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + n1 = flow.getNode('1'); + n2 = flow.getNode('2'); + n2.receive = function(msg) { + messageReceived = true; + } + n1.error = function(err) { + errorReceived = true; + } + + }) + after(function(done) { + hooks.clear(); + flow.stop().then(() => { done() }); + }) + function testSend(payload,messageReceivedExpected,errorReceivedExpected,done) { + messageReceived = false; + errorReceived = false; + flow.send([{ + msg: {payload: payload}, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + setTimeout(function() { + try { + messageReceived.should.eql(messageReceivedExpected) + errorReceived.should.eql(errorReceivedExpected) + done(); + } catch(err) { + done(err); + } + },10) + } + function testHook(hook, done) { + testSend("pass",true,false,err => { + if (err) { + done(err) + } else { + testSend("trigger-"+hook,false,false,done); + } + }) + } + + it("onSend", function(done) { testHook("onSend", done) }) + it("preRoute", function(done) { testHook("preRoute", done) }) + it("preDeliver", function(done) { testHook("preDeliver", done) }) + // postDeliver happens after delivery is scheduled so cannot stop it + // it("postDeliver", function(done) { testHook("postDeliver", done) }) + }) + }) + }); diff --git a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js index 3354c4215..df362dc85 100644 --- a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js +++ b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js @@ -19,6 +19,7 @@ var sinon = require('sinon'); var NR_TEST_UTILS = require("nr-test-utils"); var RedNode = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var Log = NR_TEST_UTILS.require("@node-red/util").log; +var hooks = NR_TEST_UTILS.require("@node-red/runtime/lib/hooks"); var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); describe('Node', function() { @@ -181,6 +182,93 @@ describe('Node', function() { }); n.receive(message); }); + + it('triggers onComplete hook when done callback provided', function(done) { + var handleCompleteCalled = false; + var hookCalled = false; + var n = new RedNode({id:'123',type:'abc', _flow: { + handleComplete: function(node,msg) { + handleCompleteCalled = true; + } + }}); + var hookError; + hooks.add("onComplete",function(completeEvent) { + hookCalled = true; + try { + handleCompleteCalled.should.be.false("onComplete should be called before handleComplete") + should.not.exist(completeEvent.error); + completeEvent.msg.should.deepEqual(message); + completeEvent.node.id.should.eql("123"); + completeEvent.node.node.should.equal(n); + } catch(err) { + hookError = err; + } + }) + var message = {payload:"hello world"}; + n.on('input',function(msg, nodeSend, nodeDone) { + nodeDone(); + }); + n.receive(message); + setTimeout(function() { + if (hookError) { + done(hookError); + return + } + try { + hookCalled.should.be.true("onComplete hook should be called"); + handleCompleteCalled.should.be.true("handleComplete should be called"); + done(); + } catch(err) { + done(err); + } + }) + }); + + it('triggers onComplete hook when done callback provided - with error', function(done) { + var handleCompleteCalled = false; + var hookCalled = false; + var errorReported = false; + var n = new RedNode({id:'123',type:'abc', _flow: { + handleComplete: function(node,msg) { + handleCompleteCalled = true; + } + }}); + var hookError; + hooks.add("onComplete",function(completeEvent) { + hookCalled = true; + try { + handleCompleteCalled.should.be.false("onComplete should be called before handleComplete") + should.exist(completeEvent.error); + completeEvent.error.toString().should.equal("Error: test error") + completeEvent.msg.should.deepEqual(message); + completeEvent.node.id.should.eql("123"); + completeEvent.node.node.should.equal(n); + } catch(err) { + hookError = err; + } + }) + var message = {payload:"hello world"}; + n.on('input',function(msg, nodeSend, nodeDone) { + nodeDone(new Error("test error")); + }); + n.error = function(err,msg) { + errorReported = true; + } + n.receive(message); + setTimeout(function() { + if (hookError) { + done(hookError); + return + } + try { + hookCalled.should.be.true("onComplete hook should be called"); + handleCompleteCalled.should.be.false("handleComplete should not be called"); + done(); + } catch(err) { + done(err); + } + }) + }); it('logs error if callback provides error', function(done) { var n = new RedNode({id:'123',type:'abc'}); sinon.stub(n,"error",function(err,msg) {}); @@ -201,153 +289,250 @@ describe('Node', function() { }); n.receive(message); }); + it("triggers hooks when receiving a message", function(done) { + var hookErrors = []; + var messageReceived = false; + var hooksCalled = []; + hooks.add("onReceive", function(receiveEvent) { + hooksCalled.push("onReceive") + try { + messageReceived.should.be.false("Message should not have been received before onReceive") + receiveEvent.msg.should.be.exactly(message); + receiveEvent.destination.id.should.equal("123") + receiveEvent.destination.node.should.equal(n) + } catch(err) { + hookErrors.push(err); + } + }) + hooks.add("postReceive", function(receiveEvent) { + hooksCalled.push("postReceive") + try { + messageReceived.should.be.true("Message should have been received before postReceive") + receiveEvent.msg.should.be.exactly(message); + receiveEvent.destination.id.should.equal("123") + receiveEvent.destination.node.should.equal(n) + } catch(err) { + hookErrors.push(err); + } + + }) + var n = new RedNode({id:'123',type:'abc'}); + var message = {payload:"hello world"}; + n.on('input',function(msg) { + messageReceived = true; + try { + should.strictEqual(this,n); + hooksCalled.should.eql(["onReceive"]) + should.deepEqual(msg,message); + } catch(err) { + hookErrors.push(err) + } + }); + n.receive(message); + setTimeout(function() { + hooks.clear(); + if (hookErrors.length > 0) { + done(hookErrors[0]) + } else { + done(); + } + },10); + }); + describe("errors thrown by hooks are reported", function() { + before(function() { + hooks.add("onReceive",function(recEvent) { + if (recEvent.msg.payload === "trigger-onReceive") { + throw new Error("onReceive Error") + } + }) + hooks.add("postReceive",function(recEvent) { + if (recEvent.msg.payload === "trigger-postReceive") { + throw new Error("postReceive Error") + } + }) + }) + after(function() { + hooks.clear(); + }) + function testHook(hook, msgExpected, done) { + var messageReceived = false; + var errorReceived; + var n = new RedNode({id:'123',type:'abc'}); + var message = {payload:"trigger-"+hook}; + n.on('input',function(msg) { + messageReceived = true; + }); + n.error = function (err) { + errorReceived = err; + } + + n.receive(message); + + setTimeout(function() { + try { + messageReceived.should.equal(msgExpected,`Hook ${hook} messageReceived expected ${msgExpected} actual ${messageReceived}`); + should.exist(errorReceived); + errorReceived.toString().should.containEql(hook) + done() + } catch(err) { + done(err); + } + },10); + } + it("onReceive", function(done) { testHook("onReceive", false, done)}) + it("postReceive", function(done) { testHook("postReceive", true, done)}) + }) }); + describe("hooks can halt receive", function() { + before(function() { + hooks.add("onReceive",function(recEvent) { + if (recEvent.msg.payload === "trigger-onReceive") { + return false; + } + }) + }) + after(function() { + hooks.clear(); + }) + + function testHook(hook, msgExpected, done) { + var messageReceived = false; + var errorReceived; + var n = new RedNode({id:'123',type:'abc'}); + var message = {payload:"trigger-"+hook}; + n.on('input',function(msg) { + messageReceived = true; + }); + n.error = function (err) { + errorReceived = err; + } + + n.receive(message); + + setTimeout(function() { + try { + messageReceived.should.equal(msgExpected,`Hook ${hook} messageReceived expected ${msgExpected} actual ${messageReceived}`); + should.not.exist(errorReceived); + done() + } catch(err) { + done(err); + } + },10); + } + it("onReceive", function(done) { testHook("onReceive", false, done)}) + }) + + describe('#send', function() { it('emits a single message', function(done) { var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} + send: (sendEvents) => { + try { + sendEvents.should.have.length(1); + sendEvents[0].msg.should.equal(message); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}) + done(); + } catch(err) { + done(err); + } + }, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var message = {payload:"hello world"}; - var messageReceived = false; - n2.on('input',function(msg) { - // msg equals message, and is not a new copy - messageReceived = true; - should.strictEqual(this,n2); - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(); - }); n1.send(message); - messageReceived.should.be.false(); - }); - - it('emits a single message - multiple input event listeners', function(done) { - var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var message = {payload:"hello world"}; - var messageReceived = 0; - n2.on('input',function(msg) { - // msg equals message, and is not a new copy - messageReceived++; - messageReceived.should.be.exactly(1); - should.strictEqual(this,n2); - should.deepEqual(msg,message); - should.strictEqual(msg,message); - }); - n2.on('input',function(msg) { - messageReceived++; - messageReceived.should.be.exactly(2); - should.strictEqual(this,n2); - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(); - }); - n1.send(message); - messageReceived.should.be.exactly(0); - }); - - it('emits a single message - synchronous mode', function(done) { - var flow = { - handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - send: (node,dst,msg) => { n2.receive(msg) ;}, - asyncMessageDelivery: false - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var message = {payload:"hello world"}; - var messageReceived = false; - var notSyncErr; - n2.on('input',function(msg) { - try { - // msg equals message, and is not a new copy - messageReceived = true; - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(notSyncErr); - } catch(err) { - done(err); - } - }); - n1.send(message); - try { - messageReceived.should.be.true(); - } catch(err) { - notSyncErr = err; - } }); it('emits a message with callback provided send', function(done) { var flow = { handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, handleComplete: (node,msg) => {}, - send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} + send: (sendEvents) => { + try { + sendEvents.should.have.length(1); + sendEvents[0].msg.should.equal(message); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[0].cloneMessage.should.be.false(); + done(); + } catch(err) { + done(err); + } + }, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var message = {payload:"hello world"}; - var messageReceived = false; n1.on('input',function(msg,nodeSend,nodeDone) { nodeSend(msg); nodeDone(); }); - n2.on('input',function(msg) { - // msg equals message, and is not a new copy - messageReceived = true; - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(); - }); n1.receive(message); - messageReceived.should.be.false(); }); it('emits multiple messages on a single output', function(done) { var flow = { handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} + send: (sendEvents) => { + try { + sendEvents.should.have.length(2); + sendEvents[0].msg.should.equal(messages[0]); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[0].cloneMessage.should.be.false(); + + sendEvents[1].msg.should.equal(messages[1]); + sendEvents[1].destination.should.eql({id:"n2", node: undefined}); + sendEvents[1].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[1].cloneMessage.should.be.true(); + + done(); + } catch(err) { + done(err); + } + }, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var messages = [ {payload:"hello world"}, {payload:"hello world again"} ]; - var rcvdCount = 0; - - n2.on('input',function(msg) { - if (rcvdCount === 0) { - // first msg sent, don't clone - should.deepEqual(msg,messages[rcvdCount]); - should.strictEqual(msg,messages[rcvdCount]); - rcvdCount += 1; - } else { - // second msg sent, clone - msg.payload.should.equal(messages[rcvdCount].payload); - should.notStrictEqual(msg,messages[rcvdCount]); - done(); - } - }); n1.send([messages]); }); it('emits messages to multiple outputs', function(done) { var flow = { handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, - getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3,'n4':n4,'n5':n5}[id]}, - send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} + send: (sendEvents) => { + try { + sendEvents.should.have.length(3); + sendEvents[0].msg.should.equal(messages[0]); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[0].cloneMessage.should.be.false(); + should.exist(sendEvents[0].msg._msgid); + sendEvents[1].msg.should.equal(messages[2]); + sendEvents[1].destination.should.eql({id:"n4", node: undefined}); + sendEvents[1].source.should.eql({id:"n1", node: n1, port: 2}) + sendEvents[1].cloneMessage.should.be.true(); + should.exist(sendEvents[1].msg._msgid); + sendEvents[2].msg.should.equal(messages[2]); + sendEvents[2].destination.should.eql({id:"n5", node: undefined}); + sendEvents[2].source.should.eql({id:"n1", node: n1, port: 2}) + sendEvents[2].cloneMessage.should.be.true(); + should.exist(sendEvents[2].msg._msgid); + + sendEvents[0].msg._msgid.should.eql(sendEvents[1].msg._msgid) + sendEvents[1].msg._msgid.should.eql(sendEvents[2].msg._msgid) + + done(); + } catch(err) { + done(err); + } + } }; var n1 = new RedNode({_flow:flow, id:'n1',type:'abc',wires:[['n2'],['n3'],['n4','n5']]}); var n2 = new RedNode({_flow:flow, id:'n2',type:'abc'}); @@ -362,43 +547,6 @@ describe('Node', function() { var rcvdCount = 0; - // first message sent, don't clone - // message uuids should match - n2.on('input',function(msg) { - should.deepEqual(msg,messages[0]); - should.strictEqual(msg,messages[0]); - rcvdCount += 1; - if (rcvdCount == 3) { - done(); - } - }); - - n3.on('input',function(msg) { - should.fail(null,null,"unexpected message"); - }); - - // second message sent, clone - // message uuids wont match since we've cloned - n4.on('input',function(msg) { - msg.payload.should.equal(messages[2].payload); - should.notStrictEqual(msg,messages[2]); - rcvdCount += 1; - if (rcvdCount == 3) { - done(); - } - }); - - // third message sent, clone - // message uuids wont match since we've cloned - n5.on('input',function(msg) { - msg.payload.should.equal(messages[2].payload); - should.notStrictEqual(msg,messages[2]); - rcvdCount += 1; - if (rcvdCount == 3) { - done(); - } - }); - n1.send(messages); }); @@ -421,107 +569,85 @@ describe('Node', function() { n1.send(); }); - it('emits messages ignoring non-existent nodes', function(done) { - var flow = { - handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - send: (node,dst,msg) => { setImmediate(function() { var n = flow.getNode(dst); n && n.receive(msg) })} - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n9'],['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); + // it('emits messages without cloning req or res', function(done) { + // var flow = { + // getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3}[id]}, + // send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} + // }; + // var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[[['n2'],['n3']]]}); + // var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); + // var n3 = new RedNode({_flow:flow,id:'n3',type:'abc'}); + // + // var req = {}; + // var res = {}; + // var cloned = {}; + // var message = {payload: "foo", cloned: cloned, req: req, res: res}; + // + // var rcvdCount = 0; + // + // // first message to be sent, so should not be cloned + // n2.on('input',function(msg) { + // should.deepEqual(msg, message); + // msg.cloned.should.be.exactly(message.cloned); + // msg.req.should.be.exactly(message.req); + // msg.res.should.be.exactly(message.res); + // rcvdCount += 1; + // if (rcvdCount == 2) { + // done(); + // } + // }); + // + // // second message to be sent, so should be cloned + // // message uuids wont match since we've cloned + // n3.on('input',function(msg) { + // msg.payload.should.equal(message.payload); + // msg.cloned.should.not.be.exactly(message.cloned); + // msg.req.should.be.exactly(message.req); + // msg.res.should.be.exactly(message.res); + // rcvdCount += 1; + // if (rcvdCount == 2) { + // done(); + // } + // }); + // + // n1.send(message); + // }); - var messages = [ - {_msgid:"123", payload:"hello world"}, - {_msgid:"234", payload:"hello world again"} - ]; - - n2.on('input',function(msg) { - should.deepEqual(msg,messages[1]); - done(); - }); - - n1.send(messages); - }); - - it('emits messages without cloning req or res', function(done) { - var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3}[id]}, - send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[[['n2'],['n3']]]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var n3 = new RedNode({_flow:flow,id:'n3',type:'abc'}); - - var req = {}; - var res = {}; - var cloned = {}; - var message = {payload: "foo", cloned: cloned, req: req, res: res}; - - var rcvdCount = 0; - - // first message to be sent, so should not be cloned - n2.on('input',function(msg) { - should.deepEqual(msg, message); - msg.cloned.should.be.exactly(message.cloned); - msg.req.should.be.exactly(message.req); - msg.res.should.be.exactly(message.res); - rcvdCount += 1; - if (rcvdCount == 2) { - done(); - } - }); - - // second message to be sent, so should be cloned - // message uuids wont match since we've cloned - n3.on('input',function(msg) { - msg.payload.should.equal(message.payload); - msg.cloned.should.not.be.exactly(message.cloned); - msg.req.should.be.exactly(message.req); - msg.res.should.be.exactly(message.res); - rcvdCount += 1; - if (rcvdCount == 2) { - done(); - } - }); - - n1.send(message); - }); - - it("logs the uuid for all messages sent", function(done) { - var logHandler = { - msgIds:[], - messagesSent: 0, - emit: function(event, msg) { - if (msg.event == "node.abc.send" && msg.level == Log.METRIC) { - this.messagesSent++; - this.msgIds.push(msg.msgid); - (typeof msg.msgid).should.not.be.equal("undefined"); - } - } - }; - - Log.addHandler(logHandler); - var flow = { - getNode: (id) => { return {'n1':sender,'n2':receiver1,'n3':receiver2}[id]}, - send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} - }; - - var sender = new RedNode({_flow:flow,id:'n1',type:'abc', wires:[['n2', 'n3']]}); - var receiver1 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var receiver2 = new RedNode({_flow:flow,id:'n3',type:'abc'}); - sender.send({"some": "message"}); - setTimeout(function() { - try { - logHandler.messagesSent.should.equal(1); - should.exist(logHandler.msgIds[0]) - Log.removeHandler(logHandler); - done(); - } catch(err) { - Log.removeHandler(logHandler); - done(err); - } - },50) - }) + // it("logs the uuid for all messages sent", function(done) { + // var logHandler = { + // msgIds:[], + // messagesSent: 0, + // emit: function(event, msg) { + // if (msg.event == "node.abc.send" && msg.level == Log.METRIC) { + // this.messagesSent++; + // this.msgIds.push(msg.msgid); + // (typeof msg.msgid).should.not.be.equal("undefined"); + // } + // } + // }; + // + // Log.addHandler(logHandler); + // var flow = { + // getNode: (id) => { return {'n1':sender,'n2':receiver1,'n3':receiver2}[id]}, + // send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} + // }; + // + // var sender = new RedNode({_flow:flow,id:'n1',type:'abc', wires:[['n2', 'n3']]}); + // var receiver1 = new RedNode({_flow:flow,id:'n2',type:'abc'}); + // var receiver2 = new RedNode({_flow:flow,id:'n3',type:'abc'}); + // sender.send({"some": "message"}); + // setTimeout(function() { + // try { + // logHandler.messagesSent.should.equal(1); + // should.exist(logHandler.msgIds[0]) + // Log.removeHandler(logHandler); + // done(); + // } catch(err) { + // Log.removeHandler(logHandler); + // done(err); + // } + // },50) + // }) }); From 6f25337b999c66f6a1ea60b9f0da8e6e28997136 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Fri, 31 Jul 2020 14:31:33 +0100 Subject: [PATCH 05/11] Add docs for RED.hooks --- .../@node-red/runtime/lib/hooks.js | 31 +++++++------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/hooks.js b/packages/node_modules/@node-red/runtime/lib/hooks.js index b8333302f..10303ed55 100644 --- a/packages/node_modules/@node-red/runtime/lib/hooks.js +++ b/packages/node_modules/@node-red/runtime/lib/hooks.js @@ -11,6 +11,18 @@ let labelledHooks = { } /** * Runtime hooks engine + * + * The following hooks can be used: + * + * Message sending + * - `onSend` - passed an array of `SendEvent` objects. The messages inside these objects are exactly what the node has passed to `node.send` - meaning there could be duplicate references to the same message object. + * - `preRoute` - passed a `SendEvent` + * - `preDeliver` - passed a `SendEvent`. The local router has identified the node it is going to send to. At this point, the message has been cloned if needed. + * - `postDeliver` - passed a `SendEvent`. The message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery) + * - `onReceive` - passed a `ReceiveEvent` when a node is about to receive a message + * - `postReceive` - passed a `ReceiveEvent` when the message has been given to the node's `input` handler(s) + * - `onComplete` - passed a `CompleteEvent` when the node has completed with a message or logged an error + * * @mixin @node-red/runtime_hooks */ @@ -132,25 +144,6 @@ function trigger(hookId, payload, done) { } } -// add("preSend", function(sendEvents) { -// console.log("preSend",JSON.stringify(sendEvents)); -// }) -// add("preRoute", function(sendEvent) { -// console.log("preRoute",JSON.stringify(sendEvent.msg)); -// }) -// add("onSend", function(sendEvent) { -// console.log("onSend",JSON.stringify(sendEvent.msg)); -// }) -// add("postSend", function(sendEvent) { -// console.log("postSend",JSON.stringify(sendEvent.msg)); -// }) -// add("onReceive", function(recEvent) { -// console.log("onReceive",recEvent.destination.id,JSON.stringify(recEvent.msg)) -// }) -// add("postReceive", function(recEvent) { -// console.log("postReceive",recEvent.destination.id,JSON.stringify(recEvent.msg)) -// }) - function clear() { hooks = {} labelledHooks = {} From 460e1f5563c69b94f669b6d3ca8259c5a2bbc299 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Mon, 21 Sep 2020 14:43:03 +0100 Subject: [PATCH 06/11] Fixup merge error --- packages/node_modules/@node-red/runtime/lib/api/flows.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node_modules/@node-red/runtime/lib/api/flows.js b/packages/node_modules/@node-red/runtime/lib/api/flows.js index f97e75390..005775e67 100644 --- a/packages/node_modules/@node-red/runtime/lib/api/flows.js +++ b/packages/node_modules/@node-red/runtime/lib/api/flows.js @@ -88,7 +88,7 @@ var api = module.exports = { return reject(err); } } - apiPromise = runtime.nodes.setFlows(flows.flows,flows.credentials,deploymentType,null,null,opts.user); + apiPromise = runtime.flows.setFlows(flows.flows,flows.credentials,deploymentType,null,null,opts.user); } apiPromise.then(function(flowId) { return resolve({rev:flowId}); From 605177dcf01157a16f5d882168b08ac197cf747c Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Tue, 29 Sep 2020 16:28:52 +0100 Subject: [PATCH 07/11] Validate hook names when they are added --- .../@node-red/runtime/lib/hooks.js | 26 +++- test/unit/@node-red/runtime/lib/hooks_spec.js | 133 +++++++++++------- 2 files changed, 107 insertions(+), 52 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/hooks.js b/packages/node_modules/@node-red/runtime/lib/hooks.js index 10303ed55..4ae537cc4 100644 --- a/packages/node_modules/@node-red/runtime/lib/hooks.js +++ b/packages/node_modules/@node-red/runtime/lib/hooks.js @@ -1,5 +1,17 @@ const Log = require("@node-red/util").log; +const VALID_HOOKS = [ + // Message Routing Path + "onSend", + "preRoute", + "preDeliver", + "postDeliver", + "onReceive", + "postReceive", + "onComplete" +] + + // Flags for what hooks have handlers registered let states = { } @@ -34,6 +46,9 @@ let labelledHooks = { } */ function add(hookId, callback) { let [id, label] = hookId.split("."); + if (VALID_HOOKS.indexOf(id) === -1) { + throw new Error(`Invalid hook '${id}'`); + } if (label) { if (labelledHooks[label] && labelledHooks[label][id]) { throw new Error("Hook "+hookId+" already registered") @@ -149,8 +164,17 @@ function clear() { labelledHooks = {} states = {} } + +function has(hookId) { + let [id, label] = hookId.split("."); + if (label) { + return !!(labelledHooks[label] && labelledHooks[label][id]) + } + return !!states[id] +} + module.exports = { - get states() { return states }, + has, clear, add, remove, diff --git a/test/unit/@node-red/runtime/lib/hooks_spec.js b/test/unit/@node-red/runtime/lib/hooks_spec.js index 79c8c6bce..4fb95b68a 100644 --- a/test/unit/@node-red/runtime/lib/hooks_spec.js +++ b/test/unit/@node-red/runtime/lib/hooks_spec.js @@ -9,48 +9,79 @@ describe("runtime/hooks", function() { }) it("allows a hook to be registered", function(done) { let calledWith = null; - should.not.exist(hooks.states.foo); - hooks.add("foo", function(payload) { calledWith = payload } ) - hooks.states.foo.should.be.true(); + hooks.has("onSend").should.be.false(); + hooks.add("onSend", function(payload) { calledWith = payload } ) + hooks.has("onSend").should.be.true(); let data = { a: 1 }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { calledWith.should.equal(data); done(err); }) }) - + it("rejects invalid hook id", function(done) { + try { + hooks.add("foo", function(payload) {}) + done(new Error("Invalid hook accepted")) + } catch(err) { + done(); + } + }) it("calls hooks in the order they were registered", function(done) { - hooks.add("foo", function(payload) { payload.order.push("A") } ) - hooks.add("foo", function(payload) { payload.order.push("B") } ) + hooks.add("onSend", function(payload) { payload.order.push("A") } ) + hooks.add("onSend", function(payload) { payload.order.push("B") } ) let data = { order:[] }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { data.order.should.eql(["A","B"]) done(err); }) }) it("does not allow multiple hooks with same id.label", function() { - hooks.add("foo.one", function(payload) { payload.order.push("A") } ); + hooks.has("onSend.one").should.be.false(); + hooks.has("onSend").should.be.false(); + hooks.add("onSend.one", function(payload) { payload.order.push("A") } ); + hooks.has("onSend.one").should.be.true(); + hooks.has("onSend").should.be.true(); (function() { - hooks.add("foo.one", function(payload) { payload.order.push("B") } ) + hooks.add("onSend.one", function(payload) { payload.order.push("B") } ) }).should.throw(); }) it("removes labelled hook", function(done) { - hooks.add("foo.A", function(payload) { payload.order.push("A") } ) - hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + hooks.has("onSend.A").should.be.false(); + hooks.has("onSend.B").should.be.false(); + hooks.has("onSend").should.be.false(); + + hooks.add("onSend.A", function(payload) { payload.order.push("A") } ) + + hooks.has("onSend.A").should.be.true(); + hooks.has("onSend.B").should.be.false(); + hooks.has("onSend").should.be.true(); + + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + + hooks.has("onSend.A").should.be.true(); + hooks.has("onSend.B").should.be.true(); + hooks.has("onSend").should.be.true(); + + hooks.remove("onSend.A"); + + hooks.has("onSend.A").should.be.false(); + hooks.has("onSend.B").should.be.true(); + hooks.has("onSend").should.be.true(); - hooks.remove("foo.A"); - hooks.states.foo.should.be.true(); let data = { order:[] }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { try { data.order.should.eql(["B"]) - hooks.remove("foo.B"); - should.not.exist(hooks.states.foo); + hooks.remove("onSend.B"); + hooks.has("onSend.A").should.be.false(); + hooks.has("onSend.B").should.be.false(); + hooks.has("onSend").should.be.false(); + done(err); } catch(err2) { done(err2); @@ -59,30 +90,30 @@ describe("runtime/hooks", function() { }) it("cannot remove unlabelled hook", function() { - hooks.add("foo", function(payload) { payload.order.push("A") } ); + hooks.add("onSend", function(payload) { payload.order.push("A") } ); (function() { - hooks.remove("foo") + hooks.remove("onSend") }).should.throw(); }) it("removes all hooks with same label", function(done) { - hooks.add("foo.A", function(payload) { payload.order.push("A") } ) - hooks.add("foo.B", function(payload) { payload.order.push("B") } ) - hooks.add("bar.A", function(payload) { payload.order.push("C") } ) - hooks.add("bar.B", function(payload) { payload.order.push("D") } ) + hooks.add("onSend.A", function(payload) { payload.order.push("A") } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + hooks.add("preRoute.A", function(payload) { payload.order.push("C") } ) + hooks.add("preRoute.B", function(payload) { payload.order.push("D") } ) let data = { order:[] }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { data.order.should.eql(["A","B"]) - hooks.trigger("bar", data, err => { + hooks.trigger("preRoute", data, err => { data.order.should.eql(["A","B","C","D"]) data.order = []; hooks.remove("*.A"); - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { data.order.should.eql(["B"]) - hooks.trigger("bar", data, err => { + hooks.trigger("preRoute", data, err => { data.order.should.eql(["B","D"]) }) done(err); @@ -93,22 +124,22 @@ describe("runtime/hooks", function() { it("halts execution on return false", function(done) { - hooks.add("foo.A", function(payload) { payload.order.push("A"); return false } ) - hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + hooks.add("onSend.A", function(payload) { payload.order.push("A"); return false } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) let data = { order:[] }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { data.order.should.eql(["A"]) err.should.be.false(); done(); }) }) it("halts execution on thrown error", function(done) { - hooks.add("foo.A", function(payload) { payload.order.push("A"); throw new Error("error") } ) - hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + hooks.add("onSend.A", function(payload) { payload.order.push("A"); throw new Error("error") } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) let data = { order:[] }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { data.order.should.eql(["A"]) should.exist(err); err.should.not.be.false() @@ -117,47 +148,47 @@ describe("runtime/hooks", function() { }) it("handler can use callback function", function(done) { - hooks.add("foo.A", function(payload, done) { + hooks.add("onSend.A", function(payload, done) { setTimeout(function() { payload.order.push("A") done() },30) }) - hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) let data = { order:[] }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { data.order.should.eql(["A","B"]) done(err); }) }) it("handler can use callback function - halt execution", function(done) { - hooks.add("foo.A", function(payload, done) { + hooks.add("onSend.A", function(payload, done) { setTimeout(function() { payload.order.push("A") done(false) },30) }) - hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) let data = { order:[] }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { data.order.should.eql(["A"]) err.should.be.false() done(); }) }) it("handler can use callback function - halt on error", function(done) { - hooks.add("foo.A", function(payload, done) { + hooks.add("onSend.A", function(payload, done) { setTimeout(function() { done(new Error("test error")) },30) }) - hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) let data = { order:[] }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { data.order.should.eql([]) should.exist(err); err.should.not.be.false() @@ -166,7 +197,7 @@ describe("runtime/hooks", function() { }) it("handler be an async function", function(done) { - hooks.add("foo.A", async function(payload) { + hooks.add("onSend.A", async function(payload) { return new Promise(resolve => { setTimeout(function() { payload.order.push("A") @@ -174,17 +205,17 @@ describe("runtime/hooks", function() { },30) }); }) - hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) let data = { order:[] }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { data.order.should.eql(["A","B"]) done(err); }) }) it("handler be an async function - halt execution", function(done) { - hooks.add("foo.A", async function(payload) { + hooks.add("onSend.A", async function(payload) { return new Promise(resolve => { setTimeout(function() { payload.order.push("A") @@ -192,26 +223,26 @@ describe("runtime/hooks", function() { },30) }); }) - hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) let data = { order:[] }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { data.order.should.eql(["A"]) done(err); }) }) it("handler be an async function - halt on error", function(done) { - hooks.add("foo.A", async function(payload) { + hooks.add("onSend.A", async function(payload) { return new Promise((resolve,reject) => { setTimeout(function() { reject(new Error("test error")) },30) }); }) - hooks.add("foo.B", function(payload) { payload.order.push("B") } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) let data = { order:[] }; - hooks.trigger("foo",data,err => { + hooks.trigger("onSend",data,err => { data.order.should.eql([]) should.exist(err); err.should.not.be.false() From 22a301b55e78f9eb1d82c32e370e116a4058cde8 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Tue, 29 Sep 2020 16:29:10 +0100 Subject: [PATCH 08/11] Add flows:* events and deprecate nodes-* events --- .../@node-red/runtime/lib/events.js | 29 +++++++++++++++++-- .../@node-red/runtime/lib/flows/index.js | 9 ++++++ .../@node-red/runtime/lib/flows/index_spec.js | 15 +++++----- 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/events.js b/packages/node_modules/@node-red/runtime/lib/events.js index ed280a618..8e6935222 100644 --- a/packages/node_modules/@node-red/runtime/lib/events.js +++ b/packages/node_modules/@node-red/runtime/lib/events.js @@ -14,9 +14,34 @@ * limitations under the License. **/ -var events = require("events"); +const events = new (require("events")).EventEmitter(); -module.exports = new events.EventEmitter(); + +const deprecatedEvents = { + "nodes-stopped": "flows:stopped", + "nodes-started": "flows:started" +} + +function wrapEventFunction(obj,func) { + events["_"+func] = events[func]; + return function(eventName, listener) { + if (deprecatedEvents.hasOwnProperty(eventName)) { + const log = require("@node-red/util").log; + const stack = (new Error().stack).split("\n")[2].split("(")[1].slice(0,-1); + log.warn(`[RED.events] Deprecated use of "${eventName}" event from "${stack}". Use "${deprecatedEvents[eventName]}" instead.`) + } + return events["_"+func].call(events,eventName,listener) + } +} + + +events.on = wrapEventFunction(events,"on"); +events.once = wrapEventFunction(events,"once"); +events.addListener = events.on; + + + +module.exports = events; /** * Runtime events emitter diff --git a/packages/node_modules/@node-red/runtime/lib/flows/index.js b/packages/node_modules/@node-red/runtime/lib/flows/index.js index ec46d8374..3ac4eddd4 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/index.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/index.js @@ -28,6 +28,7 @@ var flowUtil = require("./util"); var log; var events = require("../events"); var redUtil = require("@node-red/util").util; +const hooks = require("../hooks"); var storage = null; var settings = null; @@ -292,6 +293,8 @@ function start(type,diff,muteLog) { } } + events.emit("flows:starting", {config: activeConfig, type: type, diff: diff}) + var id; if (type === "full") { // A full start means everything should @@ -352,6 +355,8 @@ function start(type,diff,muteLog) { } } } + events.emit("flows:started", {config: activeConfig, type: type, diff: diff}); + // Deprecated event events.emit("nodes-started"); if (credentialsPendingReset === true) { @@ -399,6 +404,8 @@ function stop(type,diff,muteLog) { stopList = diff.changed.concat(diff.removed).concat(diff.linked); } + events.emit("flows:stopping",{config: activeConfig, type: type, diff: diff}) + for (var id in activeFlows) { if (activeFlows.hasOwnProperty(id)) { var flowStateChanged = diff && (diff.added.indexOf(id) !== -1 || diff.removed.indexOf(id) !== -1); @@ -430,6 +437,8 @@ function stop(type,diff,muteLog) { log.info(log._("nodes.flows.stopped-flows")); } } + events.emit("flows:stopped",{config: activeConfig, type: type, diff: diff}); + // Deprecated event events.emit("nodes-stopped"); }); } diff --git a/test/unit/@node-red/runtime/lib/flows/index_spec.js b/test/unit/@node-red/runtime/lib/flows/index_spec.js index b8f3a25aa..df5012844 100644 --- a/test/unit/@node-red/runtime/lib/flows/index_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/index_spec.js @@ -209,8 +209,7 @@ describe('flows/index', function() { storage.getFlows = function() { return when.resolve({flows:originalConfig}); } - - events.once('nodes-started',function() { + events.once('flows:started',function() { flows.setFlows(newConfig,"nodes").then(function() { flows.getFlows().flows.should.eql(newConfig); flowCreate.flows['t1'].update.called.should.be.true(); @@ -239,7 +238,7 @@ describe('flows/index', function() { return when.resolve({flows:originalConfig}); } - events.once('nodes-started',function() { + events.once('flows:started',function() { flows.setFlows(newConfig,"nodes").then(function() { flows.getFlows().flows.should.eql(newConfig); flowCreate.flows['t1'].update.called.should.be.true(); @@ -301,7 +300,7 @@ describe('flows/index', function() { return when.resolve({flows:originalConfig}); } - events.once('nodes-started',function() { + events.once('flows:started',function() { Object.keys(flowCreate.flows).should.eql(['_GLOBAL_','t1']); done(); }); @@ -398,7 +397,7 @@ describe('flows/index', function() { // return when.resolve({flows:originalConfig}); // } // - // events.once('nodes-started',function() { + // events.once('flows:started',function() { // flows.handleError(originalConfig[0],"message",{}); // flowCreate.flows['t1'].handleError.called.should.be.true(); // done(); @@ -423,7 +422,7 @@ describe('flows/index', function() { // return when.resolve({flows:originalConfig}); // } // - // events.once('nodes-started',function() { + // events.once('flows:started',function() { // flows.handleError(originalConfig[0],"message",{}); // try { // flowCreate.flows['t1'].handleError.called.should.be.true(); @@ -451,7 +450,7 @@ describe('flows/index', function() { // return when.resolve({flows:originalConfig}); // } // - // events.once('nodes-started',function() { + // events.once('flows:started',function() { // flows.handleStatus(originalConfig[0],"message"); // flowCreate.flows['t1'].handleStatus.called.should.be.true(); // done(); @@ -477,7 +476,7 @@ describe('flows/index', function() { // return when.resolve({flows:originalConfig}); // } // - // events.once('nodes-started',function() { + // events.once('flows:started',function() { // flows.handleStatus(originalConfig[0],"message"); // try { // flowCreate.flows['t1'].handleStatus.called.should.be.true(); From ea45dde63a3d0272fb2fc744c313d76f26cdba2f Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Tue, 29 Sep 2020 17:20:01 +0100 Subject: [PATCH 09/11] Remove when.js from runtime/lib/flow/index --- .../@node-red/runtime/lib/api/flows.js | 285 ++++++++---------- .../@node-red/runtime/lib/flows/index.js | 15 +- 2 files changed, 131 insertions(+), 169 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/api/flows.js b/packages/node_modules/@node-red/runtime/lib/api/flows.js index 005775e67..9240a85d3 100644 --- a/packages/node_modules/@node-red/runtime/lib/api/flows.js +++ b/packages/node_modules/@node-red/runtime/lib/api/flows.js @@ -49,11 +49,9 @@ var api = module.exports = { * @return {Promise} - the active flow configuration * @memberof @node-red/runtime_flows */ - getFlows: function(opts) { - return new Promise(function(resolve,reject) { - runtime.log.audit({event: "flows.get"}, opts.req); - return resolve(runtime.flows.getFlows()); - }); + getFlows: async function(opts) { + runtime.log.audit({event: "flows.get"}, opts.req); + return runtime.flows.getFlows(); }, /** * Sets the current flow configuration @@ -65,38 +63,35 @@ var api = module.exports = { * @return {Promise} - the active flow configuration * @memberof @node-red/runtime_flows */ - setFlows: function(opts) { - return mutex.runExclusive(function() { - return new Promise(function(resolve,reject) { + setFlows: async function(opts) { + return mutex.runExclusive(async function() { + var flows = opts.flows; + var deploymentType = opts.deploymentType||"full"; + runtime.log.audit({event: "flows.set",type:deploymentType}, opts.req); - var flows = opts.flows; - var deploymentType = opts.deploymentType||"full"; - runtime.log.audit({event: "flows.set",type:deploymentType}, opts.req); - - var apiPromise; - if (deploymentType === 'reload') { - apiPromise = runtime.flows.loadFlows(true); - } else { - if (flows.hasOwnProperty('rev')) { - var currentVersion = runtime.flows.getFlows().rev; - if (currentVersion !== flows.rev) { - var err; - err = new Error(); - err.code = "version_mismatch"; - err.status = 409; - //TODO: log warning - return reject(err); - } + var apiPromise; + if (deploymentType === 'reload') { + apiPromise = runtime.flows.loadFlows(true); + } else { + if (flows.hasOwnProperty('rev')) { + var currentVersion = runtime.flows.getFlows().rev; + if (currentVersion !== flows.rev) { + var err; + err = new Error(); + err.code = "version_mismatch"; + err.status = 409; + //TODO: log warning + throw err; } - apiPromise = runtime.flows.setFlows(flows.flows,flows.credentials,deploymentType,null,null,opts.user); } - apiPromise.then(function(flowId) { - return resolve({rev:flowId}); - }).catch(function(err) { - runtime.log.warn(runtime.log._("api.flows.error-"+(deploymentType === 'reload'?'reload':'save'),{message:err.message})); - runtime.log.warn(err.stack); - return reject(err); - }); + apiPromise = runtime.flows.setFlows(flows.flows,flows.credentials,deploymentType,null,null,opts.user); + } + return apiPromise.then(function(flowId) { + return {rev:flowId}; + }).catch(function(err) { + runtime.log.warn(runtime.log._("api.flows.error-"+(deploymentType === 'reload'?'reload':'save'),{message:err.message})); + runtime.log.warn(err.stack); + throw err }); }); }, @@ -110,22 +105,20 @@ var api = module.exports = { * @return {Promise} - the id of the added flow * @memberof @node-red/runtime_flows */ - addFlow: function(opts) { - return mutex.runExclusive(function() { - return new Promise(function (resolve, reject) { - var flow = opts.flow; - runtime.flows.addFlow(flow, opts.user).then(function (id) { - runtime.log.audit({event: "flow.add", id: id}, opts.req); - return resolve(id); - }).catch(function (err) { - runtime.log.audit({ - event: "flow.add", - error: err.code || "unexpected_error", - message: err.toString() - }, opts.req); - err.status = 400; - return reject(err); - }) + addFlow: async function(opts) { + return mutex.runExclusive(async function() { + var flow = opts.flow; + return runtime.flows.addFlow(flow, opts.user).then(function (id) { + runtime.log.audit({event: "flow.add", id: id}, opts.req); + return id; + }).catch(function (err) { + runtime.log.audit({ + event: "flow.add", + error: err.code || "unexpected_error", + message: err.toString() + }, opts.req); + err.status = 400; + throw err; }) }); }, @@ -139,20 +132,18 @@ var api = module.exports = { * @return {Promise} - the active flow configuration * @memberof @node-red/runtime_flows */ - getFlow: function(opts) { - return new Promise(function (resolve,reject) { - var flow = runtime.flows.getFlow(opts.id); - if (flow) { - runtime.log.audit({event: "flow.get",id:opts.id}, opts.req); - return resolve(flow); - } else { - runtime.log.audit({event: "flow.get",id:opts.id,error:"not_found"}, opts.req); - var err = new Error(); - err.code = "not_found"; - err.status = 404; - return reject(err); - } - }) + getFlow: async function(opts) { + var flow = runtime.flows.getFlow(opts.id); + if (flow) { + runtime.log.audit({event: "flow.get",id:opts.id}, opts.req); + return flow; + } else { + runtime.log.audit({event: "flow.get",id:opts.id,error:"not_found"}, opts.req); + var err = new Error(); + err.code = "not_found"; + err.status = 404; + throw err; + } }, /** * Updates an existing flow configuration @@ -164,42 +155,29 @@ var api = module.exports = { * @return {Promise} - the id of the updated flow * @memberof @node-red/runtime_flows */ - updateFlow: function(opts) { - return mutex.runExclusive(function() { - return new Promise(function (resolve, reject) { - var flow = opts.flow; - var id = opts.id; - try { - runtime.flows.updateFlow(id, flow, opts.user).then(function () { - runtime.log.audit({event: "flow.update", id: id}, opts.req); - return resolve(id); - }).catch(function (err) { - runtime.log.audit({ - event: "flow.update", - error: err.code || "unexpected_error", - message: err.toString() - }, opts.req); - err.status = 400; - return reject(err); - }) - } catch (err) { - if (err.code === 404) { - runtime.log.audit({event: "flow.update", id: id, error: "not_found"}, opts.req); - // TODO: this swap around of .code and .status isn't ideal - err.status = 404; - err.code = "not_found"; - return reject(err); - } else { - runtime.log.audit({ - event: "flow.update", - error: err.code || "unexpected_error", - message: err.toString() - }, opts.req); - err.status = 400; - return reject(err); - } + updateFlow: async function(opts) { + return mutex.runExclusive(async function() { + var flow = opts.flow; + var id = opts.id; + return runtime.flows.updateFlow(id, flow, opts.user).then(function () { + runtime.log.audit({event: "flow.update", id: id}, opts.req); + return id; + }).catch(function (err) { + if (err.code === 404) { + runtime.log.audit({event: "flow.update", id: id, error: "not_found"}, opts.req); + // TODO: this swap around of .code and .status isn't ideal + err.status = 404; + err.code = "not_found"; + } else { + runtime.log.audit({ + event: "flow.update", + error: err.code || "unexpected_error", + message: err.toString() + }, opts.req); + err.status = 400; } - }); + throw err; + }) }); }, /** @@ -211,42 +189,29 @@ var api = module.exports = { * @return {Promise} - resolves if successful * @memberof @node-red/runtime_flows */ - deleteFlow: function(opts) { + deleteFlow: async function(opts) { return mutex.runExclusive(function() { - return new Promise(function (resolve, reject) { - var id = opts.id; - try { - runtime.flows.removeFlow(id, opts.user).then(function () { - runtime.log.audit({event: "flow.remove", id: id}, opts.req); - return resolve(); - }).catch(function (err) { - runtime.log.audit({ - event: "flow.remove", - id: id, - error: err.code || "unexpected_error", - message: err.toString() - }, opts.req); - err.status = 400; - return reject(err); - }); - } catch (err) { - if (err.code === 404) { - runtime.log.audit({event: "flow.remove", id: id, error: "not_found"}, opts.req); - // TODO: this swap around of .code and .status isn't ideal - err.status = 404; - err.code = "not_found"; - return reject(err); - } else { - runtime.log.audit({ - event: "flow.remove", - id: id, - error: err.code || "unexpected_error", - message: err.toString() - }, opts.req); - err.status = 400; - return reject(err); - } + var id = opts.id; + return runtime.flows.removeFlow(id, opts.user).then(function () { + runtime.log.audit({event: "flow.remove", id: id}, opts.req); + return resolve(); + }).catch(function (err) { + if (err.code === 404) { + runtime.log.audit({event: "flow.remove", id: id, error: "not_found"}, opts.req); + // TODO: this swap around of .code and .status isn't ideal + err.status = 404; + err.code = "not_found"; + return reject(err); + } else { + runtime.log.audit({ + event: "flow.remove", + id: id, + error: err.code || "unexpected_error", + message: err.toString() + }, opts.req); + err.status = 400; } + throw err; }); }); }, @@ -261,35 +226,33 @@ var api = module.exports = { * @return {Promise} - the safe credentials * @memberof @node-red/runtime_flows */ - getNodeCredentials: function(opts) { - return new Promise(function(resolve,reject) { - runtime.log.audit({event: "credentials.get",type:opts.type,id:opts.id}, opts.req); - var credentials = runtime.nodes.getCredentials(opts.id); - if (!credentials) { - return resolve({}); - } - var sendCredentials = {}; - var cred; - if (/^subflow(:|$)/.test(opts.type)) { - for (cred in credentials) { - if (credentials.hasOwnProperty(cred)) { - sendCredentials['has_'+cred] = credentials[cred] != null && credentials[cred] !== ''; - } - } - } else { - var definition = runtime.nodes.getCredentialDefinition(opts.type) || {}; - for (cred in definition) { - if (definition.hasOwnProperty(cred)) { - if (definition[cred].type == "password") { - var key = 'has_' + cred; - sendCredentials[key] = credentials[cred] != null && credentials[cred] !== ''; - continue; - } - sendCredentials[cred] = credentials[cred] || ''; - } + getNodeCredentials: async function(opts) { + runtime.log.audit({event: "credentials.get",type:opts.type,id:opts.id}, opts.req); + var credentials = runtime.nodes.getCredentials(opts.id); + if (!credentials) { + return {}; + } + var sendCredentials = {}; + var cred; + if (/^subflow(:|$)/.test(opts.type)) { + for (cred in credentials) { + if (credentials.hasOwnProperty(cred)) { + sendCredentials['has_'+cred] = credentials[cred] != null && credentials[cred] !== ''; } } - resolve(sendCredentials); - }) + } else { + var definition = runtime.nodes.getCredentialDefinition(opts.type) || {}; + for (cred in definition) { + if (definition.hasOwnProperty(cred)) { + if (definition[cred].type == "password") { + var key = 'has_' + cred; + sendCredentials[key] = credentials[cred] != null && credentials[cred] !== ''; + continue; + } + sendCredentials[cred] = credentials[cred] || ''; + } + } + } + return sendCredentials; } } diff --git a/packages/node_modules/@node-red/runtime/lib/flows/index.js b/packages/node_modules/@node-red/runtime/lib/flows/index.js index 3ac4eddd4..147f62ace 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/index.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/index.js @@ -15,7 +15,6 @@ **/ var clone = require("clone"); -var when = require("when"); var Flow = require('./Flow'); @@ -488,7 +487,7 @@ function updateMissingTypes() { } } -function addFlow(flow, user) { +async function addFlow(flow, user) { var i,node; if (!flow.hasOwnProperty('nodes')) { throw new Error('missing nodes property'); @@ -513,10 +512,10 @@ function addFlow(flow, user) { node = flow.nodes[i]; if (activeFlowConfig.allNodes[node.id]) { // TODO nls - return when.reject(new Error('duplicate id')); + throw new Error('duplicate id'); } if (node.type === 'tab' || node.type === 'subflow') { - return when.reject(new Error('invalid node type: '+node.type)); + throw new Error('invalid node type: '+node.type); } node.z = flow.id; nodes.push(node); @@ -526,10 +525,10 @@ function addFlow(flow, user) { node = flow.configs[i]; if (activeFlowConfig.allNodes[node.id]) { // TODO nls - return when.reject(new Error('duplicate id')); + throw new Error('duplicate id'); } if (node.type === 'tab' || node.type === 'subflow') { - return when.reject(new Error('invalid node type: '+node.type)); + throw new Error('invalid node type: '+node.type); } node.z = flow.id; nodes.push(node); @@ -614,7 +613,7 @@ function getFlow(id) { return result; } -function updateFlow(id,newFlow, user) { +async function updateFlow(id,newFlow, user) { var label = id; if (id !== 'global') { if (!activeFlowConfig.flows[id]) { @@ -674,7 +673,7 @@ function updateFlow(id,newFlow, user) { }) } -function removeFlow(id, user) { +async function removeFlow(id, user) { if (id === 'global') { // TODO: nls + error code throw new Error('not allowed to remove global'); From 7a90fe5aecd9ea5bdc7fd5c03631f4f39dd6d780 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Tue, 29 Sep 2020 17:35:43 +0100 Subject: [PATCH 10/11] Fix flow api unit tests --- packages/node_modules/@node-red/runtime/lib/api/flows.js | 3 +-- test/unit/@node-red/runtime/lib/api/flows_spec.js | 8 ++++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/api/flows.js b/packages/node_modules/@node-red/runtime/lib/api/flows.js index 9240a85d3..811d7b5cb 100644 --- a/packages/node_modules/@node-red/runtime/lib/api/flows.js +++ b/packages/node_modules/@node-red/runtime/lib/api/flows.js @@ -194,14 +194,13 @@ var api = module.exports = { var id = opts.id; return runtime.flows.removeFlow(id, opts.user).then(function () { runtime.log.audit({event: "flow.remove", id: id}, opts.req); - return resolve(); + return; }).catch(function (err) { if (err.code === 404) { runtime.log.audit({event: "flow.remove", id: id, error: "not_found"}, opts.req); // TODO: this swap around of .code and .status isn't ideal err.status = 404; err.code = "not_found"; - return reject(err); } else { runtime.log.audit({ event: "flow.remove", diff --git a/test/unit/@node-red/runtime/lib/api/flows_spec.js b/test/unit/@node-red/runtime/lib/api/flows_spec.js index c0b7d1cb3..9062ef52f 100644 --- a/test/unit/@node-red/runtime/lib/api/flows_spec.js +++ b/test/unit/@node-red/runtime/lib/api/flows_spec.js @@ -255,7 +255,9 @@ describe("runtime-api/flows", function() { var err = new Error(); // TODO: quirk of internal api - uses .code for .status err.code = 404; - throw err; + var p = Promise.reject(err); + p.catch(()=>{}); + return p; } else if (id === "error") { var err = new Error(); // TODO: quirk of internal api - uses .code for .status @@ -311,7 +313,9 @@ describe("runtime-api/flows", function() { var err = new Error(); // TODO: quirk of internal api - uses .code for .status err.code = 404; - throw err; + var p = Promise.reject(err); + p.catch(()=>{}); + return p; } else if (flow === "error") { var err = new Error(); // TODO: quirk of internal api - uses .code for .status From 517e376582ae6faf8dacd297839015b80fcb6ec4 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Tue, 29 Sep 2020 17:39:29 +0100 Subject: [PATCH 11/11] Restore support for runtimeSyncDelivery flag --- .../@node-red/runtime/lib/flows/Flow.js | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index 9d97cea21..20a4f6c3d 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -540,10 +540,6 @@ class Flow { } } - get asyncMessageDelivery() { - return asyncMessageDelivery - } - send(sendEvents) { // onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object. // preRoute - called once for each SendEvent object in turn @@ -650,12 +646,18 @@ function handlePreDeliver(flow,sendEvent, reportError) { reportError(err,sendEvent); return; } else if (err !== false) { - setImmediate(function() { + if (asyncMessageDelivery) { + setImmediate(function() { + if (sendEvent.destination.node) { + sendEvent.destination.node.receive(sendEvent.msg); + } + }) + } else { if (sendEvent.destination.node) { sendEvent.destination.node.receive(sendEvent.msg); } - }) + } // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery) hooks.trigger("postDeliver", sendEvent, function(err) { if (err) {