From cebddc0237c7852edd8f2613e940c31eba93e396 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Sat, 29 Oct 2016 21:46:19 +0100 Subject: [PATCH] Add message router component --- nodes/core/core/20-inject.js | 21 ++-- red/runtime/nodes/Node.js | 114 ++------------------ red/runtime/nodes/index.js | 4 - red/runtime/nodes/router/index.js | 112 +++++++++++++++++++ test/red/runtime/nodes/Node_spec.js | 21 ++-- test/red/runtime/nodes/router/index_spec.js | 79 ++++++++++++++ 6 files changed, 223 insertions(+), 128 deletions(-) create mode 100644 red/runtime/nodes/router/index.js create mode 100644 test/red/runtime/nodes/router/index_spec.js diff --git a/nodes/core/core/20-inject.js b/nodes/core/core/20-inject.js index d31cf7c3d..40e5e0a07 100644 --- a/nodes/core/core/20-inject.js +++ b/nodes/core/core/20-inject.js @@ -67,21 +67,20 @@ module.exports = function(RED) { this.error(err,msg); } }); + this.on('close', function() { + if (node.interval_id != null) { + clearInterval(node.interval_id); + if (RED.settings.verbose) { node.log(RED._("inject.stopped")); } + } else if (node.cronjob != null) { + node.cronjob.stop(); + if (RED.settings.verbose) { node.log(RED._("inject.stopped")); } + delete node.cronjob; + } + }); } RED.nodes.registerType("inject",InjectNode); - InjectNode.prototype.close = function() { - if (this.interval_id != null) { - clearInterval(this.interval_id); - if (RED.settings.verbose) { this.log(RED._("inject.stopped")); } - } else if (this.cronjob != null) { - this.cronjob.stop(); - if (RED.settings.verbose) { this.log(RED._("inject.stopped")); } - delete this.cronjob; - } - } - RED.httpAdmin.post("/inject/:id", RED.auth.needsPermission("inject.write"), function(req,res) { var node = RED.nodes.getNode(req.params.id); if (node != null) { diff --git a/red/runtime/nodes/Node.js b/red/runtime/nodes/Node.js index ed0503ec5..9e9a2e865 100644 --- a/red/runtime/nodes/Node.js +++ b/red/runtime/nodes/Node.js @@ -22,6 +22,7 @@ var redUtil = require("../util"); var Log = require("../log"); var context = require("./context"); var flows = require("./flows"); +var router = require("./router"); function Node(n) { this.id = n.id; @@ -41,28 +42,10 @@ function Node(n) { util.inherits(Node, EventEmitter); Node.prototype.updateWires = function(wires) { - //console.log("UPDATE",this.id); + router.add(this,wires); this.wires = wires || []; - delete this._wire; - - var wc = 0; - this.wires.forEach(function(w) { - wc+=w.length; - }); - this._wireCount = wc; - if (wc === 0) { - // With nothing wired to the node, no-op send - this.send = function(msg) {} - } else { - this.send = Node.prototype.send; - if (this.wires.length === 1 && this.wires[0].length === 1) { - // Single wire, so we can shortcut the send when - // a single message is sent - this._wire = this.wires[0][0]; - } - } - } + Node.prototype.context = function() { if (!this._context) { this._context = context.get(this._alias||this.id,this.z); @@ -100,103 +83,22 @@ Node.prototype.close = function() { } if (promises.length > 0) { return when.settle(promises).then(function() { + router.remove(this); if (this._context) { - context.delete(this._alias||this.id,this.z); + context.delete(this._alias||this.id,this.z); } }); } else { + router.remove(this); if (this._context) { - context.delete(this._alias||this.id,this.z); + context.delete(this._alias||this.id,this.z); } return; } }; Node.prototype.send = function(msg) { - var msgSent = false; - var node; - - if (msg === null || typeof msg === "undefined") { - return; - } else if (!util.isArray(msg)) { - if (this._wire) { - // A single message and a single wire on output 0 - // TODO: pre-load flows.get calls - cannot do in constructor - // as not all nodes are defined at that point - if (!msg._msgid) { - msg._msgid = redUtil.generateId(); - } - this.metric("send",msg); - node = flows.get(this._wire); - /* istanbul ignore else */ - if (node) { - node.receive(msg); - } - return; - } else { - msg = [msg]; - } - } - - var numOutputs = this.wires.length; - - // Build a list of send events so that all cloning is done before - // any calls to node.receive - var sendEvents = []; - - var sentMessageId = null; - - // for each output of node eg. [msgs to output 0, msgs to output 1, ...] - for (var i = 0; i < numOutputs; i++) { - var wires = this.wires[i]; // wires leaving output i - /* istanbul ignore else */ - if (i < msg.length) { - var msgs = msg[i]; // msgs going to output i - if (msgs !== null && typeof msgs !== "undefined") { - if (!util.isArray(msgs)) { - msgs = [msgs]; - } - var k = 0; - // for each recipent node of that output - for (var j = 0; j < wires.length; j++) { - node = flows.get(wires[j]); // node at end of wire j - if (node) { - // for each msg to send eg. [[m1, m2, ...], ...] - for (k = 0; k < msgs.length; k++) { - var m = msgs[k]; - if (m !== null && m !== undefined) { - /* istanbul ignore else */ - if (!sentMessageId) { - sentMessageId = m._msgid; - } - if (msgSent) { - var clonedmsg = redUtil.cloneMessage(m); - sendEvents.push({n:node,m:clonedmsg}); - } else { - sendEvents.push({n:node,m:m}); - msgSent = true; - } - } - } - } - } - } - } - } - /* istanbul ignore else */ - if (!sentMessageId) { - sentMessageId = redUtil.generateId(); - } - this.metric("send",{_msgid:sentMessageId}); - - for (i=0;i