From d57ec0cd5328e834f686a58b685d7edf067b691b Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Mon, 20 Jul 2020 16:48:47 +0100 Subject: [PATCH] Refactor lib/flows code to include initial router component --- .../@node-red/runtime/lib/api/flows.js | 14 ++--- .../runtime/lib/{nodes => }/flows/Flow.js | 10 +++- .../runtime/lib/{nodes => }/flows/Subflow.js | 6 +- .../runtime/lib/{nodes => }/flows/index.js | 10 ++-- .../runtime/lib/flows/router/index.js | 50 +++++++++++++++++ .../runtime/lib/flows/router/localRouter.js | 17 ++++++ .../runtime/lib/{nodes => }/flows/util.js | 0 .../@node-red/runtime/lib/index.js | 2 + .../@node-red/runtime/lib/nodes/Node.js | 55 ++++++------------- .../runtime/lib/nodes/context/index.js | 35 ------------ .../@node-red/runtime/lib/nodes/index.js | 4 +- .../@node-red/runtime/lib/api/flows_spec.js | 12 ++-- .../lib/{nodes => }/flows/Flow_spec.js | 6 +- .../lib/{nodes => }/flows/Subflow_spec.js | 8 +-- .../lib/{nodes => }/flows/index_spec.js | 4 +- .../lib/{nodes => }/flows/util_spec.js | 2 +- .../@node-red/runtime/lib/nodes/Node_spec.js | 25 ++++++--- .../@node-red/runtime/lib/nodes/index_spec.js | 2 +- 18 files changed, 147 insertions(+), 115 deletions(-) rename packages/node_modules/@node-red/runtime/lib/{nodes => }/flows/Flow.js (99%) rename packages/node_modules/@node-red/runtime/lib/{nodes => }/flows/Subflow.js (99%) rename packages/node_modules/@node-red/runtime/lib/{nodes => }/flows/index.js (99%) create mode 100644 packages/node_modules/@node-red/runtime/lib/flows/router/index.js create mode 100644 packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js rename packages/node_modules/@node-red/runtime/lib/{nodes => }/flows/util.js (100%) rename test/unit/@node-red/runtime/lib/{nodes => }/flows/Flow_spec.js (99%) rename test/unit/@node-red/runtime/lib/{nodes => }/flows/Subflow_spec.js (99%) rename test/unit/@node-red/runtime/lib/{nodes => }/flows/index_spec.js (99%) rename test/unit/@node-red/runtime/lib/{nodes => }/flows/util_spec.js (99%) diff --git a/packages/node_modules/@node-red/runtime/lib/api/flows.js b/packages/node_modules/@node-red/runtime/lib/api/flows.js index 4207c83ce..f97e75390 100644 --- a/packages/node_modules/@node-red/runtime/lib/api/flows.js +++ b/packages/node_modules/@node-red/runtime/lib/api/flows.js @@ -52,7 +52,7 @@ var api = module.exports = { getFlows: function(opts) { return new Promise(function(resolve,reject) { runtime.log.audit({event: "flows.get"}, opts.req); - return resolve(runtime.nodes.getFlows()); + return resolve(runtime.flows.getFlows()); }); }, /** @@ -75,10 +75,10 @@ var api = module.exports = { var apiPromise; if (deploymentType === 'reload') { - apiPromise = runtime.nodes.loadFlows(true); + apiPromise = runtime.flows.loadFlows(true); } else { if (flows.hasOwnProperty('rev')) { - var currentVersion = runtime.nodes.getFlows().rev; + var currentVersion = runtime.flows.getFlows().rev; if (currentVersion !== flows.rev) { var err; err = new Error(); @@ -114,7 +114,7 @@ var api = module.exports = { return mutex.runExclusive(function() { return new Promise(function (resolve, reject) { var flow = opts.flow; - runtime.nodes.addFlow(flow,opts.user).then(function (id) { + runtime.flows.addFlow(flow, opts.user).then(function (id) { runtime.log.audit({event: "flow.add", id: id}, opts.req); return resolve(id); }).catch(function (err) { @@ -141,7 +141,7 @@ var api = module.exports = { */ getFlow: function(opts) { return new Promise(function (resolve,reject) { - var flow = runtime.nodes.getFlow(opts.id); + var flow = runtime.flows.getFlow(opts.id); if (flow) { runtime.log.audit({event: "flow.get",id:opts.id}, opts.req); return resolve(flow); @@ -170,7 +170,7 @@ var api = module.exports = { var flow = opts.flow; var id = opts.id; try { - runtime.nodes.updateFlow(id, flow, opts.user).then(function () { + runtime.flows.updateFlow(id, flow, opts.user).then(function () { runtime.log.audit({event: "flow.update", id: id}, opts.req); return resolve(id); }).catch(function (err) { @@ -216,7 +216,7 @@ var api = module.exports = { return new Promise(function (resolve, reject) { var id = opts.id; try { - runtime.nodes.removeFlow(id, opts.user).then(function () { + runtime.flows.removeFlow(id, opts.user).then(function () { runtime.log.audit({event: "flow.remove", id: id}, opts.req); return resolve(); }).catch(function (err) { diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js similarity index 99% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js rename to packages/node_modules/@node-red/runtime/lib/flows/Flow.js index df4e810fa..aa5abab98 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -17,8 +17,10 @@ var clone = require("clone"); var redUtil = require("@node-red/util").util; var flowUtil = require("./util"); -var events = require("../../events"); -const context = require('../context'); +var events = require("../events"); +const context = require('../nodes/context'); +const router = require("./router"); + var Subflow; var Log; @@ -543,6 +545,10 @@ class Flow { return asyncMessageDelivery } + send(src,destinationId,msg) { + router.send(src,destinationId,msg); + } + dump() { console.log("==================") console.log(this.TYPE, this.id); diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Subflow.js b/packages/node_modules/@node-red/runtime/lib/flows/Subflow.js similarity index 99% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/Subflow.js rename to packages/node_modules/@node-red/runtime/lib/flows/Subflow.js index cca230d79..c651e77aa 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Subflow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Subflow.js @@ -16,14 +16,14 @@ const clone = require("clone"); const Flow = require('./Flow').Flow; -const context = require('../context'); +const context = require('../nodes/context'); const util = require("util"); const redUtil = require("@node-red/util").util; const flowUtil = require("./util"); -const credentials = require("../credentials"); +const credentials = require("../nodes/credentials"); var Log; @@ -159,7 +159,7 @@ class Subflow extends Flow { start(diff) { var self = this; // Create a subflow node to accept inbound messages and route appropriately - var Node = require("../Node"); + var Node = require("../nodes/Node"); if (this.subflowDef.status) { var subflowStatusConfig = { diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js b/packages/node_modules/@node-red/runtime/lib/flows/index.js similarity index 99% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js rename to packages/node_modules/@node-red/runtime/lib/flows/index.js index 38742daf3..157f384c8 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/index.js @@ -22,13 +22,12 @@ var Flow = require('./Flow'); var typeRegistry = require("@node-red/registry"); var deprecated = typeRegistry.deprecated; - -var context = require("../context") -var credentials = require("../credentials"); - +var context = require("../nodes/context") +var credentials = require("../nodes/credentials"); +var router = require("./router"); var flowUtil = require("./util"); var log; -var events = require("../../events"); +var events = require("../events"); var redUtil = require("@node-red/util").util; var storage = null; @@ -71,6 +70,7 @@ function init(runtime) { } Flow.init(runtime); flowUtil.init(runtime); + router.init(runtime); } function loadFlows() { diff --git a/packages/node_modules/@node-red/runtime/lib/flows/router/index.js b/packages/node_modules/@node-red/runtime/lib/flows/router/index.js new file mode 100644 index 000000000..116019635 --- /dev/null +++ b/packages/node_modules/@node-red/runtime/lib/flows/router/index.js @@ -0,0 +1,50 @@ + +var settings; + +const LocalRouter = require("./localRouter"); +var defaultRouter; + + +class Router { + constructor(stack) { + this.stack = stack || []; + } + send(source,destinationId,msg) { + var pos = 0; + var next = () => { + var router = this.stack[pos++]; + if (router) { + router.send(source,destinationId,msg,next); + } + } + next(); + } +} + +function init(runtime) { + settings = runtime.settings; + + defaultRouter = new Router([ + new LocalRouter(), + new PostMessageLogger() + ]) + +} + +function send(source,destinationId,msg) { + defaultRouter.send(source,destinationId,msg); +} + +module.exports = { + init:init, + send: send +} + + + +class PostMessageLogger { + constructor() {} + send(source,destinationId,msg,next) { + console.log(source.id.padEnd(16),"->",destinationId.padEnd(16),JSON.stringify(msg)); + } +} diff --git a/packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js b/packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js new file mode 100644 index 000000000..96dcc3489 --- /dev/null +++ b/packages/node_modules/@node-red/runtime/lib/flows/router/localRouter.js @@ -0,0 +1,17 @@ + +class LocalRouter { + constructor() {} + send(source,destinationId,msg,next) { + var node = source._flow.getNode(destinationId); + if (node) { + setImmediate(function() { + node.receive(msg); + next(); + }); + } else if (next) { + next() + } + } +} + +module.exports = LocalRouter \ No newline at end of file diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/util.js b/packages/node_modules/@node-red/runtime/lib/flows/util.js similarity index 100% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/util.js rename to packages/node_modules/@node-red/runtime/lib/flows/util.js diff --git a/packages/node_modules/@node-red/runtime/lib/index.js b/packages/node_modules/@node-red/runtime/lib/index.js index 66bff6ff2..21e775393 100644 --- a/packages/node_modules/@node-red/runtime/lib/index.js +++ b/packages/node_modules/@node-red/runtime/lib/index.js @@ -19,6 +19,7 @@ var when = require('when'); var externalAPI = require("./api"); var redNodes = require("./nodes"); +var flows = require("./flows"); var storage = require("./storage"); var library = require("./library"); var events = require("./events"); @@ -273,6 +274,7 @@ var runtime = { storage: storage, events: events, nodes: redNodes, + flows: flows, library: library, exec: exec, util: require("@node-red/util").util, diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index 60b106426..e0bba6a65 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -20,7 +20,7 @@ var EventEmitter = require("events").EventEmitter; var redUtil = require("@node-red/util").util; var Log = require("@node-red/util").log; var context = require("./context"); -var flows = require("./flows"); +var flows = require("../flows"); const NOOP_SEND = function() {} @@ -55,10 +55,6 @@ function Node(n) { // as part of its constructor - config._flow will overwrite this._flow // which we can tolerate as they are the same object. Object.defineProperty(this,'_flow', {value: n._flow, enumerable: false, writable: true }) - this._asyncDelivery = n._flow.asyncMessageDelivery; - } - if (this._asyncDelivery === undefined) { - this._asyncDelivery = true; } this.updateWires(n.wires); } @@ -173,15 +169,7 @@ Node.prototype._emit = Node.prototype.emit; Node.prototype.emit = function(event, ...args) { var node = this; if (event === "input") { - // When Pluggable Message Routing arrives, this will be called from - // that and will already be sync/async depending on the router. - if (this._asyncDelivery) { - setImmediate(function() { - node._emitInput.apply(node,args); - }); - } else { - this._emitInput.apply(this,args); - } + this._emitInput.apply(this,args); } else { this._emit.apply(this,arguments); } @@ -366,11 +354,7 @@ Node.prototype.send = function(msg) { msg._msgid = redUtil.generateId(); } this.metric("send",msg); - node = this._flow.getNode(this._wire); - /* istanbul ignore else */ - if (node) { - node.receive(msg); - } + this._flow.send(this,this._wire,msg); return; } else { msg = [msg]; @@ -398,23 +382,20 @@ Node.prototype.send = function(msg) { var k = 0; // for each recipent node of that output for (var j = 0; j < wires.length; j++) { - node = this._flow.getNode(wires[j]); // node at end of wire j - if (node) { - // for each msg to send eg. [[m1, m2, ...], ...] - for (k = 0; k < msgs.length; k++) { - var m = msgs[k]; - if (m !== null && m !== undefined) { - /* istanbul ignore else */ - if (!sentMessageId) { - sentMessageId = m._msgid; - } - if (msgSent) { - var clonedmsg = redUtil.cloneMessage(m); - sendEvents.push({n:node,m:clonedmsg}); - } else { - sendEvents.push({n:node,m:m}); - msgSent = true; - } + // for each msg to send eg. [[m1, m2, ...], ...] + for (k = 0; k < msgs.length; k++) { + var m = msgs[k]; + if (m !== null && m !== undefined) { + /* istanbul ignore else */ + if (!sentMessageId) { + sentMessageId = m._msgid; + } + if (msgSent) { + var clonedmsg = redUtil.cloneMessage(m); + sendEvents.push({n:wires[j],m:clonedmsg}); + } else { + sendEvents.push({n:wires[j],m:m}); + msgSent = true; } } } @@ -434,7 +415,7 @@ Node.prototype.send = function(msg) { if (!ev.m._msgid) { ev.m._msgid = sentMessageId; } - ev.n.receive(ev.m); + this._flow.send(this,ev.n,ev.m); } }; diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/context/index.js b/packages/node_modules/@node-red/runtime/lib/nodes/context/index.js index c395c0615..fda3852ff 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/context/index.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/context/index.js @@ -18,7 +18,6 @@ var clone = require("clone"); var log = require("@node-red/util").log; var util = require("@node-red/util").util; var memory = require("./memory"); -var flows; var settings; @@ -48,7 +47,6 @@ function logUnknownStore(name) { } function init(_settings) { - flows = require("../flows"); settings = _settings; contexts = {}; stores = {}; @@ -513,39 +511,6 @@ function getContext(nodeId, flowId) { return newContext; } -// -// function getContext(localId,flowId,parent) { -// var contextId = localId; -// if (flowId) { -// contextId = localId+":"+flowId; -// } -// console.log("getContext",localId,flowId,"known?",contexts.hasOwnProperty(contextId)); -// if (contexts.hasOwnProperty(contextId)) { -// return contexts[contextId]; -// } -// var newContext = createContext(contextId,undefined,parent); -// if (flowId) { -// var node = flows.get(flowId); -// console.log("flows,get",flowId,node&&node.type) -// var parent = undefined; -// if (node && node.type.startsWith("subflow:")) { -// parent = node.context().flow; -// } -// else { -// parent = createRootContext(); -// } -// var flowContext = getContext(flowId,undefined,parent); -// Object.defineProperty(newContext, 'flow', { -// value: flowContext -// }); -// } -// Object.defineProperty(newContext, 'global', { -// value: contexts['global'] -// }) -// contexts[contextId] = newContext; -// return newContext; -// } - function deleteContext(id,flowId) { if(!hasConfiguredStore){ // only delete context if there's no configured storage. diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/index.js b/packages/node_modules/@node-red/runtime/lib/nodes/index.js index 705f10f5d..42fcd72f6 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/index.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/index.js @@ -23,8 +23,8 @@ var util = require("util"); var registry = require("@node-red/registry"); var credentials = require("./credentials"); -var flows = require("./flows"); -var flowUtil = require("./flows/util") +var flows = require("../flows"); +var flowUtil = require("../flows/util") var context = require("./context"); var Node = require("./Node"); var log; diff --git a/test/unit/@node-red/runtime/lib/api/flows_spec.js b/test/unit/@node-red/runtime/lib/api/flows_spec.js index dafbbc69b..c0b7d1cb3 100644 --- a/test/unit/@node-red/runtime/lib/api/flows_spec.js +++ b/test/unit/@node-red/runtime/lib/api/flows_spec.js @@ -37,7 +37,7 @@ describe("runtime-api/flows", function() { it("returns the current flow configuration", function(done) { flows.init({ log: mockLog(), - nodes: { + flows: { getFlows: function() { return [1,2,3] } } }); @@ -76,7 +76,7 @@ describe("runtime-api/flows", function() { }) flows.init({ log: mockLog(), - nodes: { + flows: { getFlows: function() { return {rev:"currentRev",flows:[]} }, setFlows: setFlows, loadFlows: loadFlows @@ -192,7 +192,7 @@ describe("runtime-api/flows", function() { }); flows.init({ log: mockLog(), - nodes: { + flows: { addFlow: addFlow } }); @@ -225,7 +225,7 @@ describe("runtime-api/flows", function() { }); flows.init({ log: mockLog(), - nodes: { + flows: { getFlow: getFlow } }); @@ -268,7 +268,7 @@ describe("runtime-api/flows", function() { }); flows.init({ log: mockLog(), - nodes: { + flows: { updateFlow: updateFlow } }); @@ -324,7 +324,7 @@ describe("runtime-api/flows", function() { }); flows.init({ log: mockLog(), - nodes: { + flows: { removeFlow: removeFlow } }); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js b/test/unit/@node-red/runtime/lib/flows/Flow_spec.js similarity index 99% rename from test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js rename to test/unit/@node-red/runtime/lib/flows/Flow_spec.js index e1315dc50..9ce9aa30b 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/Flow_spec.js @@ -22,9 +22,9 @@ var util = require("util"); var NR_TEST_UTILS = require("nr-test-utils"); -var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util"); -var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util"); +var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry"); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js b/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js similarity index 99% rename from test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js rename to test/unit/@node-red/runtime/lib/flows/Subflow_spec.js index 71bd9ad19..a55771391 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js @@ -22,11 +22,11 @@ var util = require("util"); var NR_TEST_UTILS = require("nr-test-utils"); -var Subflow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Subflow"); -var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow"); +var Subflow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Subflow"); +var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow"); -var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry"); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/index_spec.js b/test/unit/@node-red/runtime/lib/flows/index_spec.js similarity index 99% rename from test/unit/@node-red/runtime/lib/nodes/flows/index_spec.js rename to test/unit/@node-red/runtime/lib/flows/index_spec.js index 8865c5d3f..b8f3a25aa 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/index_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/index_spec.js @@ -20,13 +20,13 @@ var when = require("when"); var clone = require("clone"); var NR_TEST_UTILS = require("nr-test-utils"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var RedNode = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var RED = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes"); var events = NR_TEST_UTILS.require("@node-red/runtime/lib/events"); var credentials = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/credentials"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry") -var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow"); +var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow"); describe('flows/index', function() { diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/util_spec.js b/test/unit/@node-red/runtime/lib/flows/util_spec.js similarity index 99% rename from test/unit/@node-red/runtime/lib/nodes/flows/util_spec.js rename to test/unit/@node-red/runtime/lib/flows/util_spec.js index b83a37660..8e1f15754 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/util_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/util_spec.js @@ -19,7 +19,7 @@ var sinon = require("sinon"); var when = require("when"); var clone = require("clone"); var NR_TEST_UTILS = require("nr-test-utils"); -var flowUtil = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util"); +var flowUtil = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry"); var redUtil = NR_TEST_UTILS.require("@node-red/util").util; diff --git a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js index 61b4446f7..3354c4215 100644 --- a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js +++ b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js @@ -19,7 +19,7 @@ var sinon = require('sinon'); var NR_TEST_UTILS = require("nr-test-utils"); var RedNode = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var Log = NR_TEST_UTILS.require("@node-red/util").log; -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); describe('Node', function() { describe('#constructor',function() { @@ -208,6 +208,7 @@ describe('Node', function() { it('emits a single message', function(done) { var flow = { getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); @@ -255,7 +256,9 @@ describe('Node', function() { it('emits a single message - synchronous mode', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + send: (node,dst,msg) => { n2.receive(msg) ;}, asyncMessageDelivery: false }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); @@ -284,8 +287,10 @@ describe('Node', function() { it('emits a message with callback provided send', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - handleComplete: (node,msg) => {} + handleComplete: (node,msg) => {}, + send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); @@ -308,7 +313,9 @@ describe('Node', function() { it('emits multiple messages on a single output', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); @@ -338,14 +345,15 @@ describe('Node', function() { it('emits messages to multiple outputs', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3,'n4':n4,'n5':n5}[id]}, + send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} }; var n1 = new RedNode({_flow:flow, id:'n1',type:'abc',wires:[['n2'],['n3'],['n4','n5']]}); var n2 = new RedNode({_flow:flow, id:'n2',type:'abc'}); var n3 = new RedNode({_flow:flow, id:'n3',type:'abc'}); var n4 = new RedNode({_flow:flow, id:'n4',type:'abc'}); var n5 = new RedNode({_flow:flow, id:'n5',type:'abc'}); - var messages = [ {payload:"hello world"}, null, @@ -396,6 +404,7 @@ describe('Node', function() { it('emits no messages', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); @@ -414,20 +423,20 @@ describe('Node', function() { it('emits messages ignoring non-existent nodes', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + send: (node,dst,msg) => { setImmediate(function() { var n = flow.getNode(dst); n && n.receive(msg) })} }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n9'],['n2']]}); var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var messages = [ - {payload:"hello world"}, - {payload:"hello world again"} + {_msgid:"123", payload:"hello world"}, + {_msgid:"234", payload:"hello world again"} ]; - // only one message sent, so no copy needed n2.on('input',function(msg) { should.deepEqual(msg,messages[1]); - should.strictEqual(msg,messages[1]); done(); }); @@ -437,6 +446,7 @@ describe('Node', function() { it('emits messages without cloning req or res', function(done) { var flow = { getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3}[id]}, + send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[[['n2'],['n3']]]}); var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); @@ -493,6 +503,7 @@ describe('Node', function() { Log.addHandler(logHandler); var flow = { getNode: (id) => { return {'n1':sender,'n2':receiver1,'n3':receiver2}[id]}, + send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} }; var sender = new RedNode({_flow:flow,id:'n1',type:'abc', wires:[['n2', 'n3']]}); diff --git a/test/unit/@node-red/runtime/lib/nodes/index_spec.js b/test/unit/@node-red/runtime/lib/nodes/index_spec.js index 66958196a..7a4992624 100644 --- a/test/unit/@node-red/runtime/lib/nodes/index_spec.js +++ b/test/unit/@node-red/runtime/lib/nodes/index_spec.js @@ -23,7 +23,7 @@ var inherits = require("util").inherits; var NR_TEST_UTILS = require("nr-test-utils"); var index = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/index"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var registry = NR_TEST_UTILS.require("@node-red/registry") var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node");