diff --git a/Gruntfile.js b/Gruntfile.js index 1eaf3f6a9..ff711e654 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -457,6 +457,7 @@ module.exports = function(grunt) { 'packages/node_modules/@node-red/runtime/lib/index.js', 'packages/node_modules/@node-red/runtime/lib/api/*.js', 'packages/node_modules/@node-red/runtime/lib/events.js', + 'packages/node_modules/@node-red/runtime/lib/hooks.js', 'packages/node_modules/@node-red/util/**/*.js', 'packages/node_modules/@node-red/editor-api/lib/index.js', 'packages/node_modules/@node-red/editor-api/lib/auth/index.js' diff --git a/packages/node_modules/@node-red/registry/lib/util.js b/packages/node_modules/@node-red/registry/lib/util.js index 4e68caf51..5a6d3da38 100644 --- a/packages/node_modules/@node-red/registry/lib/util.js +++ b/packages/node_modules/@node-red/registry/lib/util.js @@ -57,6 +57,7 @@ function createNodeApi(node) { log: {}, settings: {}, events: runtime.events, + hooks: runtime.hooks, util: runtime.util, version: runtime.version, require: requireModule, diff --git a/packages/node_modules/@node-red/runtime/lib/api/flows.js b/packages/node_modules/@node-red/runtime/lib/api/flows.js index 4207c83ce..811d7b5cb 100644 --- a/packages/node_modules/@node-red/runtime/lib/api/flows.js +++ b/packages/node_modules/@node-red/runtime/lib/api/flows.js @@ -49,11 +49,9 @@ var api = module.exports = { * @return {Promise} - the active flow configuration * @memberof @node-red/runtime_flows */ - getFlows: function(opts) { - return new Promise(function(resolve,reject) { - runtime.log.audit({event: "flows.get"}, opts.req); - return resolve(runtime.nodes.getFlows()); - }); + getFlows: async function(opts) { + runtime.log.audit({event: "flows.get"}, opts.req); + return runtime.flows.getFlows(); }, /** * Sets the current flow configuration @@ -65,38 +63,35 @@ var api = module.exports = { * @return {Promise} - the active flow configuration * @memberof @node-red/runtime_flows */ - setFlows: function(opts) { - return mutex.runExclusive(function() { - return new Promise(function(resolve,reject) { + setFlows: async function(opts) { + return mutex.runExclusive(async function() { + var flows = opts.flows; + var deploymentType = opts.deploymentType||"full"; + runtime.log.audit({event: "flows.set",type:deploymentType}, opts.req); - var flows = opts.flows; - var deploymentType = opts.deploymentType||"full"; - runtime.log.audit({event: "flows.set",type:deploymentType}, opts.req); - - var apiPromise; - if (deploymentType === 'reload') { - apiPromise = runtime.nodes.loadFlows(true); - } else { - if (flows.hasOwnProperty('rev')) { - var currentVersion = runtime.nodes.getFlows().rev; - if (currentVersion !== flows.rev) { - var err; - err = new Error(); - err.code = "version_mismatch"; - err.status = 409; - //TODO: log warning - return reject(err); - } + var apiPromise; + if (deploymentType === 'reload') { + apiPromise = runtime.flows.loadFlows(true); + } else { + if (flows.hasOwnProperty('rev')) { + var currentVersion = runtime.flows.getFlows().rev; + if (currentVersion !== flows.rev) { + var err; + err = new Error(); + err.code = "version_mismatch"; + err.status = 409; + //TODO: log warning + throw err; } - apiPromise = runtime.nodes.setFlows(flows.flows,flows.credentials,deploymentType,null,null,opts.user); } - apiPromise.then(function(flowId) { - return resolve({rev:flowId}); - }).catch(function(err) { - runtime.log.warn(runtime.log._("api.flows.error-"+(deploymentType === 'reload'?'reload':'save'),{message:err.message})); - runtime.log.warn(err.stack); - return reject(err); - }); + apiPromise = runtime.flows.setFlows(flows.flows,flows.credentials,deploymentType,null,null,opts.user); + } + return apiPromise.then(function(flowId) { + return {rev:flowId}; + }).catch(function(err) { + runtime.log.warn(runtime.log._("api.flows.error-"+(deploymentType === 'reload'?'reload':'save'),{message:err.message})); + runtime.log.warn(err.stack); + throw err }); }); }, @@ -110,22 +105,20 @@ var api = module.exports = { * @return {Promise} - the id of the added flow * @memberof @node-red/runtime_flows */ - addFlow: function(opts) { - return mutex.runExclusive(function() { - return new Promise(function (resolve, reject) { - var flow = opts.flow; - runtime.nodes.addFlow(flow,opts.user).then(function (id) { - runtime.log.audit({event: "flow.add", id: id}, opts.req); - return resolve(id); - }).catch(function (err) { - runtime.log.audit({ - event: "flow.add", - error: err.code || "unexpected_error", - message: err.toString() - }, opts.req); - err.status = 400; - return reject(err); - }) + addFlow: async function(opts) { + return mutex.runExclusive(async function() { + var flow = opts.flow; + return runtime.flows.addFlow(flow, opts.user).then(function (id) { + runtime.log.audit({event: "flow.add", id: id}, opts.req); + return id; + }).catch(function (err) { + runtime.log.audit({ + event: "flow.add", + error: err.code || "unexpected_error", + message: err.toString() + }, opts.req); + err.status = 400; + throw err; }) }); }, @@ -139,20 +132,18 @@ var api = module.exports = { * @return {Promise} - the active flow configuration * @memberof @node-red/runtime_flows */ - getFlow: function(opts) { - return new Promise(function (resolve,reject) { - var flow = runtime.nodes.getFlow(opts.id); - if (flow) { - runtime.log.audit({event: "flow.get",id:opts.id}, opts.req); - return resolve(flow); - } else { - runtime.log.audit({event: "flow.get",id:opts.id,error:"not_found"}, opts.req); - var err = new Error(); - err.code = "not_found"; - err.status = 404; - return reject(err); - } - }) + getFlow: async function(opts) { + var flow = runtime.flows.getFlow(opts.id); + if (flow) { + runtime.log.audit({event: "flow.get",id:opts.id}, opts.req); + return flow; + } else { + runtime.log.audit({event: "flow.get",id:opts.id,error:"not_found"}, opts.req); + var err = new Error(); + err.code = "not_found"; + err.status = 404; + throw err; + } }, /** * Updates an existing flow configuration @@ -164,42 +155,29 @@ var api = module.exports = { * @return {Promise} - the id of the updated flow * @memberof @node-red/runtime_flows */ - updateFlow: function(opts) { - return mutex.runExclusive(function() { - return new Promise(function (resolve, reject) { - var flow = opts.flow; - var id = opts.id; - try { - runtime.nodes.updateFlow(id, flow, opts.user).then(function () { - runtime.log.audit({event: "flow.update", id: id}, opts.req); - return resolve(id); - }).catch(function (err) { - runtime.log.audit({ - event: "flow.update", - error: err.code || "unexpected_error", - message: err.toString() - }, opts.req); - err.status = 400; - return reject(err); - }) - } catch (err) { - if (err.code === 404) { - runtime.log.audit({event: "flow.update", id: id, error: "not_found"}, opts.req); - // TODO: this swap around of .code and .status isn't ideal - err.status = 404; - err.code = "not_found"; - return reject(err); - } else { - runtime.log.audit({ - event: "flow.update", - error: err.code || "unexpected_error", - message: err.toString() - }, opts.req); - err.status = 400; - return reject(err); - } + updateFlow: async function(opts) { + return mutex.runExclusive(async function() { + var flow = opts.flow; + var id = opts.id; + return runtime.flows.updateFlow(id, flow, opts.user).then(function () { + runtime.log.audit({event: "flow.update", id: id}, opts.req); + return id; + }).catch(function (err) { + if (err.code === 404) { + runtime.log.audit({event: "flow.update", id: id, error: "not_found"}, opts.req); + // TODO: this swap around of .code and .status isn't ideal + err.status = 404; + err.code = "not_found"; + } else { + runtime.log.audit({ + event: "flow.update", + error: err.code || "unexpected_error", + message: err.toString() + }, opts.req); + err.status = 400; } - }); + throw err; + }) }); }, /** @@ -211,42 +189,28 @@ var api = module.exports = { * @return {Promise} - resolves if successful * @memberof @node-red/runtime_flows */ - deleteFlow: function(opts) { + deleteFlow: async function(opts) { return mutex.runExclusive(function() { - return new Promise(function (resolve, reject) { - var id = opts.id; - try { - runtime.nodes.removeFlow(id, opts.user).then(function () { - runtime.log.audit({event: "flow.remove", id: id}, opts.req); - return resolve(); - }).catch(function (err) { - runtime.log.audit({ - event: "flow.remove", - id: id, - error: err.code || "unexpected_error", - message: err.toString() - }, opts.req); - err.status = 400; - return reject(err); - }); - } catch (err) { - if (err.code === 404) { - runtime.log.audit({event: "flow.remove", id: id, error: "not_found"}, opts.req); - // TODO: this swap around of .code and .status isn't ideal - err.status = 404; - err.code = "not_found"; - return reject(err); - } else { - runtime.log.audit({ - event: "flow.remove", - id: id, - error: err.code || "unexpected_error", - message: err.toString() - }, opts.req); - err.status = 400; - return reject(err); - } + var id = opts.id; + return runtime.flows.removeFlow(id, opts.user).then(function () { + runtime.log.audit({event: "flow.remove", id: id}, opts.req); + return; + }).catch(function (err) { + if (err.code === 404) { + runtime.log.audit({event: "flow.remove", id: id, error: "not_found"}, opts.req); + // TODO: this swap around of .code and .status isn't ideal + err.status = 404; + err.code = "not_found"; + } else { + runtime.log.audit({ + event: "flow.remove", + id: id, + error: err.code || "unexpected_error", + message: err.toString() + }, opts.req); + err.status = 400; } + throw err; }); }); }, @@ -261,35 +225,33 @@ var api = module.exports = { * @return {Promise} - the safe credentials * @memberof @node-red/runtime_flows */ - getNodeCredentials: function(opts) { - return new Promise(function(resolve,reject) { - runtime.log.audit({event: "credentials.get",type:opts.type,id:opts.id}, opts.req); - var credentials = runtime.nodes.getCredentials(opts.id); - if (!credentials) { - return resolve({}); - } - var sendCredentials = {}; - var cred; - if (/^subflow(:|$)/.test(opts.type)) { - for (cred in credentials) { - if (credentials.hasOwnProperty(cred)) { - sendCredentials['has_'+cred] = credentials[cred] != null && credentials[cred] !== ''; - } - } - } else { - var definition = runtime.nodes.getCredentialDefinition(opts.type) || {}; - for (cred in definition) { - if (definition.hasOwnProperty(cred)) { - if (definition[cred].type == "password") { - var key = 'has_' + cred; - sendCredentials[key] = credentials[cred] != null && credentials[cred] !== ''; - continue; - } - sendCredentials[cred] = credentials[cred] || ''; - } + getNodeCredentials: async function(opts) { + runtime.log.audit({event: "credentials.get",type:opts.type,id:opts.id}, opts.req); + var credentials = runtime.nodes.getCredentials(opts.id); + if (!credentials) { + return {}; + } + var sendCredentials = {}; + var cred; + if (/^subflow(:|$)/.test(opts.type)) { + for (cred in credentials) { + if (credentials.hasOwnProperty(cred)) { + sendCredentials['has_'+cred] = credentials[cred] != null && credentials[cred] !== ''; } } - resolve(sendCredentials); - }) + } else { + var definition = runtime.nodes.getCredentialDefinition(opts.type) || {}; + for (cred in definition) { + if (definition.hasOwnProperty(cred)) { + if (definition[cred].type == "password") { + var key = 'has_' + cred; + sendCredentials[key] = credentials[cred] != null && credentials[cred] !== ''; + continue; + } + sendCredentials[cred] = credentials[cred] || ''; + } + } + } + return sendCredentials; } } diff --git a/packages/node_modules/@node-red/runtime/lib/events.js b/packages/node_modules/@node-red/runtime/lib/events.js index ed280a618..8e6935222 100644 --- a/packages/node_modules/@node-red/runtime/lib/events.js +++ b/packages/node_modules/@node-red/runtime/lib/events.js @@ -14,9 +14,34 @@ * limitations under the License. **/ -var events = require("events"); +const events = new (require("events")).EventEmitter(); -module.exports = new events.EventEmitter(); + +const deprecatedEvents = { + "nodes-stopped": "flows:stopped", + "nodes-started": "flows:started" +} + +function wrapEventFunction(obj,func) { + events["_"+func] = events[func]; + return function(eventName, listener) { + if (deprecatedEvents.hasOwnProperty(eventName)) { + const log = require("@node-red/util").log; + const stack = (new Error().stack).split("\n")[2].split("(")[1].slice(0,-1); + log.warn(`[RED.events] Deprecated use of "${eventName}" event from "${stack}". Use "${deprecatedEvents[eventName]}" instead.`) + } + return events["_"+func].call(events,eventName,listener) + } +} + + +events.on = wrapEventFunction(events,"on"); +events.once = wrapEventFunction(events,"once"); +events.addListener = events.on; + + + +module.exports = events; /** * Runtime events emitter diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js similarity index 85% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js rename to packages/node_modules/@node-red/runtime/lib/flows/Flow.js index df4e810fa..20a4f6c3d 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -17,8 +17,9 @@ var clone = require("clone"); var redUtil = require("@node-red/util").util; var flowUtil = require("./util"); -var events = require("../../events"); -const context = require('../context'); +var events = require("../events"); +const context = require('../nodes/context'); +const hooks = require("../hooks"); var Subflow; var Log; @@ -539,8 +540,25 @@ class Flow { } } - get asyncMessageDelivery() { - return asyncMessageDelivery + send(sendEvents) { + // onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object. + // preRoute - called once for each SendEvent object in turn + // preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed. + // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery) + // onReceive - a node is about to receive a message + // postReceive - the message has been passed to the node's input handler + // onDone, onError - the node has completed with a message or logged an error + handleOnSend(this,sendEvents, (err, eventData) => { + if (err) { + let srcNode; + if (Array.isArray(eventData)) { + srcNode = eventData[0].source.node; + } else { + srcNode = eventData.source.node; + } + srcNode.error(err); + } + }); } dump() { @@ -589,6 +607,67 @@ function stopNode(node,removed) { } +function handleOnSend(flow,sendEvents, reportError) { + // onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object. + hooks.trigger("onSend",sendEvents,(err) => { + if (err) { + reportError(err,sendEvents); + return + } else if (err !== false) { + for (var i=0;i { + if (err) { + reportError(err,sendEvent); + return; + } else if (err !== false) { + sendEvent.destination.node = flow.getNode(sendEvent.destination.id); + if (sendEvent.destination.node) { + if (sendEvent.cloneMessage) { + sendEvent.msg = redUtil.cloneMessage(sendEvent.msg); + } + handlePreDeliver(flow,sendEvent,reportError); + } + } + }) +} + +function handlePreDeliver(flow,sendEvent, reportError) { + // preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed. + hooks.trigger("preDeliver",sendEvent,(err) => { + if (err) { + reportError(err,sendEvent); + return; + } else if (err !== false) { + if (asyncMessageDelivery) { + setImmediate(function() { + if (sendEvent.destination.node) { + sendEvent.destination.node.receive(sendEvent.msg); + } + }) + } else { + if (sendEvent.destination.node) { + sendEvent.destination.node.receive(sendEvent.msg); + + } + } + // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery) + hooks.trigger("postDeliver", sendEvent, function(err) { + if (err) { + reportError(err,sendEvent); + } + }) + } + }) +} + module.exports = { init: function(runtime) { nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000; diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Subflow.js b/packages/node_modules/@node-red/runtime/lib/flows/Subflow.js similarity index 99% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/Subflow.js rename to packages/node_modules/@node-red/runtime/lib/flows/Subflow.js index cca230d79..c651e77aa 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Subflow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Subflow.js @@ -16,14 +16,14 @@ const clone = require("clone"); const Flow = require('./Flow').Flow; -const context = require('../context'); +const context = require('../nodes/context'); const util = require("util"); const redUtil = require("@node-red/util").util; const flowUtil = require("./util"); -const credentials = require("../credentials"); +const credentials = require("../nodes/credentials"); var Log; @@ -159,7 +159,7 @@ class Subflow extends Flow { start(diff) { var self = this; // Create a subflow node to accept inbound messages and route appropriately - var Node = require("../Node"); + var Node = require("../nodes/Node"); if (this.subflowDef.status) { var subflowStatusConfig = { diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js b/packages/node_modules/@node-red/runtime/lib/flows/index.js similarity index 96% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js rename to packages/node_modules/@node-red/runtime/lib/flows/index.js index 38742daf3..147f62ace 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/index.js @@ -15,21 +15,19 @@ **/ var clone = require("clone"); -var when = require("when"); var Flow = require('./Flow'); var typeRegistry = require("@node-red/registry"); var deprecated = typeRegistry.deprecated; - -var context = require("../context") -var credentials = require("../credentials"); - +var context = require("../nodes/context") +var credentials = require("../nodes/credentials"); var flowUtil = require("./util"); var log; -var events = require("../../events"); +var events = require("../events"); var redUtil = require("@node-red/util").util; +const hooks = require("../hooks"); var storage = null; var settings = null; @@ -294,6 +292,8 @@ function start(type,diff,muteLog) { } } + events.emit("flows:starting", {config: activeConfig, type: type, diff: diff}) + var id; if (type === "full") { // A full start means everything should @@ -354,6 +354,8 @@ function start(type,diff,muteLog) { } } } + events.emit("flows:started", {config: activeConfig, type: type, diff: diff}); + // Deprecated event events.emit("nodes-started"); if (credentialsPendingReset === true) { @@ -401,6 +403,8 @@ function stop(type,diff,muteLog) { stopList = diff.changed.concat(diff.removed).concat(diff.linked); } + events.emit("flows:stopping",{config: activeConfig, type: type, diff: diff}) + for (var id in activeFlows) { if (activeFlows.hasOwnProperty(id)) { var flowStateChanged = diff && (diff.added.indexOf(id) !== -1 || diff.removed.indexOf(id) !== -1); @@ -432,6 +436,8 @@ function stop(type,diff,muteLog) { log.info(log._("nodes.flows.stopped-flows")); } } + events.emit("flows:stopped",{config: activeConfig, type: type, diff: diff}); + // Deprecated event events.emit("nodes-stopped"); }); } @@ -481,7 +487,7 @@ function updateMissingTypes() { } } -function addFlow(flow, user) { +async function addFlow(flow, user) { var i,node; if (!flow.hasOwnProperty('nodes')) { throw new Error('missing nodes property'); @@ -506,10 +512,10 @@ function addFlow(flow, user) { node = flow.nodes[i]; if (activeFlowConfig.allNodes[node.id]) { // TODO nls - return when.reject(new Error('duplicate id')); + throw new Error('duplicate id'); } if (node.type === 'tab' || node.type === 'subflow') { - return when.reject(new Error('invalid node type: '+node.type)); + throw new Error('invalid node type: '+node.type); } node.z = flow.id; nodes.push(node); @@ -519,10 +525,10 @@ function addFlow(flow, user) { node = flow.configs[i]; if (activeFlowConfig.allNodes[node.id]) { // TODO nls - return when.reject(new Error('duplicate id')); + throw new Error('duplicate id'); } if (node.type === 'tab' || node.type === 'subflow') { - return when.reject(new Error('invalid node type: '+node.type)); + throw new Error('invalid node type: '+node.type); } node.z = flow.id; nodes.push(node); @@ -607,7 +613,7 @@ function getFlow(id) { return result; } -function updateFlow(id,newFlow, user) { +async function updateFlow(id,newFlow, user) { var label = id; if (id !== 'global') { if (!activeFlowConfig.flows[id]) { @@ -667,7 +673,7 @@ function updateFlow(id,newFlow, user) { }) } -function removeFlow(id, user) { +async function removeFlow(id, user) { if (id === 'global') { // TODO: nls + error code throw new Error('not allowed to remove global'); diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/util.js b/packages/node_modules/@node-red/runtime/lib/flows/util.js similarity index 100% rename from packages/node_modules/@node-red/runtime/lib/nodes/flows/util.js rename to packages/node_modules/@node-red/runtime/lib/flows/util.js diff --git a/packages/node_modules/@node-red/runtime/lib/hooks.js b/packages/node_modules/@node-red/runtime/lib/hooks.js new file mode 100644 index 000000000..4ae537cc4 --- /dev/null +++ b/packages/node_modules/@node-red/runtime/lib/hooks.js @@ -0,0 +1,182 @@ +const Log = require("@node-red/util").log; + +const VALID_HOOKS = [ + // Message Routing Path + "onSend", + "preRoute", + "preDeliver", + "postDeliver", + "onReceive", + "postReceive", + "onComplete" +] + + +// Flags for what hooks have handlers registered +let states = { } + +// Hooks by id +let hooks = { } + +// Hooks by label +let labelledHooks = { } + +/** + * Runtime hooks engine + * + * The following hooks can be used: + * + * Message sending + * - `onSend` - passed an array of `SendEvent` objects. The messages inside these objects are exactly what the node has passed to `node.send` - meaning there could be duplicate references to the same message object. + * - `preRoute` - passed a `SendEvent` + * - `preDeliver` - passed a `SendEvent`. The local router has identified the node it is going to send to. At this point, the message has been cloned if needed. + * - `postDeliver` - passed a `SendEvent`. The message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery) + * - `onReceive` - passed a `ReceiveEvent` when a node is about to receive a message + * - `postReceive` - passed a `ReceiveEvent` when the message has been given to the node's `input` handler(s) + * - `onComplete` - passed a `CompleteEvent` when the node has completed with a message or logged an error + * + * @mixin @node-red/runtime_hooks + */ + +/** + * Register a handler to a named hook + * @memberof @node-red/runtime_hooks + * @param {String} hookId - the name of the hook to attach to + * @param {Function} callback - the callback function for the hook + */ +function add(hookId, callback) { + let [id, label] = hookId.split("."); + if (VALID_HOOKS.indexOf(id) === -1) { + throw new Error(`Invalid hook '${id}'`); + } + if (label) { + if (labelledHooks[label] && labelledHooks[label][id]) { + throw new Error("Hook "+hookId+" already registered") + } + labelledHooks[label] = labelledHooks[label]||{}; + labelledHooks[label][id] = callback; + } + // Get location of calling code + const stack = new Error().stack; + const callModule = stack.split("\n")[2].split("(")[1].slice(0,-1); + Log.debug(`Adding hook '${hookId}' from ${callModule}`); + + hooks[id] = hooks[id] || []; + hooks[id].push({cb:callback,location:callModule}); + states[id] = true; +} + +/** + * Remove a handled from a named hook + * @memberof @node-red/runtime_hooks + * @param {String} hookId - the name of the hook event to remove - must be `name.label` + */ +function remove(hookId) { + let [id,label] = hookId.split("."); + if ( !label) { + throw new Error("Cannot remove hook without label: ",hookId) + } + Log.debug(`Removing hook '${hookId}'`); + if (labelledHooks[label]) { + if (id === "*") { + // Remove all hooks for this label + let hookList = Object.keys(labelledHooks[label]); + for (let i=0;i hook.cb === callback); + if (i !== -1) { + hooks[id].splice(i,1); + if (hooks[id].length === 0) { + delete hooks[id]; + delete states[id]; + } + } +} + + +function trigger(hookId, payload, done) { + const hookStack = hooks[hookId]; + if (!hookStack || hookStack.length === 0) { + done(); + return; + } + let i = 0; + + function callNextHook(err) { + if (i === hookStack.length || err) { + done(err); + return; + } + const hook = hookStack[i++]; + const callback = hook.cb; + if (callback.length === 1) { + try { + let result = callback(payload); + if (result === false) { + // Halting the flow + done(false); + return + } + if (result && typeof result.then === 'function') { + result.then(handleResolve, callNextHook) + return; + } + callNextHook(); + } catch(err) { + done(err); + return; + } + } else { + try { + callback(payload,handleResolve) + } catch(err) { + done(err); + return; + } + } + } + callNextHook(); + + function handleResolve(result) { + if (result === undefined) { + callNextHook(); + } else { + done(result); + } + } +} + +function clear() { + hooks = {} + labelledHooks = {} + states = {} +} + +function has(hookId) { + let [id, label] = hookId.split("."); + if (label) { + return !!(labelledHooks[label] && labelledHooks[label][id]) + } + return !!states[id] +} + +module.exports = { + has, + clear, + add, + remove, + trigger +} \ No newline at end of file diff --git a/packages/node_modules/@node-red/runtime/lib/index.js b/packages/node_modules/@node-red/runtime/lib/index.js index 66bff6ff2..f9a772ea3 100644 --- a/packages/node_modules/@node-red/runtime/lib/index.js +++ b/packages/node_modules/@node-red/runtime/lib/index.js @@ -19,9 +19,11 @@ var when = require('when'); var externalAPI = require("./api"); var redNodes = require("./nodes"); +var flows = require("./flows"); var storage = require("./storage"); var library = require("./library"); var events = require("./events"); +var hooks = require("./hooks"); var settings = require("./settings"); var exec = require("./exec"); @@ -272,7 +274,9 @@ var runtime = { settings: settings, storage: storage, events: events, + hooks: hooks, nodes: redNodes, + flows: flows, library: library, exec: exec, util: require("@node-red/util").util, @@ -355,6 +359,7 @@ module.exports = { storage: storage, events: events, + hooks: hooks, util: require("@node-red/util").util, get httpNode() { return nodeApp }, get httpAdmin() { return adminApp }, diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index 60b106426..dfc8b5ed1 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -20,7 +20,9 @@ var EventEmitter = require("events").EventEmitter; var redUtil = require("@node-red/util").util; var Log = require("@node-red/util").log; var context = require("./context"); -var flows = require("./flows"); +var flows = require("../flows"); +const hooks = require("../hooks"); + const NOOP_SEND = function() {} @@ -55,10 +57,6 @@ function Node(n) { // as part of its constructor - config._flow will overwrite this._flow // which we can tolerate as they are the same object. Object.defineProperty(this,'_flow', {value: n._flow, enumerable: false, writable: true }) - this._asyncDelivery = n._flow.asyncMessageDelivery; - } - if (this._asyncDelivery === undefined) { - this._asyncDelivery = true; } this.updateWires(n.wires); } @@ -125,6 +123,11 @@ Node.prototype.context = function() { */ Node.prototype._complete = function(msg,error) { this.metric("done",msg); + hooks.trigger("onComplete",{ msg: msg, error: error, node: { id: this.id, node: this }}, err => { + if (err) { + this.error(err); + } + }) if (error) { // For now, delegate this to this.error // But at some point, the timeout handling will need to know about @@ -173,15 +176,7 @@ Node.prototype._emit = Node.prototype.emit; Node.prototype.emit = function(event, ...args) { var node = this; if (event === "input") { - // When Pluggable Message Routing arrives, this will be called from - // that and will already be sync/async depending on the router. - if (this._asyncDelivery) { - setImmediate(function() { - node._emitInput.apply(node,args); - }); - } else { - this._emitInput.apply(this,args); - } + this._emitInput.apply(this,args); } else { this._emit.apply(this,arguments); } @@ -195,42 +190,53 @@ Node.prototype.emit = function(event, ...args) { Node.prototype._emitInput = function(arg) { var node = this; this.metric("receive", arg); - if (node._inputCallback) { - // Just one callback registered. - try { - node._inputCallback( - arg, - function() { node.send.apply(node,arguments) }, - function(err) { node._complete(arg,err); } - ); - } catch(err) { - node.error(err,arg); - } - } else if (node._inputCallbacks) { - // Multiple callbacks registered. Call each one, tracking eventual completion - var c = node._inputCallbacks.length; - for (var i=0;i { + if (err) { + node.error(err); + return + } else if (err !== false) { + if (node._inputCallback) { + // Just one callback registered. + try { + node._inputCallback( + arg, + function() { node.send.apply(node,arguments) }, + function(err) { node._complete(arg,err); } + ); + } catch(err) { + node.error(err,arg); + } + } else if (node._inputCallbacks) { + // Multiple callbacks registered. Call each one, tracking eventual completion + var c = node._inputCallbacks.length; + for (var i=0;i {if (err) { node.error(err) }}); } - } + }); } /** @@ -366,11 +372,19 @@ Node.prototype.send = function(msg) { msg._msgid = redUtil.generateId(); } this.metric("send",msg); - node = this._flow.getNode(this._wire); - /* istanbul ignore else */ - if (node) { - node.receive(msg); - } + this._flow.send([{ + msg: msg, + source: { + id: this.id, + node: this, + port: 0 + }, + destination: { + id: this._wire, + node: undefined + }, + cloneMessage: false + }]); return; } else { msg = [msg]; @@ -384,7 +398,7 @@ Node.prototype.send = function(msg) { var sendEvents = []; var sentMessageId = null; - + var hasMissingIds = false; // for each output of node eg. [msgs to output 0, msgs to output 1, ...] for (var i = 0; i < numOutputs; i++) { var wires = this.wires[i]; // wires leaving output i @@ -398,24 +412,31 @@ Node.prototype.send = function(msg) { var k = 0; // for each recipent node of that output for (var j = 0; j < wires.length; j++) { - node = this._flow.getNode(wires[j]); // node at end of wire j - if (node) { - // for each msg to send eg. [[m1, m2, ...], ...] - for (k = 0; k < msgs.length; k++) { - var m = msgs[k]; - if (m !== null && m !== undefined) { - /* istanbul ignore else */ - if (!sentMessageId) { - sentMessageId = m._msgid; - } - if (msgSent) { - var clonedmsg = redUtil.cloneMessage(m); - sendEvents.push({n:node,m:clonedmsg}); - } else { - sendEvents.push({n:node,m:m}); - msgSent = true; - } + // for each msg to send eg. [[m1, m2, ...], ...] + for (k = 0; k < msgs.length; k++) { + var m = msgs[k]; + if (m !== null && m !== undefined) { + if (!m._msgid) { + hasMissingIds = true; } + /* istanbul ignore else */ + if (!sentMessageId) { + sentMessageId = m._msgid; + } + sendEvents.push({ + msg: m, + source: { + id: this.id, + node: this, + port: i + }, + destination: { + id: wires[j], + node: undefined + }, + cloneMessage: msgSent + }); + msgSent = true; } } } @@ -428,21 +449,22 @@ Node.prototype.send = function(msg) { } this.metric("send",{_msgid:sentMessageId}); - for (i=0;i{}); + return p; } else if (id === "error") { var err = new Error(); // TODO: quirk of internal api - uses .code for .status @@ -268,7 +270,7 @@ describe("runtime-api/flows", function() { }); flows.init({ log: mockLog(), - nodes: { + flows: { updateFlow: updateFlow } }); @@ -311,7 +313,9 @@ describe("runtime-api/flows", function() { var err = new Error(); // TODO: quirk of internal api - uses .code for .status err.code = 404; - throw err; + var p = Promise.reject(err); + p.catch(()=>{}); + return p; } else if (flow === "error") { var err = new Error(); // TODO: quirk of internal api - uses .code for .status @@ -324,7 +328,7 @@ describe("runtime-api/flows", function() { }); flows.init({ log: mockLog(), - nodes: { + flows: { removeFlow: removeFlow } }); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js b/test/unit/@node-red/runtime/lib/flows/Flow_spec.js similarity index 70% rename from test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js rename to test/unit/@node-red/runtime/lib/flows/Flow_spec.js index e1315dc50..bc27aaaef 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/Flow_spec.js @@ -22,10 +22,11 @@ var util = require("util"); var NR_TEST_UTILS = require("nr-test-utils"); -var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util"); -var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util"); +var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); +var hooks = NR_TEST_UTILS.require("@node-red/runtime/lib/hooks"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry"); @@ -826,4 +827,371 @@ describe('Flow', function() { }); + describe("#send", function() { + it("sends a message - no cloning", function(done) { + var shutdownTest = function(err) { + hooks.clear(); + flow.stop().then(() => { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + var messageReceived = false; + n2.receive = function(msg) { + messageReceived = true; + try { + msg.should.be.exactly(message); + shutdownTest(); + } catch(err) { + shutdownTest(err); + } + } + + var message = {payload:"hello"} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: false + }]) + messageReceived.should.be.false() + }) + it("sends a message - cloning", function(done) { + var shutdownTest = function(err) { + hooks.clear(); + flow.stop().then(() => { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + + n2.receive = function(msg) { + try { + // Message should be cloned + msg.should.be.eql(message); + msg.should.not.be.exactly(message); + shutdownTest(); + } catch(err) { + shutdownTest(err); + } + } + + var message = {payload:"hello"} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + }) + it("sends multiple messages", function(done) { + var shutdownTest = function(err) { + hooks.clear(); + flow.stop().then(() => { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + + var messageCount = 0; + n2.receive = function(msg) { + try { + msg.should.be.exactly(messages[messageCount++]); + if (messageCount === 2) { + shutdownTest(); + } + } catch(err) { + shutdownTest(err); + } + } + + var messages = [{payload:"hello"},{payload:"world"}]; + + flow.send([{ + msg: messages[0], + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined } + },{ + msg: messages[1], + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined } + }]) + }) + it("sends a message - triggers hooks", function(done) { + var hookErrors = []; + var messageReceived = false; + var hooksCalled = []; + hooks.add("onSend", function(sendEvents) { + hooksCalled.push("onSend") + try { + messageReceived.should.be.false() + sendEvents.should.have.length(1); + sendEvents[0].msg.should.be.exactly(message); + } catch(err) { + hookErrors.push(err); + } + }) + hooks.add("preRoute", function(sendEvent) { + hooksCalled.push("preRoute") + try { + messageReceived.should.be.false() + sendEvent.msg.should.be.exactly(message); + should.not.exist(sendEvent.destination.node) + } catch(err) { + hookErrors.push(err); + } + + }) + hooks.add("preDeliver", function(sendEvent) { + hooksCalled.push("preDeliver") + try { + messageReceived.should.be.false() + // Cloning should have happened + sendEvent.msg.should.not.be.exactly(message); + // Destinatino node populated + should.exist(sendEvent.destination.node) + } catch(err) { + hookErrors.push(err); + } + + }) + hooks.add("postDeliver", function(sendEvent) { + hooksCalled.push("postDeliver") + try { + messageReceived.should.be.false() + + } catch(err) { + hookErrors.push(err); + } + + }) + var shutdownTest = function(err) { + hooks.clear(); + flow.stop().then(() => { done(err) }); + } + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + + Object.keys(flow.getActiveNodes()).should.have.length(2); + + var n1 = flow.getNode('1'); + var n2 = flow.getNode('2'); + n2.receive = function(msg) { + messageReceived = true; + try { + msg.should.be.eql(message); + msg.should.not.be.exactly(message); + hooksCalled.should.eql(["onSend","preRoute","preDeliver","postDeliver"]) + if (hookErrors.length > 0) { + shutdownTest(hookErrors[0]) + } else { + shutdownTest(); + } + } catch(err) { + shutdownTest(err); + } + } + + var message = {payload:"hello"} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + }) + + describe("errors thrown by hooks are reported to the sending node", function() { + var flow; + var n1,n2; + var messageReceived = false; + var errorReceived = null; + before(function() { + hooks.add("onSend", function(sendEvents) { + if (sendEvents[0].msg.payload === "trigger-onSend") { + throw new Error("onSend Error"); + } + }) + hooks.add("preRoute", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preRoute") { + throw new Error("preRoute Error"); + } + }) + hooks.add("preDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preDeliver") { + throw new Error("preDeliver Error"); + } + }) + hooks.add("postDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-postDeliver") { + throw new Error("postDeliver Error"); + } + }) + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + n1 = flow.getNode('1'); + n2 = flow.getNode('2'); + n2.receive = function(msg) { + messageReceived = true; + } + n1.error = function(err) { + errorReceived = err; + } + + }) + after(function(done) { + hooks.clear(); + flow.stop().then(() => { done() }); + }) + beforeEach(function() { + messageReceived = false; + errorReceived = null; + }) + function testHook(hook, msgExpected, done) { + var message = {payload:"trigger-"+hook} + flow.send([{ + msg: message, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + setTimeout(function() { + try { + messageReceived.should.equal(msgExpected); + should.exist(errorReceived) + errorReceived.toString().should.containEql(hook); + done(); + } catch(err) { + done(err); + } + },10) + } + + it("onSend", function(done) { testHook("onSend", false, done) }) + it("preRoute", function(done) { testHook("preRoute", false, done) }) + it("preDeliver", function(done) { testHook("preDeliver", false, done) }) + it("postDeliver", function(done) { testHook("postDeliver", true, done) }) + }) + + describe("hooks can stop the sending of messages", function() { + var flow; + var n1,n2; + var messageReceived = false; + var errorReceived = false; + before(function() { + hooks.add("onSend", function(sendEvents) { + if (sendEvents[0].msg.payload === "trigger-onSend") { + return false + } + }) + hooks.add("preRoute", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preRoute") { + return false + } + }) + hooks.add("preDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-preDeliver") { + return false + } + }) + hooks.add("postDeliver", function(sendEvent) { + if (sendEvent.msg.payload === "trigger-postDeliver") { + return false + } + }) + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + ]); + flow = Flow.create({},config,config.flows["t1"]); + flow.start(); + n1 = flow.getNode('1'); + n2 = flow.getNode('2'); + n2.receive = function(msg) { + messageReceived = true; + } + n1.error = function(err) { + errorReceived = true; + } + + }) + after(function(done) { + hooks.clear(); + flow.stop().then(() => { done() }); + }) + function testSend(payload,messageReceivedExpected,errorReceivedExpected,done) { + messageReceived = false; + errorReceived = false; + flow.send([{ + msg: {payload: payload}, + source: { id:"1", node: n1 }, + destination: { id:"2", node: undefined }, + cloneMessage: true + }]) + setTimeout(function() { + try { + messageReceived.should.eql(messageReceivedExpected) + errorReceived.should.eql(errorReceivedExpected) + done(); + } catch(err) { + done(err); + } + },10) + } + function testHook(hook, done) { + testSend("pass",true,false,err => { + if (err) { + done(err) + } else { + testSend("trigger-"+hook,false,false,done); + } + }) + } + + it("onSend", function(done) { testHook("onSend", done) }) + it("preRoute", function(done) { testHook("preRoute", done) }) + it("preDeliver", function(done) { testHook("preDeliver", done) }) + // postDeliver happens after delivery is scheduled so cannot stop it + // it("postDeliver", function(done) { testHook("postDeliver", done) }) + }) + }) + }); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js b/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js similarity index 99% rename from test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js rename to test/unit/@node-red/runtime/lib/flows/Subflow_spec.js index 71bd9ad19..a55771391 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js @@ -22,11 +22,11 @@ var util = require("util"); var NR_TEST_UTILS = require("nr-test-utils"); -var Subflow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Subflow"); -var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow"); +var Subflow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Subflow"); +var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow"); -var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flowUtils = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry"); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/index_spec.js b/test/unit/@node-red/runtime/lib/flows/index_spec.js similarity index 97% rename from test/unit/@node-red/runtime/lib/nodes/flows/index_spec.js rename to test/unit/@node-red/runtime/lib/flows/index_spec.js index 8865c5d3f..df5012844 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/index_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/index_spec.js @@ -20,13 +20,13 @@ var when = require("when"); var clone = require("clone"); var NR_TEST_UTILS = require("nr-test-utils"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var RedNode = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var RED = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes"); var events = NR_TEST_UTILS.require("@node-red/runtime/lib/events"); var credentials = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/credentials"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry") -var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/Flow"); +var Flow = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/Flow"); describe('flows/index', function() { @@ -209,8 +209,7 @@ describe('flows/index', function() { storage.getFlows = function() { return when.resolve({flows:originalConfig}); } - - events.once('nodes-started',function() { + events.once('flows:started',function() { flows.setFlows(newConfig,"nodes").then(function() { flows.getFlows().flows.should.eql(newConfig); flowCreate.flows['t1'].update.called.should.be.true(); @@ -239,7 +238,7 @@ describe('flows/index', function() { return when.resolve({flows:originalConfig}); } - events.once('nodes-started',function() { + events.once('flows:started',function() { flows.setFlows(newConfig,"nodes").then(function() { flows.getFlows().flows.should.eql(newConfig); flowCreate.flows['t1'].update.called.should.be.true(); @@ -301,7 +300,7 @@ describe('flows/index', function() { return when.resolve({flows:originalConfig}); } - events.once('nodes-started',function() { + events.once('flows:started',function() { Object.keys(flowCreate.flows).should.eql(['_GLOBAL_','t1']); done(); }); @@ -398,7 +397,7 @@ describe('flows/index', function() { // return when.resolve({flows:originalConfig}); // } // - // events.once('nodes-started',function() { + // events.once('flows:started',function() { // flows.handleError(originalConfig[0],"message",{}); // flowCreate.flows['t1'].handleError.called.should.be.true(); // done(); @@ -423,7 +422,7 @@ describe('flows/index', function() { // return when.resolve({flows:originalConfig}); // } // - // events.once('nodes-started',function() { + // events.once('flows:started',function() { // flows.handleError(originalConfig[0],"message",{}); // try { // flowCreate.flows['t1'].handleError.called.should.be.true(); @@ -451,7 +450,7 @@ describe('flows/index', function() { // return when.resolve({flows:originalConfig}); // } // - // events.once('nodes-started',function() { + // events.once('flows:started',function() { // flows.handleStatus(originalConfig[0],"message"); // flowCreate.flows['t1'].handleStatus.called.should.be.true(); // done(); @@ -477,7 +476,7 @@ describe('flows/index', function() { // return when.resolve({flows:originalConfig}); // } // - // events.once('nodes-started',function() { + // events.once('flows:started',function() { // flows.handleStatus(originalConfig[0],"message"); // try { // flowCreate.flows['t1'].handleStatus.called.should.be.true(); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/util_spec.js b/test/unit/@node-red/runtime/lib/flows/util_spec.js similarity index 99% rename from test/unit/@node-red/runtime/lib/nodes/flows/util_spec.js rename to test/unit/@node-red/runtime/lib/flows/util_spec.js index b83a37660..8e1f15754 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/util_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/util_spec.js @@ -19,7 +19,7 @@ var sinon = require("sinon"); var when = require("when"); var clone = require("clone"); var NR_TEST_UTILS = require("nr-test-utils"); -var flowUtil = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows/util"); +var flowUtil = NR_TEST_UTILS.require("@node-red/runtime/lib/flows/util"); var typeRegistry = NR_TEST_UTILS.require("@node-red/registry"); var redUtil = NR_TEST_UTILS.require("@node-red/util").util; diff --git a/test/unit/@node-red/runtime/lib/hooks_spec.js b/test/unit/@node-red/runtime/lib/hooks_spec.js new file mode 100644 index 000000000..4fb95b68a --- /dev/null +++ b/test/unit/@node-red/runtime/lib/hooks_spec.js @@ -0,0 +1,252 @@ +const should = require("should"); +const NR_TEST_UTILS = require("nr-test-utils"); + +const hooks = NR_TEST_UTILS.require("@node-red/runtime/lib/hooks"); + +describe("runtime/hooks", function() { + afterEach(function() { + hooks.clear(); + }) + it("allows a hook to be registered", function(done) { + let calledWith = null; + hooks.has("onSend").should.be.false(); + hooks.add("onSend", function(payload) { calledWith = payload } ) + hooks.has("onSend").should.be.true(); + let data = { a: 1 }; + hooks.trigger("onSend",data,err => { + calledWith.should.equal(data); + done(err); + }) + }) + it("rejects invalid hook id", function(done) { + try { + hooks.add("foo", function(payload) {}) + done(new Error("Invalid hook accepted")) + } catch(err) { + done(); + } + }) + it("calls hooks in the order they were registered", function(done) { + hooks.add("onSend", function(payload) { payload.order.push("A") } ) + hooks.add("onSend", function(payload) { payload.order.push("B") } ) + let data = { order:[] }; + hooks.trigger("onSend",data,err => { + data.order.should.eql(["A","B"]) + done(err); + }) + }) + + it("does not allow multiple hooks with same id.label", function() { + hooks.has("onSend.one").should.be.false(); + hooks.has("onSend").should.be.false(); + hooks.add("onSend.one", function(payload) { payload.order.push("A") } ); + hooks.has("onSend.one").should.be.true(); + hooks.has("onSend").should.be.true(); + (function() { + hooks.add("onSend.one", function(payload) { payload.order.push("B") } ) + }).should.throw(); + }) + + it("removes labelled hook", function(done) { + hooks.has("onSend.A").should.be.false(); + hooks.has("onSend.B").should.be.false(); + hooks.has("onSend").should.be.false(); + + hooks.add("onSend.A", function(payload) { payload.order.push("A") } ) + + hooks.has("onSend.A").should.be.true(); + hooks.has("onSend.B").should.be.false(); + hooks.has("onSend").should.be.true(); + + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + + hooks.has("onSend.A").should.be.true(); + hooks.has("onSend.B").should.be.true(); + hooks.has("onSend").should.be.true(); + + hooks.remove("onSend.A"); + + hooks.has("onSend.A").should.be.false(); + hooks.has("onSend.B").should.be.true(); + hooks.has("onSend").should.be.true(); + + + let data = { order:[] }; + hooks.trigger("onSend",data,err => { + try { + data.order.should.eql(["B"]) + + hooks.remove("onSend.B"); + + hooks.has("onSend.A").should.be.false(); + hooks.has("onSend.B").should.be.false(); + hooks.has("onSend").should.be.false(); + + done(err); + } catch(err2) { + done(err2); + } + }) + }) + + it("cannot remove unlabelled hook", function() { + hooks.add("onSend", function(payload) { payload.order.push("A") } ); + (function() { + hooks.remove("onSend") + }).should.throw(); + }) + it("removes all hooks with same label", function(done) { + hooks.add("onSend.A", function(payload) { payload.order.push("A") } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + hooks.add("preRoute.A", function(payload) { payload.order.push("C") } ) + hooks.add("preRoute.B", function(payload) { payload.order.push("D") } ) + + let data = { order:[] }; + hooks.trigger("onSend",data,err => { + data.order.should.eql(["A","B"]) + hooks.trigger("preRoute", data, err => { + data.order.should.eql(["A","B","C","D"]) + + data.order = []; + + hooks.remove("*.A"); + + hooks.trigger("onSend",data,err => { + data.order.should.eql(["B"]) + hooks.trigger("preRoute", data, err => { + data.order.should.eql(["B","D"]) + }) + done(err); + }) + }) + }) + }) + + + it("halts execution on return false", function(done) { + hooks.add("onSend.A", function(payload) { payload.order.push("A"); return false } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("onSend",data,err => { + data.order.should.eql(["A"]) + err.should.be.false(); + done(); + }) + }) + it("halts execution on thrown error", function(done) { + hooks.add("onSend.A", function(payload) { payload.order.push("A"); throw new Error("error") } ) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("onSend",data,err => { + data.order.should.eql(["A"]) + should.exist(err); + err.should.not.be.false() + done(); + }) + }) + + it("handler can use callback function", function(done) { + hooks.add("onSend.A", function(payload, done) { + setTimeout(function() { + payload.order.push("A") + done() + },30) + }) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("onSend",data,err => { + data.order.should.eql(["A","B"]) + done(err); + }) + }) + + it("handler can use callback function - halt execution", function(done) { + hooks.add("onSend.A", function(payload, done) { + setTimeout(function() { + payload.order.push("A") + done(false) + },30) + }) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("onSend",data,err => { + data.order.should.eql(["A"]) + err.should.be.false() + done(); + }) + }) + it("handler can use callback function - halt on error", function(done) { + hooks.add("onSend.A", function(payload, done) { + setTimeout(function() { + done(new Error("test error")) + },30) + }) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("onSend",data,err => { + data.order.should.eql([]) + should.exist(err); + err.should.not.be.false() + done(); + }) + }) + + it("handler be an async function", function(done) { + hooks.add("onSend.A", async function(payload) { + return new Promise(resolve => { + setTimeout(function() { + payload.order.push("A") + resolve() + },30) + }); + }) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("onSend",data,err => { + data.order.should.eql(["A","B"]) + done(err); + }) + }) + + it("handler be an async function - halt execution", function(done) { + hooks.add("onSend.A", async function(payload) { + return new Promise(resolve => { + setTimeout(function() { + payload.order.push("A") + resolve(false) + },30) + }); + }) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("onSend",data,err => { + data.order.should.eql(["A"]) + done(err); + }) + }) + it("handler be an async function - halt on error", function(done) { + hooks.add("onSend.A", async function(payload) { + return new Promise((resolve,reject) => { + setTimeout(function() { + reject(new Error("test error")) + },30) + }); + }) + hooks.add("onSend.B", function(payload) { payload.order.push("B") } ) + + let data = { order:[] }; + hooks.trigger("onSend",data,err => { + data.order.should.eql([]) + should.exist(err); + err.should.not.be.false() + done(); + }) + }) +}); diff --git a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js index 61b4446f7..df362dc85 100644 --- a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js +++ b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js @@ -19,7 +19,8 @@ var sinon = require('sinon'); var NR_TEST_UTILS = require("nr-test-utils"); var RedNode = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node"); var Log = NR_TEST_UTILS.require("@node-red/util").log; -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var hooks = NR_TEST_UTILS.require("@node-red/runtime/lib/hooks"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); describe('Node', function() { describe('#constructor',function() { @@ -181,6 +182,93 @@ describe('Node', function() { }); n.receive(message); }); + + it('triggers onComplete hook when done callback provided', function(done) { + var handleCompleteCalled = false; + var hookCalled = false; + var n = new RedNode({id:'123',type:'abc', _flow: { + handleComplete: function(node,msg) { + handleCompleteCalled = true; + } + }}); + var hookError; + hooks.add("onComplete",function(completeEvent) { + hookCalled = true; + try { + handleCompleteCalled.should.be.false("onComplete should be called before handleComplete") + should.not.exist(completeEvent.error); + completeEvent.msg.should.deepEqual(message); + completeEvent.node.id.should.eql("123"); + completeEvent.node.node.should.equal(n); + } catch(err) { + hookError = err; + } + }) + var message = {payload:"hello world"}; + n.on('input',function(msg, nodeSend, nodeDone) { + nodeDone(); + }); + n.receive(message); + setTimeout(function() { + if (hookError) { + done(hookError); + return + } + try { + hookCalled.should.be.true("onComplete hook should be called"); + handleCompleteCalled.should.be.true("handleComplete should be called"); + done(); + } catch(err) { + done(err); + } + }) + }); + + it('triggers onComplete hook when done callback provided - with error', function(done) { + var handleCompleteCalled = false; + var hookCalled = false; + var errorReported = false; + var n = new RedNode({id:'123',type:'abc', _flow: { + handleComplete: function(node,msg) { + handleCompleteCalled = true; + } + }}); + var hookError; + hooks.add("onComplete",function(completeEvent) { + hookCalled = true; + try { + handleCompleteCalled.should.be.false("onComplete should be called before handleComplete") + should.exist(completeEvent.error); + completeEvent.error.toString().should.equal("Error: test error") + completeEvent.msg.should.deepEqual(message); + completeEvent.node.id.should.eql("123"); + completeEvent.node.node.should.equal(n); + } catch(err) { + hookError = err; + } + }) + var message = {payload:"hello world"}; + n.on('input',function(msg, nodeSend, nodeDone) { + nodeDone(new Error("test error")); + }); + n.error = function(err,msg) { + errorReported = true; + } + n.receive(message); + setTimeout(function() { + if (hookError) { + done(hookError); + return + } + try { + hookCalled.should.be.true("onComplete hook should be called"); + handleCompleteCalled.should.be.false("handleComplete should not be called"); + done(); + } catch(err) { + done(err); + } + }) + }); it('logs error if callback provides error', function(done) { var n = new RedNode({id:'123',type:'abc'}); sinon.stub(n,"error",function(err,msg) {}); @@ -201,151 +289,256 @@ describe('Node', function() { }); n.receive(message); }); + it("triggers hooks when receiving a message", function(done) { + var hookErrors = []; + var messageReceived = false; + var hooksCalled = []; + hooks.add("onReceive", function(receiveEvent) { + hooksCalled.push("onReceive") + try { + messageReceived.should.be.false("Message should not have been received before onReceive") + receiveEvent.msg.should.be.exactly(message); + receiveEvent.destination.id.should.equal("123") + receiveEvent.destination.node.should.equal(n) + } catch(err) { + hookErrors.push(err); + } + }) + hooks.add("postReceive", function(receiveEvent) { + hooksCalled.push("postReceive") + try { + messageReceived.should.be.true("Message should have been received before postReceive") + receiveEvent.msg.should.be.exactly(message); + receiveEvent.destination.id.should.equal("123") + receiveEvent.destination.node.should.equal(n) + } catch(err) { + hookErrors.push(err); + } + + }) + var n = new RedNode({id:'123',type:'abc'}); + var message = {payload:"hello world"}; + n.on('input',function(msg) { + messageReceived = true; + try { + should.strictEqual(this,n); + hooksCalled.should.eql(["onReceive"]) + should.deepEqual(msg,message); + } catch(err) { + hookErrors.push(err) + } + }); + n.receive(message); + setTimeout(function() { + hooks.clear(); + if (hookErrors.length > 0) { + done(hookErrors[0]) + } else { + done(); + } + },10); + }); + describe("errors thrown by hooks are reported", function() { + before(function() { + hooks.add("onReceive",function(recEvent) { + if (recEvent.msg.payload === "trigger-onReceive") { + throw new Error("onReceive Error") + } + }) + hooks.add("postReceive",function(recEvent) { + if (recEvent.msg.payload === "trigger-postReceive") { + throw new Error("postReceive Error") + } + }) + }) + after(function() { + hooks.clear(); + }) + function testHook(hook, msgExpected, done) { + var messageReceived = false; + var errorReceived; + var n = new RedNode({id:'123',type:'abc'}); + var message = {payload:"trigger-"+hook}; + n.on('input',function(msg) { + messageReceived = true; + }); + n.error = function (err) { + errorReceived = err; + } + + n.receive(message); + + setTimeout(function() { + try { + messageReceived.should.equal(msgExpected,`Hook ${hook} messageReceived expected ${msgExpected} actual ${messageReceived}`); + should.exist(errorReceived); + errorReceived.toString().should.containEql(hook) + done() + } catch(err) { + done(err); + } + },10); + } + it("onReceive", function(done) { testHook("onReceive", false, done)}) + it("postReceive", function(done) { testHook("postReceive", true, done)}) + }) }); + describe("hooks can halt receive", function() { + before(function() { + hooks.add("onReceive",function(recEvent) { + if (recEvent.msg.payload === "trigger-onReceive") { + return false; + } + }) + }) + after(function() { + hooks.clear(); + }) + + function testHook(hook, msgExpected, done) { + var messageReceived = false; + var errorReceived; + var n = new RedNode({id:'123',type:'abc'}); + var message = {payload:"trigger-"+hook}; + n.on('input',function(msg) { + messageReceived = true; + }); + n.error = function (err) { + errorReceived = err; + } + + n.receive(message); + + setTimeout(function() { + try { + messageReceived.should.equal(msgExpected,`Hook ${hook} messageReceived expected ${msgExpected} actual ${messageReceived}`); + should.not.exist(errorReceived); + done() + } catch(err) { + done(err); + } + },10); + } + it("onReceive", function(done) { testHook("onReceive", false, done)}) + }) + + describe('#send', function() { it('emits a single message', function(done) { var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + send: (sendEvents) => { + try { + sendEvents.should.have.length(1); + sendEvents[0].msg.should.equal(message); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}) + done(); + } catch(err) { + done(err); + } + }, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var message = {payload:"hello world"}; - var messageReceived = false; - n2.on('input',function(msg) { - // msg equals message, and is not a new copy - messageReceived = true; - should.strictEqual(this,n2); - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(); - }); n1.send(message); - messageReceived.should.be.false(); - }); - - it('emits a single message - multiple input event listeners', function(done) { - var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var message = {payload:"hello world"}; - var messageReceived = 0; - n2.on('input',function(msg) { - // msg equals message, and is not a new copy - messageReceived++; - messageReceived.should.be.exactly(1); - should.strictEqual(this,n2); - should.deepEqual(msg,message); - should.strictEqual(msg,message); - }); - n2.on('input',function(msg) { - messageReceived++; - messageReceived.should.be.exactly(2); - should.strictEqual(this,n2); - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(); - }); - n1.send(message); - messageReceived.should.be.exactly(0); - }); - - it('emits a single message - synchronous mode', function(done) { - var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - asyncMessageDelivery: false - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var message = {payload:"hello world"}; - var messageReceived = false; - var notSyncErr; - n2.on('input',function(msg) { - try { - // msg equals message, and is not a new copy - messageReceived = true; - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(notSyncErr); - } catch(err) { - done(err); - } - }); - n1.send(message); - try { - messageReceived.should.be.true(); - } catch(err) { - notSyncErr = err; - } }); it('emits a message with callback provided send', function(done) { var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - handleComplete: (node,msg) => {} + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, + handleComplete: (node,msg) => {}, + send: (sendEvents) => { + try { + sendEvents.should.have.length(1); + sendEvents[0].msg.should.equal(message); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[0].cloneMessage.should.be.false(); + done(); + } catch(err) { + done(err); + } + }, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var message = {payload:"hello world"}; - var messageReceived = false; n1.on('input',function(msg,nodeSend,nodeDone) { nodeSend(msg); nodeDone(); }); - n2.on('input',function(msg) { - // msg equals message, and is not a new copy - messageReceived = true; - should.deepEqual(msg,message); - should.strictEqual(msg,message); - done(); - }); n1.receive(message); - messageReceived.should.be.false(); }); it('emits multiple messages on a single output', function(done) { var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, + send: (sendEvents) => { + try { + sendEvents.should.have.length(2); + sendEvents[0].msg.should.equal(messages[0]); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[0].cloneMessage.should.be.false(); + + sendEvents[1].msg.should.equal(messages[1]); + sendEvents[1].destination.should.eql({id:"n2", node: undefined}); + sendEvents[1].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[1].cloneMessage.should.be.true(); + + done(); + } catch(err) { + done(err); + } + }, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var messages = [ {payload:"hello world"}, {payload:"hello world again"} ]; - var rcvdCount = 0; - - n2.on('input',function(msg) { - if (rcvdCount === 0) { - // first msg sent, don't clone - should.deepEqual(msg,messages[rcvdCount]); - should.strictEqual(msg,messages[rcvdCount]); - rcvdCount += 1; - } else { - // second msg sent, clone - msg.payload.should.equal(messages[rcvdCount].payload); - should.notStrictEqual(msg,messages[rcvdCount]); - done(); - } - }); n1.send([messages]); }); it('emits messages to multiple outputs', function(done) { var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3,'n4':n4,'n5':n5}[id]}, + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, + send: (sendEvents) => { + try { + sendEvents.should.have.length(3); + sendEvents[0].msg.should.equal(messages[0]); + sendEvents[0].destination.should.eql({id:"n2", node: undefined}); + sendEvents[0].source.should.eql({id:"n1", node: n1, port: 0}); + sendEvents[0].cloneMessage.should.be.false(); + should.exist(sendEvents[0].msg._msgid); + sendEvents[1].msg.should.equal(messages[2]); + sendEvents[1].destination.should.eql({id:"n4", node: undefined}); + sendEvents[1].source.should.eql({id:"n1", node: n1, port: 2}) + sendEvents[1].cloneMessage.should.be.true(); + should.exist(sendEvents[1].msg._msgid); + sendEvents[2].msg.should.equal(messages[2]); + sendEvents[2].destination.should.eql({id:"n5", node: undefined}); + sendEvents[2].source.should.eql({id:"n1", node: n1, port: 2}) + sendEvents[2].cloneMessage.should.be.true(); + should.exist(sendEvents[2].msg._msgid); + + sendEvents[0].msg._msgid.should.eql(sendEvents[1].msg._msgid) + sendEvents[1].msg._msgid.should.eql(sendEvents[2].msg._msgid) + + done(); + } catch(err) { + done(err); + } + } }; var n1 = new RedNode({_flow:flow, id:'n1',type:'abc',wires:[['n2'],['n3'],['n4','n5']]}); var n2 = new RedNode({_flow:flow, id:'n2',type:'abc'}); var n3 = new RedNode({_flow:flow, id:'n3',type:'abc'}); var n4 = new RedNode({_flow:flow, id:'n4',type:'abc'}); var n5 = new RedNode({_flow:flow, id:'n5',type:'abc'}); - var messages = [ {payload:"hello world"}, null, @@ -354,48 +547,12 @@ describe('Node', function() { var rcvdCount = 0; - // first message sent, don't clone - // message uuids should match - n2.on('input',function(msg) { - should.deepEqual(msg,messages[0]); - should.strictEqual(msg,messages[0]); - rcvdCount += 1; - if (rcvdCount == 3) { - done(); - } - }); - - n3.on('input',function(msg) { - should.fail(null,null,"unexpected message"); - }); - - // second message sent, clone - // message uuids wont match since we've cloned - n4.on('input',function(msg) { - msg.payload.should.equal(messages[2].payload); - should.notStrictEqual(msg,messages[2]); - rcvdCount += 1; - if (rcvdCount == 3) { - done(); - } - }); - - // third message sent, clone - // message uuids wont match since we've cloned - n5.on('input',function(msg) { - msg.payload.should.equal(messages[2].payload); - should.notStrictEqual(msg,messages[2]); - rcvdCount += 1; - if (rcvdCount == 3) { - done(); - } - }); - n1.send(messages); }); it('emits no messages', function(done) { var flow = { + handleError: (node,logMessage,msg,reportingNode) => {done(logMessage)}, getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, }; var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); @@ -412,105 +569,85 @@ describe('Node', function() { n1.send(); }); - it('emits messages ignoring non-existent nodes', function(done) { - var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n9'],['n2']]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); + // it('emits messages without cloning req or res', function(done) { + // var flow = { + // getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3}[id]}, + // send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} + // }; + // var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[[['n2'],['n3']]]}); + // var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); + // var n3 = new RedNode({_flow:flow,id:'n3',type:'abc'}); + // + // var req = {}; + // var res = {}; + // var cloned = {}; + // var message = {payload: "foo", cloned: cloned, req: req, res: res}; + // + // var rcvdCount = 0; + // + // // first message to be sent, so should not be cloned + // n2.on('input',function(msg) { + // should.deepEqual(msg, message); + // msg.cloned.should.be.exactly(message.cloned); + // msg.req.should.be.exactly(message.req); + // msg.res.should.be.exactly(message.res); + // rcvdCount += 1; + // if (rcvdCount == 2) { + // done(); + // } + // }); + // + // // second message to be sent, so should be cloned + // // message uuids wont match since we've cloned + // n3.on('input',function(msg) { + // msg.payload.should.equal(message.payload); + // msg.cloned.should.not.be.exactly(message.cloned); + // msg.req.should.be.exactly(message.req); + // msg.res.should.be.exactly(message.res); + // rcvdCount += 1; + // if (rcvdCount == 2) { + // done(); + // } + // }); + // + // n1.send(message); + // }); - var messages = [ - {payload:"hello world"}, - {payload:"hello world again"} - ]; - - // only one message sent, so no copy needed - n2.on('input',function(msg) { - should.deepEqual(msg,messages[1]); - should.strictEqual(msg,messages[1]); - done(); - }); - - n1.send(messages); - }); - - it('emits messages without cloning req or res', function(done) { - var flow = { - getNode: (id) => { return {'n1':n1,'n2':n2,'n3':n3}[id]}, - }; - var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[[['n2'],['n3']]]}); - var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var n3 = new RedNode({_flow:flow,id:'n3',type:'abc'}); - - var req = {}; - var res = {}; - var cloned = {}; - var message = {payload: "foo", cloned: cloned, req: req, res: res}; - - var rcvdCount = 0; - - // first message to be sent, so should not be cloned - n2.on('input',function(msg) { - should.deepEqual(msg, message); - msg.cloned.should.be.exactly(message.cloned); - msg.req.should.be.exactly(message.req); - msg.res.should.be.exactly(message.res); - rcvdCount += 1; - if (rcvdCount == 2) { - done(); - } - }); - - // second message to be sent, so should be cloned - // message uuids wont match since we've cloned - n3.on('input',function(msg) { - msg.payload.should.equal(message.payload); - msg.cloned.should.not.be.exactly(message.cloned); - msg.req.should.be.exactly(message.req); - msg.res.should.be.exactly(message.res); - rcvdCount += 1; - if (rcvdCount == 2) { - done(); - } - }); - - n1.send(message); - }); - - it("logs the uuid for all messages sent", function(done) { - var logHandler = { - msgIds:[], - messagesSent: 0, - emit: function(event, msg) { - if (msg.event == "node.abc.send" && msg.level == Log.METRIC) { - this.messagesSent++; - this.msgIds.push(msg.msgid); - (typeof msg.msgid).should.not.be.equal("undefined"); - } - } - }; - - Log.addHandler(logHandler); - var flow = { - getNode: (id) => { return {'n1':sender,'n2':receiver1,'n3':receiver2}[id]}, - }; - - var sender = new RedNode({_flow:flow,id:'n1',type:'abc', wires:[['n2', 'n3']]}); - var receiver1 = new RedNode({_flow:flow,id:'n2',type:'abc'}); - var receiver2 = new RedNode({_flow:flow,id:'n3',type:'abc'}); - sender.send({"some": "message"}); - setTimeout(function() { - try { - logHandler.messagesSent.should.equal(1); - should.exist(logHandler.msgIds[0]) - Log.removeHandler(logHandler); - done(); - } catch(err) { - Log.removeHandler(logHandler); - done(err); - } - },50) - }) + // it("logs the uuid for all messages sent", function(done) { + // var logHandler = { + // msgIds:[], + // messagesSent: 0, + // emit: function(event, msg) { + // if (msg.event == "node.abc.send" && msg.level == Log.METRIC) { + // this.messagesSent++; + // this.msgIds.push(msg.msgid); + // (typeof msg.msgid).should.not.be.equal("undefined"); + // } + // } + // }; + // + // Log.addHandler(logHandler); + // var flow = { + // getNode: (id) => { return {'n1':sender,'n2':receiver1,'n3':receiver2}[id]}, + // send: (node,dst,msg) => { setImmediate(function() { flow.getNode(dst).receive(msg) })} + // }; + // + // var sender = new RedNode({_flow:flow,id:'n1',type:'abc', wires:[['n2', 'n3']]}); + // var receiver1 = new RedNode({_flow:flow,id:'n2',type:'abc'}); + // var receiver2 = new RedNode({_flow:flow,id:'n3',type:'abc'}); + // sender.send({"some": "message"}); + // setTimeout(function() { + // try { + // logHandler.messagesSent.should.equal(1); + // should.exist(logHandler.msgIds[0]) + // Log.removeHandler(logHandler); + // done(); + // } catch(err) { + // Log.removeHandler(logHandler); + // done(err); + // } + // },50) + // }) }); diff --git a/test/unit/@node-red/runtime/lib/nodes/index_spec.js b/test/unit/@node-red/runtime/lib/nodes/index_spec.js index 66958196a..7a4992624 100644 --- a/test/unit/@node-red/runtime/lib/nodes/index_spec.js +++ b/test/unit/@node-red/runtime/lib/nodes/index_spec.js @@ -23,7 +23,7 @@ var inherits = require("util").inherits; var NR_TEST_UTILS = require("nr-test-utils"); var index = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/index"); -var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/flows"); +var flows = NR_TEST_UTILS.require("@node-red/runtime/lib/flows"); var registry = NR_TEST_UTILS.require("@node-red/registry") var Node = NR_TEST_UTILS.require("@node-red/runtime/lib/nodes/Node");