Split comms across api and runtime

This commit is contained in:
Nick O'Leary
2018-04-23 11:21:02 +01:00
parent efc3cc24f4
commit f94a36613c
8 changed files with 307 additions and 184 deletions

View File

@@ -15,5 +15,115 @@
**/
/**
* @module red/comms
* @namespace RED.comms
*/
/**
* @typedef CommsConnection
* @type {object}
* @property {string} session - a unique session identifier
* @property {Object} user - the user associated with the connection
* @property {Function} send - publish a message to the connection
*/
var runtime;
var retained = {};
var connections = [];
function handleCommsEvent(event) {
publish(event.topic,event.data,event.retain);
}
function handleStatusEvent(event) {
publish("status/"+event.id,event.status,true);
}
function handleRuntimeEvent(event) {
runtime.log.trace("runtime event: "+JSON.stringify(event));
publish("notification/"+event.id,event.payload||{},event.retain);
}
function publish(topic,data,retain) {
if (retain) {
retained[topic] = data;
} else {
delete retained[topic];
}
connections.forEach(connection => connection.send(topic,data,retain))
}
var api = module.exports = {
init: function(_runtime) {
runtime = _runtime;
runtime.events.removeListener("node-status",handleStatusEvent);
runtime.events.on("node-status",handleStatusEvent);
runtime.events.removeListener("runtime-event",handleRuntimeEvent);
runtime.events.on("runtime-event",handleRuntimeEvent);
runtime.events.removeListener("comms",handleCommsEvent);
runtime.events.on("comms",handleCommsEvent);
},
/**
* Registers a new comms connection
* @param {Object} opts
* @param {User} opts.user - the user calling the api
* @param {CommsConnection} opts.client - the client connection
* @return {Promise<Object>} - resolves when complete
* @memberof RED.comms
*/
addConnection: function(opts) {
connections.push(opts.client);
return Promise.resolve();
},
/**
* Unregisters a comms connection
* @param {Object} opts
* @param {User} opts.user - the user calling the api
* @param {CommsConnection} opts.client - the client connection
* @return {Promise<Object>} - resolves when complete
* @memberof RED.comms
*/
removeConnection: function(opts) {
for (var i=0;i<connections.length;i++) {
if (connections[i] === opts.client) {
connections.splice(i,1);
break;
}
}
return Promise.resolve();
},
/**
* Subscribes a comms connection to a given topic. Currently, all clients get
* automatically subscribed to everything and cannot unsubscribe. Sending a subscribe
* request will trigger retained messages to be sent.
* @param {Object} opts
* @param {User} opts.user - the user calling the api
* @param {CommsConnection} opts.client - the client connection
* @param {String} opts.topic - the topic to subscribe to
* @return {Promise<Object>} - resolves when complete
* @memberof RED.comms
*/
subscribe: function(opts) {
var re = new RegExp("^"+opts.topic.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g,"\\$1").replace(/\+/g,"[^/]+").replace(/\/#$/,"(\/.*)?")+"$");
for (var t in retained) {
if (re.test(t)) {
opts.client.send(t,retained[t]);
}
}
return Promise.resolve();
},
/**
* TODO: Unsubscribes a comms connection from a given topic
* @param {Object} opts
* @param {User} opts.user - the user calling the api
* @param {CommsConnection} opts.client - the client connection
* @param {String} opts.topic - the topic to unsubscribe from
* @return {Promise<Object>} - resolves when complete
* @memberof RED.comms
*/
unsubscribe: function(opts) {}
};

View File

@@ -20,7 +20,6 @@
/**
* @typedef Flows
* @alias Dave
* @type {object}
* @property {string} rev - the flow revision identifier
* @property {Array} flows - the flow configuration, an array of node configuration objects

View File

@@ -14,12 +14,11 @@
* limitations under the License.
**/
/**
* A user accessing the API
* @typedef User
* @type {object}
*/
/**
* A user accessing the API
* @typedef User
* @type {object}
*/
var runtime;
/**
@@ -28,6 +27,7 @@ var runtime;
var api = module.exports = {
init: function(_runtime, redUtil) {
runtime = _runtime;
api.comms.init(runtime);
api.flows.init(runtime);
api.nodes.init(runtime);
api.settings.init(runtime);
@@ -42,7 +42,24 @@ var api = module.exports = {
settings: require("./settings"),
projects: require("./projects"),
/**
* Returns whether the runtime is started
* @param {Object} opts
* @param {User} opts.user - the user calling the api
* @return {Promise<Boolean>} - whether the runtime is started
* @memberof RED
*/
isStarted: function(opts) {
return Promise.resolve(runtime.isStarted());
},
/**
* Returns version number of the runtime
* @param {Object} opts
* @param {User} opts.user - the user calling the api
* @return {Promise<String>} - the runtime version number
* @memberof RED
*/
version: function(opts) {
return Promise.resolve(runtime.version());
}