node-red/packages/node_modules/@node-red/editor-api/lib/editor/comms.js

253 lines
8.0 KiB
JavaScript
Raw Normal View History

/**
 * Copyright JS Foundation and other contributors, http://js.foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/
var ws = require("ws");
2018-04-23 12:21:02 +02:00
2018-08-04 23:23:06 +02:00
var log = require("@node-red/util").log; // TODO: separate module
2018-04-23 12:21:02 +02:00
// Auth helper modules; assigned in init().
var Tokens;
var Users;
var Permissions;

// Values handed to init(): the HTTP server the WebSocket server attaches to,
// the settings object, and the runtime API used to register connections and
// subscriptions.
var server;
var settings;
var runtimeAPI;

// WebSocket server created by start() and released by stop().
var wsServer;
// Connections currently registered with the runtime (see addActiveConnection).
var activeConnections = [];
// Result of Users.default(); assigned in start().
var anonymousUser;
// Not referenced anywhere else in the visible code.
var retained = {};
// Handle of the heartbeat interval created by start().
var heartbeatTimer;
// Timestamp recorded by start(); read by the heartbeat timer.
var lastSentTime;
2018-04-23 12:21:02 +02:00
function init(_server,_settings,_runtimeAPI) {
server = _server;
settings = _settings;
runtimeAPI = _runtimeAPI;
Tokens = require("../auth/tokens");
Tokens.onSessionExpiry(handleSessionExpiry);
2018-04-23 12:21:02 +02:00
Users = require("../auth/users");
Permissions = require("../auth/permissions");
}
/**
 * Session-expiry callback: every active connection whose token equals the
 * expired session's access token is told that auth failed and then closed.
 *
 * @param {Object} session - the expired session; only `accessToken` is read
 */
function handleSessionExpiry(session) {
    function expireIfMatching(connection) {
        if (connection.token !== session.accessToken) {
            return;
        }
        var socket = connection.ws;
        socket.send(JSON.stringify({auth:"fail"}));
        socket.close();
    }
    activeConnections.forEach(function(connection) {
        expireIfMatching(connection);
    });
}
2018-04-23 12:21:02 +02:00
/**
 * Builds a random identifier of the requested length from [A-Za-z0-9].
 *
 * Uses Math.random(), so the result is not cryptographically secure; in this
 * file it is only used to label a connection in log output.
 *
 * @param {number} length - number of characters to generate
 * @returns {string} the identifier ("" when length is 0 or less)
 */
function generateSession(length) {
    // The alphabet previously read "...STUZWXYZ": 'V' was missing and 'Z'
    // appeared twice.
    var c = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890";
    var token = [];
    for (var i=0;i<length;i++) {
        token.push(c[Math.floor(Math.random()*c.length)]);
    }
    return token.join("");
}
2018-04-23 12:21:02 +02:00
/**
 * Wraps one editor WebSocket connection.
 *
 * When `settings.adminAuth` is not set the connection is registered as
 * active straight away. Otherwise it stays pending until a message from the
 * client completes authentication: either an `auth` token that resolves to
 * a user, or - if there is an anonymous user - any other message. In both
 * cases the resulting scope must include the "status.read" permission,
 * otherwise `{auth:"fail"}` is sent and the socket is closed.
 *
 * @param {Object} ws - the socket from the server's 'connection' event;
 *     its `on`, `send` and `close` methods are used here.
 */
function CommsConnection(ws) {
    this.session = generateSession(32);
    this.ws = ws;
    this.stack = [];
    this.user = null;
    this.lastSentTime = 0;
    var self = this;

    log.audit({event: "comms.open"});
    log.trace("comms.open "+self.session);

    var pendingAuth = (settings.adminAuth != null);

    if (!pendingAuth) {
        addActiveConnection(self);
    }

    ws.on('close',function() {
        log.audit({event: "comms.close",user:self.user, session: self.session});
        log.trace("comms.close "+self.session);
        removeActiveConnection(self);
    });

    ws.on('message', function(data,flags) {
        var msg = null;
        try {
            msg = JSON.parse(data);
        } catch(err) {
            log.trace("comms received malformed message : "+err.toString());
            return;
        }
        // JSON.parse accepts "null" and bare primitives. Reading .auth or
        // .subscribe from null would throw inside this listener, so anything
        // that is not an object is dropped here.
        if (msg === null || typeof msg !== "object") {
            log.trace("comms received malformed message : not an object");
            return;
        }

        if (!pendingAuth) {
            if (msg.subscribe) {
                self.subscribe(msg.subscribe);
            }
            return;
        }

        // Finishes the handshake for the given scope. Returns true when the
        // connection ended up registered as active, false otherwise.
        var completeConnection = function(userScope,sendAck) {
            try {
                if (!userScope || !Permissions.hasPermission(userScope,"status.read")) {
                    ws.send(JSON.stringify({auth:"fail"}));
                    ws.close();
                } else {
                    pendingAuth = false;
                    addActiveConnection(self);
                    self.token = msg.auth;
                    if (sendAck) {
                        ws.send(JSON.stringify({auth:"ok"}));
                    }
                }
            } catch(err) {
                console.log(err.stack);
                // Just in case the socket closes before we attempt
                // to send anything.
            }
            return !pendingAuth;
        };

        var rejectConnection = function() {
            log.audit({event: "comms.auth.fail"});
            completeConnection(null,false);
        };

        if (msg.auth) {
            Tokens.get(msg.auth).then(function(client) {
                if (!client) {
                    rejectConnection();
                    return;
                }
                return Users.get(client.user).then(function(user) {
                    if (user) {
                        self.user = user;
                        log.audit({event: "comms.auth",user:self.user});
                        completeConnection(client.scope,true);
                    } else {
                        rejectConnection();
                    }
                });
            }).catch(function(err) {
                // A failed token/user lookup is treated as a failed login
                // rather than being left as an unhandled rejection.
                log.trace("comms auth lookup failed : "+err.toString());
                rejectConnection();
            });
        } else if (anonymousUser) {
            log.audit({event: "comms.auth",user:anonymousUser});
            self.user = anonymousUser;
            var accepted = completeConnection(anonymousUser.permissions,false);
            //TODO: duplicated code - pull non-auth message handling out
            // Only subscribe when the connection was accepted; a rejected
            // socket has already been closed.
            if (accepted && msg.subscribe) {
                self.subscribe(msg.subscribe);
            }
        } else {
            rejectConnection();
        }
    });

    ws.on('error', function(err) {
        log.warn(log._("comms.error",{message:err.toString()}));
    });
}

/**
 * Queues a message for this connection and schedules a flush.
 *
 * Messages queued within the same 50ms window are sent together as a single
 * JSON array. If the send throws, the connection is removed from the active
 * list and a warning is logged. The queue is emptied after every flush
 * attempt, successful or not.
 *
 * @param {string} topic - message topic
 * @param {*} data - message payload; nothing is queued unless both topic
 *     and data are truthy
 */
CommsConnection.prototype.send = function(topic,data) {
    var self = this;
    if (topic && data) {
        this.stack.push({topic:topic,data:data});
    }
    if (!this._xmitTimer) {
        this._xmitTimer = setTimeout(function() {
            try {
                self.ws.send(JSON.stringify(self.stack));
                self.lastSentTime = Date.now();
            } catch(err) {
                removeActiveConnection(self);
                log.warn(log._("comms.error-send",{message:err.toString()}));
            }
            delete self._xmitTimer;
            self.stack = [];
        },50);
    }
};

/**
 * Asks the runtime to subscribe this connection to a topic, passing along
 * the connection's user.
 *
 * @param {string} topic - topic to subscribe to
 */
CommsConnection.prototype.subscribe = function(topic) {
    runtimeAPI.comms.subscribe({
        user: this.user,
        client: this,
        topic: topic
    });
};
2014-12-10 15:16:07 +01:00
/**
 * Starts the comms WebSocket server, unless `settings.disableEditor` is set.
 *
 * Once Users.default() resolves, its result is stored as the anonymous user,
 * a WebSocket server is attached to the HTTP server at
 * `<httpAdminRoot>/comms`, and a heartbeat interval is started. The work
 * happens asynchronously; nothing is returned.
 */
function start() {
    if (settings.disableEditor) {
        return;
    }
    Users.default().then(function(_anonymousUser) {
        anonymousUser = _anonymousUser;
        var webSocketKeepAliveTime = settings.webSocketKeepAliveTime || 15000;
        var path = settings.httpAdminRoot || "/";
        // Normalise to exactly one leading and one trailing "/" before
        // appending the endpoint name.
        path = (path.slice(0,1) !== "/" ? "/":"") + path + (path.slice(-1) === "/" ? "":"/") + "comms";
        wsServer = new ws.Server({
            server:server,
            path:path,
            // Disable the deflate option due to this issue
            //  https://github.com/websockets/ws/pull/632
            // that is fixed in the 1.x release of the ws module
            // that we cannot currently pickup as it drops node 0.10 support
            //perMessageDeflate: false
        });

        wsServer.on('connection',function(ws) {
            // The connection wires up its own socket listeners and registers
            // itself when appropriate, so the instance is not kept here.
            new CommsConnection(ws);
        });
        wsServer.on('error', function(err) {
            log.warn(log._("comms.error-server",{message:err.toString()}));
        });

        lastSentTime = Date.now();

        heartbeatTimer = setInterval(function() {
            var now = Date.now();
            // NOTE(review): in the visible code lastSentTime is only assigned
            // just above, so once the first interval has elapsed this check
            // passes on every tick and every active connection gets a
            // heartbeat. Per-connection send times are tracked on the
            // connection objects but are not consulted here - confirm
            // whether that is intended.
            if (now-lastSentTime > webSocketKeepAliveTime) {
                activeConnections.forEach(connection => connection.send("hb",lastSentTime));
            }
        }, webSocketKeepAliveTime);
    }).catch(function(err) {
        // Report a start-up failure instead of leaving an unhandled rejection.
        log.warn(log._("comms.error-server",{message:err.toString()}));
    });
}
/**
 * Stops the heartbeat interval and closes the WebSocket server, if either
 * exists. Both handles are reset to null, so calling stop() again is a no-op.
 */
function stop() {
    if (heartbeatTimer) {
        clearInterval(heartbeatTimer);
        heartbeatTimer = null;
    }
    if (wsServer) {
        wsServer.close();
        wsServer = null;
    }
}
2018-04-23 12:21:02 +02:00
/**
 * Records a connection as active and announces it to the runtime.
 *
 * @param {Object} connection - the connection to register
 */
function addActiveConnection(connection) {
    activeConnections.push(connection);
    runtimeAPI.comms.addConnection({client: connection});
}
2018-04-23 12:21:02 +02:00
/**
 * Removes a connection from the active list and tells the runtime about it.
 * Does nothing when the connection is not in the list; only the first
 * matching entry is removed.
 *
 * @param {Object} connection - the connection to unregister
 */
function removeActiveConnection(connection) {
    var index = activeConnections.indexOf(connection);
    if (index === -1) {
        return;
    }
    activeConnections.splice(index,1);
    runtimeAPI.comms.removeConnection({client:connection});
}
2014-05-07 21:47:25 +02:00
module.exports = {
init:init,
start:start,
2018-04-23 12:21:02 +02:00
stop:stop
2014-05-07 21:47:25 +02:00
}