Refactor lib/flows code to include initial router component

This commit is contained in:
Nick O'Leary 2020-07-20 16:48:47 +01:00
parent 952c9d8bdb
commit d57ec0cd53
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9
18 changed files with 147 additions and 115 deletions

View File

@ -52,7 +52,7 @@ var api = module.exports = {
getFlows: function(opts) {
return new Promise(function(resolve,reject) {
runtime.log.audit({event: "flows.get"}, opts.req);
return resolve(runtime.nodes.getFlows());
return resolve(runtime.flows.getFlows());
});
},
/**
@ -75,10 +75,10 @@ var api = module.exports = {
var apiPromise;
if (deploymentType === 'reload') {
apiPromise = runtime.nodes.loadFlows(true);
apiPromise = runtime.flows.loadFlows(true);
} else {
if (flows.hasOwnProperty('rev')) {
var currentVersion = runtime.nodes.getFlows().rev;
var currentVersion = runtime.flows.getFlows().rev;
if (currentVersion !== flows.rev) {
var err;
err = new Error();
@ -114,7 +114,7 @@ var api = module.exports = {
return mutex.runExclusive(function() {
return new Promise(function (resolve, reject) {
var flow = opts.flow;
runtime.nodes.addFlow(flow,opts.user).then(function (id) {
runtime.flows.addFlow(flow, opts.user).then(function (id) {
runtime.log.audit({event: "flow.add", id: id}, opts.req);
return resolve(id);
}).catch(function (err) {
@ -141,7 +141,7 @@ var api = module.exports = {
*/
getFlow: function(opts) {
return new Promise(function (resolve,reject) {
var flow = runtime.nodes.getFlow(opts.id);
var flow = runtime.flows.getFlow(opts.id);
if (flow) {
runtime.log.audit({event: "flow.get",id:opts.id}, opts.req);
return resolve(flow);
@ -170,7 +170,7 @@ var api = module.exports = {
var flow = opts.flow;
var id = opts.id;
try {
runtime.nodes.updateFlow(id, flow, opts.user).then(function () {
runtime.flows.updateFlow(id, flow, opts.user).then(function () {
runtime.log.audit({event: "flow.update", id: id}, opts.req);
return resolve(id);
}).catch(function (err) {
@ -216,7 +216,7 @@ var api = module.exports = {
return new Promise(function (resolve, reject) {
var id = opts.id;
try {
runtime.nodes.removeFlow(id, opts.user).then(function () {
runtime.flows.removeFlow(id, opts.user).then(function () {
runtime.log.audit({event: "flow.remove", id: id}, opts.req);
return resolve();
}).catch(function (err) {

View File

@ -17,8 +17,10 @@
var clone = require("clone");
var redUtil = require("@node-red/util").util;
var flowUtil = require("./util");
var events = require("../../events");
const context = require('../context');
var events = require("../events");
const context = require('../nodes/context');
const router = require("./router");
var Subflow;
var Log;
@ -543,6 +545,10 @@ class Flow {
return asyncMessageDelivery
}
send(src,destinationId,msg) {
router.send(src,destinationId,msg);
}
dump() {
console.log("==================")
console.log(this.TYPE, this.id);

View File

@ -16,14 +16,14 @@
const clone = require("clone");
const Flow = require('./Flow').Flow;
const context = require('../context');
const context = require('../nodes/context');
const util = require("util");
const redUtil = require("@node-red/util").util;
const flowUtil = require("./util");
const credentials = require("../credentials");
const credentials = require("../nodes/credentials");
var Log;
@ -159,7 +159,7 @@ class Subflow extends Flow {
start(diff) {
var self = this;
// Create a subflow node to accept inbound messages and route appropriately
var Node = require("../Node");
var Node = require("../nodes/Node");
if (this.subflowDef.status) {
var subflowStatusConfig = {

View File

@ -22,13 +22,12 @@ var Flow = require('./Flow');
var typeRegistry = require("@node-red/registry");
var deprecated = typeRegistry.deprecated;
var context = require("../context")
var credentials = require("../credentials");
var context = require("../nodes/context")
var credentials = require("../nodes/credentials");
var router = require("./router");
var flowUtil = require("./util");
var log;
var events = require("../../events");
var events = require("../events");
var redUtil = require("@node-red/util").util;
var storage = null;
@ -71,6 +70,7 @@ function init(runtime) {
}
Flow.init(runtime);
flowUtil.init(runtime);
router.init(runtime);
}
function loadFlows() {

View File

@ -0,0 +1,50 @@
var settings;
const LocalRouter = require("./localRouter");
var defaultRouter;
class Router {
constructor(stack) {
this.stack = stack || [];
}
send(source,destinationId,msg) {
var pos = 0;
var next = () => {
var router = this.stack[pos++];
if (router) {
router.send(source,destinationId,msg,next);
}
}
next();
}
}
function init(runtime) {
settings = runtime.settings;
defaultRouter = new Router([
new LocalRouter(),
new PostMessageLogger()
])
}
function send(source,destinationId,msg) {
defaultRouter.send(source,destinationId,msg);
}
module.exports = {
init:init,
send: send
}
class PostMessageLogger {
constructor() {}
send(source,destinationId,msg,next) {
console.log(source.id.padEnd(16),"->",destinationId.padEnd(16),JSON.stringify(msg));
}
}

View File

@ -0,0 +1,17 @@
class LocalRouter {
constructor() {}
send(source,destinationId,msg,next) {
var node = source._flow.getNode(destinationId);
if (node) {
setImmediate(function() {
node.receive(msg);
next();
});
} else if (next) {
next()
}
}
}
module.exports = LocalRouter

View File

@ -19,6 +19,7 @@ var when = require('when');
var externalAPI = require("./api");
var redNodes = require("./nodes");
var flows = require("./flows");
var storage = require("./storage");
var library = require("./library");
var events = require("./events");
@ -273,6 +274,7 @@ var runtime = {
storage: storage,
events: events,
nodes: redNodes,
flows: flows,
library: library,
exec: exec,
util: require("@node-red/util").util,

View File

@ -20,7 +20,7 @@ var EventEmitter = require("events").EventEmitter;
var redUtil = require("@node-red/util").util;
var Log = require("@node-red/util").log;
var context = require("./context");
var flows = require("./flows");
var flows = require("../flows");
const NOOP_SEND = function() {}
@ -55,10 +55,6 @@ function Node(n) {
// as part of its constructor - config._flow will overwrite this._flow
// which we can tolerate as they are the same object.
Object.defineProperty(this,'_flow', {value: n._flow, enumerable: false, writable: true })
this._asyncDelivery = n._flow.asyncMessageDelivery;
}
if (this._asyncDelivery === undefined) {
this._asyncDelivery = true;
}
this.updateWires(n.wires);
}
@ -173,15 +169,7 @@ Node.prototype._emit = Node.prototype.emit;
Node.prototype.emit = function(event, ...args) {
var node = this;
if (event === "input") {
// When Pluggable Message Routing arrives, this will be called from
// that and will already be sync/async depending on the router.
if (this._asyncDelivery) {
setImmediate(function() {
node._emitInput.apply(node,args);
});
} else {
this._emitInput.apply(this,args);
}
this._emitInput.apply(this,args);
} else {
this._emit.apply(this,arguments);
}
@ -366,11 +354,7 @@ Node.prototype.send = function(msg) {
msg._msgid = redUtil.generateId();
}
this.metric("send",msg);
node = this._flow.getNode(this._wire);
/* istanbul ignore else */
if (node) {
node.receive(msg);
}
this._flow.send(this,this._wire,msg);
return;
} else {
msg = [msg];
@ -398,23 +382,20 @@ Node.prototype.send = function(msg) {
var k = 0;
// for each recipent node of that output
for (var j = 0; j < wires.length; j++) {
node = this._flow.getNode(wires[j]); // node at end of wire j
if (node) {
// for each msg to send eg. [[m1, m2, ...], ...]
for (k = 0; k < msgs.length; k++) {
var m = msgs[k];
if (m !== null && m !== undefined) {
/* istanbul ignore else */
if (!sentMessageId) {
sentMessageId = m._msgid;
}
if (msgSent) {
var clonedmsg = redUtil.cloneMessage(m);
sendEvents.push({n:node,m:clonedmsg});
} else {
sendEvents.push({n:node,m:m});
msgSent = true;
}
// for each msg to send eg. [[m1, m2, ...], ...]
for (k = 0; k < msgs.length; k++) {
var m = msgs[k];
if (m !== null && m !== undefined) {
/* istanbul ignore else */
if (!sentMessageId) {
sentMessageId = m._msgid;
}
if (msgSent) {
var clonedmsg = redUtil.cloneMessage(m);
sendEvents.push({n:wires[j],m:clonedmsg});
} else {
sendEvents.push({n:wires[j],m:m});
msgSent = true;
}
}
}
@ -434,7 +415,7 @@ Node.prototype.send = function(msg) {
if (!ev.m._msgid) {
ev.m._msgid = sentMessageId;
}
ev.n.receive(ev.m);
this._flow.send(this,ev.n,ev.m);
}
};

View File

@ -18,7 +18,6 @@ var clone = require("clone");
var log = require("@node-red/util").log;
var util = require("@node-red/util").util;
var memory = require("./memory");
var flows;
var settings;
@ -48,7 +47,6 @@ function logUnknownStore(name) {
}
function init(_settings) {
flows = require("../flows");
settings = _settings;
contexts = {};
stores = {};
@ -513,39 +511,6 @@ function getContext(nodeId, flowId) {
return newContext;
}
//
// function getContext(localId,flowId,parent) {
// var contextId = localId;
// if (flowId) {
// contextId = localId+":"+flowId;
// }
// console.log("getContext",localId,flowId,"known?",contexts.hasOwnProperty(contextId));
// if (contexts.hasOwnProperty(contextId)) {
// return contexts[contextId];
// }
// var newContext = createContext(contextId,undefined,parent);
// if (flowId) {
// var node = flows.get(flowId);
// console.log("flows,get",flowId,node&&node.type)
// var parent = undefined;
// if (node && node.type.startsWith("subflow:")) {
// parent = node.context().flow;
// }
// else {
// parent = createRootContext();
// }
// var flowContext = getContext(flowId,undefined,parent);
// Object.defineProperty(newContext, 'flow', {
// value: flowContext
// });
// }
// Object.defineProperty(newContext, 'global', {
// value: contexts['global']
// })
// contexts[contextId] = newContext;
// return newContext;
// }
function deleteContext(id,flowId) {
if(!hasConfiguredStore){
// only delete context if there's no configured storage.

View File

@ -23,8 +23,8 @@ var util = require("util");
var registry = require("@node-red/registry");
var credentials = require("./credentials");
var flows = require("./flows");
var flowUtil = require("./flows/util")
var flows = require("../flows");
var flowUtil = require("../flows/util")
var context = require("./context");
var Node = require("./Node");
var log;

View File

@ -37,7 +37,7 @@ describe("runtime-api/flows", function() {
it("returns the current flow configuration", function(done) {
flows.init({
log: mockLog(),
nodes: {
flows: {
getFlows: function() { return [1,2,3] }
}
});
@ -76,7 +76,7 @@ describe("runtime-api/flows", function() {
})
flows.init({
log: mockLog(),
nodes: {
flows: {
getFlows: function() { return {rev:"currentRev",flows:[]} },
setFlows: setFlows,
loadFlows: loadFlows
@ -192,7 +192,7 @@ describe("runtime-api/flows", function() {
});
flows.init({
log: mockLog(),
nodes: {
flows: {
addFlow: addFlow
}
});
@ -225,7 +225,7 @@ describe("runtime-api/flows", function() {
});
flows.init({
log: mockLog(),
nodes: {
flows: {
getFlow: getFlow
}
});
@ -268,7 +268,7 @@ describe("runtime-api/flows", function() {
});
flows.init({
log: mockLog(),
nodes: {
flows: {
updateFlow: updateFlow
}
});
@ -324,7 +324,7 @@ describe("runtime-api/flows", function() {
});
flows.init({
log: mockLog(),
nodes: {
flows: {
removeFlow: removeFlow
}
});

View File

@ -22,9 +22,9 @@ var util = require("util");
var NR_TEST_UTILS = require("nr-test-utils");
var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util");
var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow");
var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows");
var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util");
var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow");
var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows");
var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node");
var typeRegistry = NR_TEST_UTILS.require("@node-red/registry");

View File

@ -22,11 +22,11 @@ var util = require("util");
var NR_TEST_UTILS = require("nr-test-utils");
var Subflow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Subflow");
var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow");
var Subflow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Subflow");
var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow");
var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util");
var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows");
var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util");
var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows");
var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node");
var typeRegistry = NR_TEST_UTILS.require("@node-red/registry");

View File

@ -20,13 +20,13 @@ var when = require("when");
var clone = require("clone");
var NR_TEST_UTILS = require("nr-test-utils");
var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows");
var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows");
var RedNode = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node");
var RED = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes");
var events = NR_TEST_UTILS.require("@node-red/runtime/lib/events");
var credentials = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/credentials");
var typeRegistry = NR_TEST_UTILS.require("@node-red/registry")
var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow");
var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow");
describe('flows/index', function() {

View File

@ -19,7 +19,7 @@ var sinon = require("sinon");
var when = require("when");
var clone = require("clone");
var NR_TEST_UTILS = require("nr-test-utils");
var flowUtil = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util");
var flowUtil = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util");
var typeRegistry = NR_TEST_UTILS.require("@node-red/registry");
var redUtil = NR_TEST_UTILS.require("@node-red/util").util;

View File

@ -19,7 +19,7 @@ var sinon = require('sinon');
var NR_TEST_UTILS = require("nr-test-utils");
var RedNode = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node");
var Log = NR_TEST_UTILS.require("@node-red/util").log;
var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows");
var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows");
describe('Node', function() {
describe('#constructor',function() {
@ -208,6 +208,7 @@ describe('Node', function() {
it('emits a single message', function(done) {
var flow = {
getNode: (id) => { return {'n1':n1,'n2':n2}[id]},
send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})}
};
var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]});
var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'});
@ -255,7 +256,9 @@ describe('Node', function() {
it('emits a single message - synchronous mode', function(done) {
var flow = {
handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)},
getNode: (id) => { return {'n1':n1,'n2':n2}[id]},
send: (node,dst,msg) => { n2.receive(msg) ;},
asyncMessageDelivery: false
};
var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]});
@ -284,8 +287,10 @@ describe('Node', function() {
it('emits a message with callback provided send', function(done) {
var flow = {
handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)},
getNode: (id) => { return {'n1':n1,'n2':n2}[id]},
handleComplete: (node,msg) => {}
handleComplete: (node,msg) => {},
send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})}
};
var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]});
var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'});
@ -308,7 +313,9 @@ describe('Node', function() {
it('emits multiple messages on a single output', function(done) {
var flow = {
handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)},
getNode: (id) => { return {'n1':n1,'n2':n2}[id]},
send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})}
};
var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]});
var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'});
@ -338,14 +345,15 @@ describe('Node', function() {
it('emits messages to multiple outputs', function(done) {
var flow = {
handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)},
getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3,'n4':n4,'n5':n5}[id]},
send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })}
};
var n1 = new RedNode({_flow:flow, id:'n1',type:'abc',wires:[['n2'],['n3'],['n4','n5']]});
var n2 = new RedNode({_flow:flow, id:'n2',type:'abc'});
var n3 = new RedNode({_flow:flow, id:'n3',type:'abc'});
var n4 = new RedNode({_flow:flow, id:'n4',type:'abc'});
var n5 = new RedNode({_flow:flow, id:'n5',type:'abc'});
var messages = [
{payload:"hello world"},
null,
@ -396,6 +404,7 @@ describe('Node', function() {
it('emits no messages', function(done) {
var flow = {
handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)},
getNode: (id) => { return {'n1':n1,'n2':n2}[id]},
};
var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]});
@ -414,20 +423,20 @@ describe('Node', function() {
it('emits messages ignoring non-existent nodes', function(done) {
var flow = {
handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)},
getNode: (id) => { return {'n1':n1,'n2':n2}[id]},
send: (node,dst,msg) => { setImmediate(function() { var n = flow.getNode(dst); n && n.receive(msg) })}
};
var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n9'],['n2']]});
var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'});
var messages = [
{payload:"hello world"},
{payload:"hello world again"}
{_msgid:"123", payload:"hello world"},
{_msgid:"234", payload:"hello world again"}
];
// only one message sent, so no copy needed
n2.on('input',function(msg) {
should.deepEqual(msg,messages[1]);
should.strictEqual(msg,messages[1]);
done();
});
@ -437,6 +446,7 @@ describe('Node', function() {
it('emits messages without cloning req or res', function(done) {
var flow = {
getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3}[id]},
send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })}
};
var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[[['n2'],['n3']]]});
var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'});
@ -493,6 +503,7 @@ describe('Node', function() {
Log.addHandler(logHandler);
var flow = {
getNode: (id) => { return {'n1':sender,'n2':receiver1,'n3':receiver2}[id]},
send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })}
};
var sender = new RedNode({_flow:flow,id:'n1',type:'abc', wires:[['n2', 'n3']]});

View File

@ -23,7 +23,7 @@ var inherits = require("util").inherits;
var NR_TEST_UTILS = require("nr-test-utils");
var index = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/index");
var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows");
var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows");
var registry = NR_TEST_UTILS.require("@node-red/registry")
var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node");