1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00
node-red/packages/node_modules/@node-red/editor-api/lib/editor/comms.js
Nick O'Leary ed31a0cf15
Update to WS 6.x and fix all it broke
Significant update to the ws module to get it completely up to date.

The jump from 1.x to 6.x has required a rewrite of our WS handling. Most
specifically the means by which you can have multiple ws servers on a
single http server has completely changed; we now have to handle the
'upgrade' event on the server ourselves.
2019-01-08 16:21:36 +00:00

255 lines
8.2 KiB
JavaScript

/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
var ws = require("ws");
var url = require("url");
var log = require("@node-red/util").log; // TODO: separate module
var Tokens;
var Users;
var Permissions;
var server;
var settings;
var runtimeAPI;
var wsServer;
var activeConnections = [];
var anonymousUser;
var retained = {};
var heartbeatTimer;
var lastSentTime;
function init(_server,_settings,_runtimeAPI) {
server = _server;
settings = _settings;
runtimeAPI = _runtimeAPI;
Tokens = require("../auth/tokens");
Tokens.onSessionExpiry(handleSessionExpiry);
Users = require("../auth/users");
Permissions = require("../auth/permissions");
}
function handleSessionExpiry(session) {
activeConnections.forEach(connection => {
if (connection.token === session.accessToken) {
connection.ws.send(JSON.stringify({auth:"fail"}));
connection.ws.close();
}
})
}
function generateSession(length) {
var c = "ABCDEFGHIJKLMNOPQRSTUZWXYZabcdefghijklmnopqrstuvwxyz1234567890";
var token = [];
for (var i=0;i<length;i++) {
token.push(c[Math.floor(Math.random()*c.length)]);
}
return token.join("");
}
function CommsConnection(ws) {
this.session = generateSession(32);
this.ws = ws;
this.stack = [];
this.user = null;
this.lastSentTime = 0;
var self = this;
log.audit({event: "comms.open"});
log.trace("comms.open "+self.session);
var pendingAuth = (settings.adminAuth != null);
if (!pendingAuth) {
addActiveConnection(self);
}
ws.on('close',function() {
console.log(arguments);
log.audit({event: "comms.close",user:self.user, session: self.session});
log.trace("comms.close "+self.session);
removeActiveConnection(self);
});
ws.on('message', function(data,flags) {
var msg = null;
try {
msg = JSON.parse(data);
} catch(err) {
log.trace("comms received malformed message : "+err.toString());
return;
}
if (!pendingAuth) {
if (msg.subscribe) {
self.subscribe(msg.subscribe);
// handleRemoteSubscription(ws,msg.subscribe);
}
} else {
var completeConnection = function(userScope,session,sendAck) {
try {
if (!userScope || !Permissions.hasPermission(userScope,"status.read")) {
ws.send(JSON.stringify({auth:"fail"}));
ws.close();
} else {
pendingAuth = false;
addActiveConnection(self);
self.token = msg.auth;
if (sendAck) {
ws.send(JSON.stringify({auth:"ok"}));
}
}
} catch(err) {
console.log(err.stack);
// Just in case the socket closes before we attempt
// to send anything.
}
}
if (msg.auth) {
Tokens.get(msg.auth).then(function(client) {
if (client) {
Users.get(client.user).then(function(user) {
if (user) {
self.user = user;
log.audit({event: "comms.auth",user:self.user});
completeConnection(client.scope,msg.auth,true);
} else {
log.audit({event: "comms.auth.fail"});
completeConnection(null,null,false);
}
});
} else {
log.audit({event: "comms.auth.fail"});
completeConnection(null,null,false);
}
});
} else {
if (anonymousUser) {
log.audit({event: "comms.auth",user:anonymousUser});
self.user = anonymousUser;
completeConnection(anonymousUser.permissions,null,false);
//TODO: duplicated code - pull non-auth message handling out
if (msg.subscribe) {
self.subscribe(msg.subscribe);
}
} else {
log.audit({event: "comms.auth.fail"});
completeConnection(null,null,false);
}
}
}
});
ws.on('error', function(err) {
log.warn(log._("comms.error",{message:err.toString()}));
});
}
CommsConnection.prototype.send = function(topic,data) {
var self = this;
if (topic && data) {
this.stack.push({topic:topic,data:data});
}
if (!this._xmitTimer) {
this._xmitTimer = setTimeout(function() {
try {
self.ws.send(JSON.stringify(self.stack));
self.lastSentTime = Date.now();
} catch(err) {
removeActiveConnection(self);
log.warn(log._("comms.error-send",{message:err.toString()}));
}
delete self._xmitTimer;
self.stack = [];
},50);
}
}
CommsConnection.prototype.subscribe = function(topic) {
runtimeAPI.comms.subscribe({
user: this.user,
client: this,
topic: topic
})
}
function start() {
if (!settings.disableEditor) {
Users.default().then(function(_anonymousUser) {
anonymousUser = _anonymousUser;
var webSocketKeepAliveTime = settings.webSocketKeepAliveTime || 15000;
var commsPath = settings.httpAdminRoot || "/";
commsPath = (commsPath.slice(0,1) != "/" ? "/":"") + commsPath + (commsPath.slice(-1) == "/" ? "":"/") + "comms";
wsServer = new ws.Server({ noServer: true });
wsServer.on('connection',function(ws) {
var commsConnection = new CommsConnection(ws);
});
wsServer.on('error', function(err) {
log.warn(log._("comms.error-server",{message:err.toString()}));
});
server.on('upgrade', function upgrade(request, socket, head) {
const pathname = url.parse(request.url).pathname;
if (pathname === commsPath) {
wsServer.handleUpgrade(request, socket, head, function done(ws) {
wsServer.emit('connection', ws, request);
});
}
// Don't destroy the socket as other listeners may want to handle the
// event.
});
lastSentTime = Date.now();
heartbeatTimer = setInterval(function() {
var now = Date.now();
if (now-lastSentTime > webSocketKeepAliveTime) {
activeConnections.forEach(connection => connection.send("hb",lastSentTime));
}
}, webSocketKeepAliveTime);
});
}
}
function stop() {
if (heartbeatTimer) {
clearInterval(heartbeatTimer);
heartbeatTimer = null;
}
if (wsServer) {
wsServer.close();
wsServer = null;
}
}
function addActiveConnection(connection) {
activeConnections.push(connection);
runtimeAPI.comms.addConnection({client: connection});
}
function removeActiveConnection(connection) {
for (var i=0;i<activeConnections.length;i++) {
if (activeConnections[i] === connection) {
activeConnections.splice(i,1);
runtimeAPI.comms.removeConnection({client:connection})
break;
}
}
}
module.exports = {
init:init,
start:start,
stop:stop
}