From 08148a07b21433dab01a06e7c1d43b134530f7e3 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Thu, 30 Jul 2020 17:52:39 +0100 Subject: [PATCH] Update Node/Flow to trigger msg routing hooks --- .../@node-red/runtime/lib/flows/Flow.js | 79 ++- .../@node-red/runtime/lib/nodes/Node.js | 139 ++-- .../@node-red/runtime/lib/flows/Flow_spec.js | 368 +++++++++++ .../@node-red/runtime/lib/nodes/Node_spec.js | 608 +++++++++++------- 4 files changed, 900 insertions(+), 294 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index aa5abab98..9d97cea21 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -19,8 +19,7 @@ var redUtil = require("@node-red/util").util; var flowUtil = require("./util"); var events = require("../events"); const context = require('../nodes/context'); -const router = require("./router"); - +const hooks = require("../hooks"); var Subflow; var Log; @@ -545,8 +544,25 @@ class Flow { return asyncMessageDelivery } - send(src,destinationId,msg) { - router.send(src,destinationId,msg); + send(sendEvents) { + // onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object. + // preRoute - called once for each SendEvent object in turn + // preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed. + // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery) + // onReceive - a node is about to receive a message + // postReceive - the message has been passed to the node's input handler + // onDone, onError - the node has completed with a message or logged an error + handleOnSend(this,sendEvents, (err, eventData) => { + if (err) { + let srcNode; + if (Array.isArray(eventData)) { + srcNode = eventData[0].source.node; + } else { + srcNode = eventData.source.node; + } + srcNode.error(err); + } + }); } dump() { @@ -595,6 +611,61 @@ function stopNode(node,removed) { } +function handleOnSend(flow,sendEvents, reportError) { + // onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object. + hooks.trigger("onSend",sendEvents,(err) => { + if (err) { + reportError(err,sendEvents); + return + } else if (err !== false) { + for (var i=0;i { + if (err) { + reportError(err,sendEvent); + return; + } else if (err !== false) { + sendEvent.destination.node = flow.getNode(sendEvent.destination.id); + if (sendEvent.destination.node) { + if (sendEvent.cloneMessage) { + sendEvent.msg = redUtil.cloneMessage(sendEvent.msg); + } + handlePreDeliver(flow,sendEvent,reportError); + } + } + }) +} + +function handlePreDeliver(flow,sendEvent, reportError) { + // preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed. + hooks.trigger("preDeliver",sendEvent,(err) => { + if (err) { + reportError(err,sendEvent); + return; + } else if (err !== false) { + setImmediate(function() { + if (sendEvent.destination.node) { + sendEvent.destination.node.receive(sendEvent.msg); + + } + }) + // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery) + hooks.trigger("postDeliver", sendEvent, function(err) { + if (err) { + reportError(err,sendEvent); + } + }) + } + }) +} + module.exports = { init: function(runtime) { nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000; diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index e0bba6a65..dfc8b5ed1 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -21,6 +21,8 @@ var redUtil = require("@node-red/util").util; var Log = require("@node-red/util").log; var context = require("./context"); var flows = require("../flows"); +const hooks = require("../hooks"); + const NOOP_SEND = function() {} @@ -121,6 +123,11 @@ Node.prototype.context = function() { */ Node.prototype._complete = function(msg,error) { this.metric("done",msg); + hooks.trigger("onComplete",{ msg: msg, error: error, node: { id: this.id, node: this }}, err => { + if (err) { + this.error(err); + } + }) if (error) { // For now, delegate this to this.error // But at some point, the timeout handling will need to know about @@ -183,42 +190,53 @@ Node.prototype.emit = function(event, ...args) { Node.prototype._emitInput = function(arg) { var node = this; this.metric("receive", arg); - if (node._inputCallback) { - // Just one callback registered. - try { - node._inputCallback( - arg, - function() { node.send.apply(node,arguments) }, - function(err) { node._complete(arg,err); } - ); - } catch(err) { - node.error(err,arg); - } - } else if (node._inputCallbacks) { - // Multiple callbacks registered. Call each one, tracking eventual completion - var c = node._inputCallbacks.length; - for (var i=0;i { + if (err) { + node.error(err); + return + } else if (err !== false) { + if (node._inputCallback) { + // Just one callback registered. + try { + node._inputCallback( + arg, + function() { node.send.apply(node,arguments) }, + function(err) { node._complete(arg,err); } + ); + } catch(err) { + node.error(err,arg); + } + } else if (node._inputCallbacks) { + // Multiple callbacks registered. Call each one, tracking eventual completion + var c = node._inputCallbacks.length; + for (var i=0;i {if (err) { node.error(err) }}); } - } + }); } /** @@ -354,7 +372,19 @@ Node.prototype.send = function(msg) { msg._msgid = redUtil.generateId(); } this.metric("send",msg); - this._flow.send(this,this._wire,msg); + this._flow.send([{ + msg: msg, + source: { + id: this.id, + node: this, + port: 0 + }, + destination: { + id: this._wire, + node: undefined + }, + cloneMessage: false + }]); return; } else { msg = [msg]; @@ -368,7 +398,7 @@ Node.prototype.send = function(msg) { var sendEvents = []; var sentMessageId = null; - + var hasMissingIds = false; // for each output of node eg. [msgs to output 0, msgs to output 1, ...] for (var i = 0; i < numOutputs; i++) { var wires = this.wires[i]; // wires leaving output i @@ -386,17 +416,27 @@ Node.prototype.send = function(msg) { for (k = 0; k < msgs.length; k++) { var m = msgs[k]; if (m !== null && m !== undefined) { + if (!m._msgid) { + hasMissingIds = true; + } /* istanbul ignore else */ if (!sentMessageId) { sentMessageId = m._msgid; } - if (msgSent) { - var clonedmsg = redUtil.cloneMessage(m); - sendEvents.push({n:wires[j],m:clonedmsg}); - } else { - sendEvents.push({n:wires[j],m:m}); - msgSent = true; - } + sendEvents.push({ + msg: m, + source: { + id: this.id, + node: this, + port: i + }, + destination: { + id: wires[j], + node: undefined + }, + cloneMessage: msgSent + }); + msgSent = true; } } } @@ -409,21 +449,22 @@ Node.prototype.send = function(msg) { } this.metric("send",{_msgid:sentMessageId}); - for (i=0;i { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + var messageReceived = false; + n2.receive = function(msg) { + messageReceived = true; + try { + msg.should.be.exactly(message); + shutdownTest(); + } catch(err) { + shutdownTest(err); + } + } + + var message = {payload:"hello"} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: false + }]) + messageReceived.should.be.false() + }) + it("sends a message - cloning", function(done) { + var shutdownTest = function(err) { + hooks.clear(); + flow.stop().then(() => { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + + n2.receive = function(msg) { + try { + // Message should be cloned + msg.should.be.eql(message); + msg.should.not.be.exactly(message); + shutdownTest(); + } catch(err) { + shutdownTest(err); + } + } + + var message = {payload:"hello"} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + }) + it("sends multiple messages", function(done) { + var shutdownTest = function(err) { + hooks.clear(); + flow.stop().then(() => { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + + var messageCount = 0; + n2.receive = function(msg) { + try { + msg.should.be.exactly(messages[messageCount++]); + if (messageCount === 2) { + shutdownTest(); + } + } catch(err) { + shutdownTest(err); + } + } + + var messages = [{payload:"hello"},{payload:"world"}]; + + flow.send([{ + msg: messages[0], + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined } + },{ + msg: messages[1], + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined } + }]) + }) + it("sends a message - triggers hooks", function(done) { + var hookErrors = []; + var messageReceived = false; + var hooksCalled = []; + hooks.add("onSend", function(sendEvents) { + hooksCalled.push("onSend") + try { + messageReceived.should.be.false() + sendEvents.should.have.length(1); + sendEvents[0].msg.should.be.exactly(message); + } catch(err) { + hookErrors.push(err); + } + }) + hooks.add("preRoute", function(sendEvent) { + hooksCalled.push("preRoute") + try { + messageReceived.should.be.false() + sendEvent.msg.should.be.exactly(message); + should.not.exist(sendEvent.destination.node) + } catch(err) { + hookErrors.push(err); + } + + }) + hooks.add("preDeliver", function(sendEvent) { + hooksCalled.push("preDeliver") + try { + messageReceived.should.be.false() + // Cloning should have happened + sendEvent.msg.should.not.be.exactly(message); + // Destinatino node populated + should.exist(sendEvent.destination.node) + } catch(err) { + hookErrors.push(err); + } + + }) + hooks.add("postDeliver", function(sendEvent) { + hooksCalled.push("postDeliver") + try { + messageReceived.should.be.false() + + } catch(err) { + hookErrors.push(err); + } + + }) + var shutdownTest = function(err) { + hooks.clear(); + flow.stop().then(() => { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + n2.receive = function(msg) { + messageReceived = true; + try { + msg.should.be.eql(message); + msg.should.not.be.exactly(message); + hooksCalled.should.eql(["onSend","preRoute","preDeliver","postDeliver"]) + if (hookErrors.length > 0) { + shutdownTest(hookErrors[0]) + } else { + shutdownTest(); + } + } catch(err) { + shutdownTest(err); + } + } + + var message = {payload:"hello"} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + }) + + describe("errors thrown by hooks are reported to the sending node", function() { + var flow; + var n1,n2; + var messageReceived = false; + var errorReceived = null; + before(function() { + hooks.add("onSend", function(sendEvents) { + if (sendEvents[0].msg.payload === "trigger-onSend") { + throw new Error("onSend Error"); + } + }) + hooks.add("preRoute", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preRoute") { + throw new Error("preRoute Error"); + } + }) + hooks.add("preDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preDeliver") { + throw new Error("preDeliver Error"); + } + }) + hooks.add("postDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-postDeliver") { + throw new Error("postDeliver Error"); + } + }) + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + n1 = flow.getNode('1'); + n2 = flow.getNode('2'); + n2.receive = function(msg) { + messageReceived = true; + } + n1.error = function(err) { + errorReceived = err; + } + + }) + after(function(done) { + hooks.clear(); + flow.stop().then(() => { done() }); + }) + beforeEach(function() { + messageReceived = false; + errorReceived = null; + }) + function testHook(hook, msgExpected, done) { + var message = {payload:"trigger-"+hook} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + setTimeout(function() { + try { + messageReceived.should.equal(msgExpected); + should.exist(errorReceived) + errorReceived.toString().should.containEql(hook); + done(); + } catch(err) { + done(err); + } + },10) + } + + it("onSend", function(done) { testHook("onSend", false, done) }) + it("preRoute", function(done) { testHook("preRoute", false, done) }) + it("preDeliver", function(done) { testHook("preDeliver", false, done) }) + it("postDeliver", function(done) { testHook("postDeliver", true, done) }) + }) + + describe("hooks can stop the sending of messages", function() { + var flow; + var n1,n2; + var messageReceived = false; + var errorReceived = false; + before(function() { + hooks.add("onSend", function(sendEvents) { + if (sendEvents[0].msg.payload === "trigger-onSend") { + return false + } + }) + hooks.add("preRoute", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preRoute") { + return false + } + }) + hooks.add("preDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preDeliver") { + return false + } + }) + hooks.add("postDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-postDeliver") { + return false + } + }) + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + n1 = flow.getNode('1'); + n2 = flow.getNode('2'); + n2.receive = function(msg) { + messageReceived = true; + } + n1.error = function(err) { + errorReceived = true; + } + + }) + after(function(done) { + hooks.clear(); + flow.stop().then(() => { done() }); + }) + function testSend(payload,messageReceivedExpected,errorReceivedExpected,done) { + messageReceived = false; + errorReceived = false; + flow.send([{ + msg: {payload: payload}, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + setTimeout(function() { + try { + messageReceived.should.eql(messageReceivedExpected) + errorReceived.should.eql(errorReceivedExpected) + done(); + } catch(err) { + done(err); + } + },10) + } + function testHook(hook, done) { + testSend("pass",true,false,err => { + if (err) { + done(err) + } else { + testSend("trigger-"+hook,false,false,done); + } + }) + } + + it("onSend", function(done) { testHook("onSend", done) }) + it("preRoute", function(done) { testHook("preRoute", done) }) + it("preDeliver", function(done) { testHook("preDeliver", done) }) + // postDeliver happens after delivery is scheduled so cannot stop it + // it("postDeliver", function(done) { testHook("postDeliver", done) }) + }) + }) + }); diff --git a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js index 3354c4215..df362dc85 100644 --- a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js +++ b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js @@ -19,6 +19,7 @@ var sinon = require('sinon'); var NR_TEST_UTILS = require("nr-test-utils"); var RedNode = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var Log = NR_TEST_UTILS.require("@node-red/util").log; +var hooks = NR_TEST_UTILS.require("@node-red/runtime/lib/hooks"); var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); describe('Node', function() { @@ -181,6 +182,93 @@ describe('Node', function() { }); n.receive(message); }); + + it('triggers onComplete hook when done callback provided', function(done) { + var handleCompleteCalled = false; + var hookCalled = false; + var n = new RedNode({id:'123',type:'abc', _flow: { + handleComplete: function(node,msg) { + handleCompleteCalled = true; + } + }}); + var hookError; + hooks.add("onComplete",function(completeEvent) { + hookCalled = true; + try { + handleCompleteCalled.should.be.false("onComplete should be called before handleComplete") + should.not.exist(completeEvent.error); + completeEvent.msg.should.deepEqual(message); + completeEvent.node.id.should.eql("123"); + completeEvent.node.node.should.equal(n); + } catch(err) { + hookError = err; + } + }) + var message = {payload:"hello world"}; + n.on('input',function(msg, nodeSend, nodeDone) { + nodeDone(); + }); + n.receive(message); + setTimeout(function() { + if (hookError) { + done(hookError); + return + } + try { + hookCalled.should.be.true("onComplete hook should be called"); + handleCompleteCalled.should.be.true("handleComplete should be called"); + done(); + } catch(err) { + done(err); + } + }) + }); + + it('triggers onComplete hook when done callback provided - with error', function(done) { + var handleCompleteCalled = false; + var hookCalled = false; + var errorReported = false; + var n = new RedNode({id:'123',type:'abc', _flow: { + handleComplete: function(node,msg) { + handleCompleteCalled = true; + } + }}); + var hookError; + hooks.add("onComplete",function(completeEvent) { + hookCalled = true; + try { + handleCompleteCalled.should.be.false("onComplete should be called before handleComplete") + should.exist(completeEvent.error); + completeEvent.error.toString().should.equal("Error: test error") + completeEvent.msg.should.deepEqual(message); + completeEvent.node.id.should.eql("123"); + completeEvent.node.node.should.equal(n); + } catch(err) { + hookError = err; + } + }) + var message = {payload:"hello world"}; + n.on('input',function(msg, nodeSend, nodeDone) { + nodeDone(new Error("test error")); + }); + n.error = function(err,msg) { + errorReported = true; + } + n.receive(message); + setTimeout(function() { + if (hookError) { + done(hookError); + return + } + try { + hookCalled.should.be.true("onComplete hook should be called"); + handleCompleteCalled.should.be.false("handleComplete should not be called"); + done(); + } catch(err) { + done(err); + } + }) + }); it('logs error if callback provides error', function(done) { var n = new RedNode({id:'123',type:'abc'}); sinon.stub(n,"error",function(err,msg) {}); @@ -201,153 +289,250 @@ describe('Node', function() { }); n.receive(message); }); + it("triggers hooks when receiving a message", function(done) { + var hookErrors = []; + var messageReceived = false; + var hooksCalled = []; + hooks.add("onReceive", function(receiveEvent) { + hooksCalled.push("onReceive") + try { + messageReceived.should.be.false("Message should not have been received before onReceive") + receiveEvent.msg.should.be.exactly(message); + receiveEvent.destination.id.should.equal("123") + receiveEvent.destination.node.should.equal(n) + } catch(err) { + hookErrors.push(err); + } + }) + hooks.add("postReceive", function(receiveEvent) { + hooksCalled.push("postReceive") + try { + messageReceived.should.be.true("Message should have been received before postReceive") + receiveEvent.msg.should.be.exactly(message); + receiveEvent.destination.id.should.equal("123") + receiveEvent.destination.node.should.equal(n) + } catch(err) { + hookErrors.push(err); + } + + }) + var n = new RedNode({id:'123',type:'abc'}); + var message = {payload:"hello world"}; + n.on('input',function(msg) { + messageReceived = true; + try { + should.strictEqual(this,n); + hooksCalled.should.eql(["onReceive"]) + should.deepEqual(msg,message); + } catch(err) { + hookErrors.push(err) + } + }); + n.receive(message); + setTimeout(function() { + hooks.clear(); + if (hookErrors.length > 0) { + done(hookErrors[0]) + } else { + done(); + } + },10); + }); + describe("errors thrown by hooks are reported", function() { + before(function() { + hooks.add("onReceive",function(recEvent) { + if (recEvent.msg.payload === "trigger-onReceive") { + throw new Error("onReceive Error") + } + }) + hooks.add("postReceive",function(recEvent) { + if (recEvent.msg.payload === "trigger-postReceive") { + throw new Error("postReceive Error") + } + }) + }) + after(function() { + hooks.clear(); + }) + function testHook(hook, msgExpected, done) { + var messageReceived = false; + var errorReceived; + var n = new RedNode({id:'123',type:'abc'}); + var message = {payload:"trigger-"+hook}; + n.on('input',function(msg) { + messageReceived = true; + }); + n.error = function (err) { + errorReceived = err; + } + + n.receive(message); + + setTimeout(function() { + try { + messageReceived.should.equal(msgExpected,`Hook ${hook} messageReceived expected ${msgExpected} actual ${messageReceived}`); + should.exist(errorReceived); + errorReceived.toString().should.containEql(hook) + done() + } catch(err) { + done(err); + } + },10); + } + it("onReceive", function(done) { testHook("onReceive", false, done)}) + it("postReceive", function(done) { testHook("postReceive", true, done)}) + }) }); + describe("hooks can halt receive", function() { + before(function() { + hooks.add("onReceive",function(recEvent) { + if (recEvent.msg.payload === "trigger-onReceive") { + return false; + } + }) + }) + after(function() { + hooks.clear(); + }) + + function testHook(hook, msgExpected, done) { + var messageReceived = false; + var errorReceived; + var n = new RedNode({id:'123',type:'abc'}); + var message = {payload:"trigger-"+hook}; + n.on('input',function(msg) { + messageReceived = true; + }); + n.error = function (err) { + errorReceived = err; + } + + n.receive(message); + + setTimeout(function() { + try { + messageReceived.should.equal(msgExpected,`Hook ${hook} messageReceived expected ${msgExpected} actual ${messageReceived}`); + should.not.exist(errorReceived); + done() + } catch(err) { + done(err); + } + },10); + } + it("onReceive", function(done) { testHook("onReceive", false, done)}) + }) + + describe('#send', function() { it('emits a single message', function(done) { var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} + send: (sendEvents) => { + try { + sendEvents.should.have.length(1); + sendEvents[0].msg.should.equal(message); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}) + done(); + } catch(err) { + done(err); + } + }, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var message = {payload:"hello world"}; - var messageReceived = false; - n2.on('input',function(msg) { - // msg equals message, and is not a new copy - messageReceived = true; - should.strictEqual(this,n2); - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(); - }); n1.send(message); - messageReceived.should.be.false(); - }); - - it('emits a single message - multiple input event listeners', function(done) { - var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var message = {payload:"hello world"}; - var messageReceived = 0; - n2.on('input',function(msg) { - // msg equals message, and is not a new copy - messageReceived++; - messageReceived.should.be.exactly(1); - should.strictEqual(this,n2); - should.deepEqual(msg,message); - should.strictEqual(msg,message); - }); - n2.on('input',function(msg) { - messageReceived++; - messageReceived.should.be.exactly(2); - should.strictEqual(this,n2); - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(); - }); - n1.send(message); - messageReceived.should.be.exactly(0); - }); - - it('emits a single message - synchronous mode', function(done) { - var flow = { - handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - send: (node,dst,msg) => { n2.receive(msg) ;}, - asyncMessageDelivery: false - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var message = {payload:"hello world"}; - var messageReceived = false; - var notSyncErr; - n2.on('input',function(msg) { - try { - // msg equals message, and is not a new copy - messageReceived = true; - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(notSyncErr); - } catch(err) { - done(err); - } - }); - n1.send(message); - try { - messageReceived.should.be.true(); - } catch(err) { - notSyncErr = err; - } }); it('emits a message with callback provided send', function(done) { var flow = { handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, handleComplete: (node,msg) => {}, - send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} + send: (sendEvents) => { + try { + sendEvents.should.have.length(1); + sendEvents[0].msg.should.equal(message); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[0].cloneMessage.should.be.false(); + done(); + } catch(err) { + done(err); + } + }, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var message = {payload:"hello world"}; - var messageReceived = false; n1.on('input',function(msg,nodeSend,nodeDone) { nodeSend(msg); nodeDone(); }); - n2.on('input',function(msg) { - // msg equals message, and is not a new copy - messageReceived = true; - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(); - }); n1.receive(message); - messageReceived.should.be.false(); }); it('emits multiple messages on a single output', function(done) { var flow = { handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - send: (node,dst,msg) => { setImmediate(function() { n2.receive(msg) ;})} + send: (sendEvents) => { + try { + sendEvents.should.have.length(2); + sendEvents[0].msg.should.equal(messages[0]); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[0].cloneMessage.should.be.false(); + + sendEvents[1].msg.should.equal(messages[1]); + sendEvents[1].destination.should.eql({id:"n2", node: undefined}); + sendEvents[1].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[1].cloneMessage.should.be.true(); + + done(); + } catch(err) { + done(err); + } + }, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var messages = [ {payload:"hello world"}, {payload:"hello world again"} ]; - var rcvdCount = 0; - - n2.on('input',function(msg) { - if (rcvdCount === 0) { - // first msg sent, don't clone - should.deepEqual(msg,messages[rcvdCount]); - should.strictEqual(msg,messages[rcvdCount]); - rcvdCount += 1; - } else { - // second msg sent, clone - msg.payload.should.equal(messages[rcvdCount].payload); - should.notStrictEqual(msg,messages[rcvdCount]); - done(); - } - }); n1.send([messages]); }); it('emits messages to multiple outputs', function(done) { var flow = { handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, - getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3,'n4':n4,'n5':n5}[id]}, - send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} + send: (sendEvents) => { + try { + sendEvents.should.have.length(3); + sendEvents[0].msg.should.equal(messages[0]); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[0].cloneMessage.should.be.false(); + should.exist(sendEvents[0].msg._msgid); + sendEvents[1].msg.should.equal(messages[2]); + sendEvents[1].destination.should.eql({id:"n4", node: undefined}); + sendEvents[1].source.should.eql({id:"n1", node: n1, port: 2}) + sendEvents[1].cloneMessage.should.be.true(); + should.exist(sendEvents[1].msg._msgid); + sendEvents[2].msg.should.equal(messages[2]); + sendEvents[2].destination.should.eql({id:"n5", node: undefined}); + sendEvents[2].source.should.eql({id:"n1", node: n1, port: 2}) + sendEvents[2].cloneMessage.should.be.true(); + should.exist(sendEvents[2].msg._msgid); + + sendEvents[0].msg._msgid.should.eql(sendEvents[1].msg._msgid) + sendEvents[1].msg._msgid.should.eql(sendEvents[2].msg._msgid) + + done(); + } catch(err) { + done(err); + } + } }; var n1 = new RedNode({_flow:flow, id:'n1',type:'abc',wires:[['n2'],['n3'],['n4','n5']]}); var n2 = new RedNode({_flow:flow, id:'n2',type:'abc'}); @@ -362,43 +547,6 @@ describe('Node', function() { var rcvdCount = 0; - // first message sent, don't clone - // message uuids should match - n2.on('input',function(msg) { - should.deepEqual(msg,messages[0]); - should.strictEqual(msg,messages[0]); - rcvdCount += 1; - if (rcvdCount == 3) { - done(); - } - }); - - n3.on('input',function(msg) { - should.fail(null,null,"unexpected message"); - }); - - // second message sent, clone - // message uuids wont match since we've cloned - n4.on('input',function(msg) { - msg.payload.should.equal(messages[2].payload); - should.notStrictEqual(msg,messages[2]); - rcvdCount += 1; - if (rcvdCount == 3) { - done(); - } - }); - - // third message sent, clone - // message uuids wont match since we've cloned - n5.on('input',function(msg) { - msg.payload.should.equal(messages[2].payload); - should.notStrictEqual(msg,messages[2]); - rcvdCount += 1; - if (rcvdCount == 3) { - done(); - } - }); - n1.send(messages); }); @@ -421,107 +569,85 @@ describe('Node', function() { n1.send(); }); - it('emits messages ignoring non-existent nodes', function(done) { - var flow = { - handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - send: (node,dst,msg) => { setImmediate(function() { var n = flow.getNode(dst); n && n.receive(msg) })} - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n9'],['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); + // it('emits messages without cloning req or res', function(done) { + // var flow = { + // getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3}[id]}, + // send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} + // }; + // var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[[['n2'],['n3']]]}); + // var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); + // var n3 = new RedNode({_flow:flow,id:'n3',type:'abc'}); + // + // var req = {}; + // var res = {}; + // var cloned = {}; + // var message = {payload: "foo", cloned: cloned, req: req, res: res}; + // + // var rcvdCount = 0; + // + // // first message to be sent, so should not be cloned + // n2.on('input',function(msg) { + // should.deepEqual(msg, message); + // msg.cloned.should.be.exactly(message.cloned); + // msg.req.should.be.exactly(message.req); + // msg.res.should.be.exactly(message.res); + // rcvdCount += 1; + // if (rcvdCount == 2) { + // done(); + // } + // }); + // + // // second message to be sent, so should be cloned + // // message uuids wont match since we've cloned + // n3.on('input',function(msg) { + // msg.payload.should.equal(message.payload); + // msg.cloned.should.not.be.exactly(message.cloned); + // msg.req.should.be.exactly(message.req); + // msg.res.should.be.exactly(message.res); + // rcvdCount += 1; + // if (rcvdCount == 2) { + // done(); + // } + // }); + // + // n1.send(message); + // }); - var messages = [ - {_msgid:"123", payload:"hello world"}, - {_msgid:"234", payload:"hello world again"} - ]; - - n2.on('input',function(msg) { - should.deepEqual(msg,messages[1]); - done(); - }); - - n1.send(messages); - }); - - it('emits messages without cloning req or res', function(done) { - var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3}[id]}, - send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[[['n2'],['n3']]]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var n3 = new RedNode({_flow:flow,id:'n3',type:'abc'}); - - var req = {}; - var res = {}; - var cloned = {}; - var message = {payload: "foo", cloned: cloned, req: req, res: res}; - - var rcvdCount = 0; - - // first message to be sent, so should not be cloned - n2.on('input',function(msg) { - should.deepEqual(msg, message); - msg.cloned.should.be.exactly(message.cloned); - msg.req.should.be.exactly(message.req); - msg.res.should.be.exactly(message.res); - rcvdCount += 1; - if (rcvdCount == 2) { - done(); - } - }); - - // second message to be sent, so should be cloned - // message uuids wont match since we've cloned - n3.on('input',function(msg) { - msg.payload.should.equal(message.payload); - msg.cloned.should.not.be.exactly(message.cloned); - msg.req.should.be.exactly(message.req); - msg.res.should.be.exactly(message.res); - rcvdCount += 1; - if (rcvdCount == 2) { - done(); - } - }); - - n1.send(message); - }); - - it("logs the uuid for all messages sent", function(done) { - var logHandler = { - msgIds:[], - messagesSent: 0, - emit: function(event, msg) { - if (msg.event == "node.abc.send" && msg.level == Log.METRIC) { - this.messagesSent++; - this.msgIds.push(msg.msgid); - (typeof msg.msgid).should.not.be.equal("undefined"); - } - } - }; - - Log.addHandler(logHandler); - var flow = { - getNode: (id) => { return {'n1':sender,'n2':receiver1,'n3':receiver2}[id]}, - send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} - }; - - var sender = new RedNode({_flow:flow,id:'n1',type:'abc', wires:[['n2', 'n3']]}); - var receiver1 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var receiver2 = new RedNode({_flow:flow,id:'n3',type:'abc'}); - sender.send({"some": "message"}); - setTimeout(function() { - try { - logHandler.messagesSent.should.equal(1); - should.exist(logHandler.msgIds[0]) - Log.removeHandler(logHandler); - done(); - } catch(err) { - Log.removeHandler(logHandler); - done(err); - } - },50) - }) + // it("logs the uuid for all messages sent", function(done) { + // var logHandler = { + // msgIds:[], + // messagesSent: 0, + // emit: function(event, msg) { + // if (msg.event == "node.abc.send" && msg.level == Log.METRIC) { + // this.messagesSent++; + // this.msgIds.push(msg.msgid); + // (typeof msg.msgid).should.not.be.equal("undefined"); + // } + // } + // }; + // + // Log.addHandler(logHandler); + // var flow = { + // getNode: (id) => { return {'n1':sender,'n2':receiver1,'n3':receiver2}[id]}, + // send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} + // }; + // + // var sender = new RedNode({_flow:flow,id:'n1',type:'abc', wires:[['n2', 'n3']]}); + // var receiver1 = new RedNode({_flow:flow,id:'n2',type:'abc'}); + // var receiver2 = new RedNode({_flow:flow,id:'n3',type:'abc'}); + // sender.send({"some": "message"}); + // setTimeout(function() { + // try { + // logHandler.messagesSent.should.equal(1); + // should.exist(logHandler.msgIds[0]) + // Log.removeHandler(logHandler); + // done(); + // } catch(err) { + // Log.removeHandler(logHandler); + // done(err); + // } + // },50) + // }) });