Refactor websocket + true receive (#471)

* save it

* ws: multiframe receive now works

* port cfg write with autocorrect to jsonprocessor
cleanup

* cleanup

* cleanup

* add support for image data over ws binary frame
This commit is contained in:
redPanther 2017-09-16 09:08:21 +02:00 committed by GitHub
parent 74ff5c7ada
commit 6f443a48dd
9 changed files with 361 additions and 226 deletions

View File

@ -421,7 +421,6 @@ $(document).ready(function() {
aceEdt.set(finalLedArray);
$('#collapse4').collapse('show');
});
// create and update editor

View File

@ -97,6 +97,7 @@ function initWebSocket()
case 1015: reason = "The connection was closed due to a failure to perform a TLS handshake (e.g., the server certificate can't be verified)."; break;
default: reason = "Unknown reason";
}
console.log("[websocket::onclose] "+reason)
$(hyperion).trigger({type:"close", reason:reason});
watchdog = 10;
connectionLostDetection();
@ -262,15 +263,7 @@ function requestWriteConfig(config, full)
});
}
var config_str = escape(encode_utf8(JSON.stringify(serverConfig)));
$.post( "/cgi/cfg_set", { cfg: config_str })
.done(function( data ) {
$("html, body").animate({ scrollTop: 0 }, "slow");
})
.fail(function() {
showInfoDialog('error', $.i18n('infoDialog_writeconf_error_title'), $.i18n('infoDialog_writeconf_error_text'));
});
sendToHyperion("config","setconfig", '"config":'+JSON.stringify(serverConfig));
}
function requestWriteEffect(effectName,effectPy,effectArgs)

View File

@ -219,6 +219,12 @@ private:
///
void handleConfigGetCommand(const QJsonObject & message, const QString &command, const int tan);
/// Handle an incoming JSON SetConfig message from handleConfigCommand()
///
/// @param message the incoming message
///
void handleConfigSetCommand(const QJsonObject & message, const QString &command, const int tan);
///
/// Handle an incoming JSON Component State message
///

View File

@ -1,6 +1,7 @@
// Qt includes
#include <QCryptographicHash>
#include <QtEndian>
#include <unistd.h>
// project includes
#include "JsonClientConnection.h"
@ -13,8 +14,11 @@ JsonClientConnection::JsonClientConnection(QTcpSocket *socket)
, _hyperion(Hyperion::getInstance())
, _receiveBuffer()
, _webSocketHandshakeDone(false)
, _onContinuation(false)
, _log(Logger::getInstance("JSONCLIENTCONNECTION"))
, _notEnoughData(false)
, _clientAddress(socket->peerAddress())
, _connectionMode(CON_MODE::INIT)
{
// connect internal signals and slots
connect(_socket, SIGNAL(disconnected()), this, SLOT(socketClosed()));
@ -34,116 +38,182 @@ JsonClientConnection::~JsonClientConnection()
void JsonClientConnection::readData()
{
_receiveBuffer += _socket->readAll();
switch(_connectionMode)
{
case CON_MODE::INIT:
_receiveBuffer = _socket->readAll(); // initial read to determine connection
_connectionMode = (_receiveBuffer.contains("Upgrade: websocket")) ? CON_MODE::WEBSOCKET : CON_MODE::RAW;
if (_webSocketHandshakeDone)
{
// websocket mode, data frame
handleWebSocketFrame();
}
else
{
// might be a handshake request or raw socket data
if(_receiveBuffer.contains("Upgrade: websocket"))
{
doWebSocketHandshake();
} else
{
// raw socket data, handling as usual
int bytes = _receiveBuffer.indexOf('\n') + 1;
while(bytes > 0)
// init websockets
if (_connectionMode == CON_MODE::WEBSOCKET)
{
// create message string
QString message(QByteArray(_receiveBuffer.data(), bytes));
// remove message data from buffer
_receiveBuffer = _receiveBuffer.mid(bytes);
// handle message
_jsonProcessor->handleMessage(message);
// try too look up '\n' again
bytes = _receiveBuffer.indexOf('\n') + 1;
doWebSocketHandshake();
break;
}
}
// if no ws, hand over the data to raw handling
case CON_MODE::RAW:
handleRawJsonData();
break;
case CON_MODE::WEBSOCKET:
handleWebSocketFrame();
break;
}
}
void JsonClientConnection::handleRawJsonData()
{
_receiveBuffer += _socket->readAll();
// raw socket data, handling as usual
int bytes = _receiveBuffer.indexOf('\n') + 1;
while(bytes > 0)
{
// create message string
QString message(QByteArray(_receiveBuffer.data(), bytes));
// remove message data from buffer
_receiveBuffer = _receiveBuffer.mid(bytes);
// handle message
_jsonProcessor->handleMessage(message);
// try too look up '\n' again
bytes = _receiveBuffer.indexOf('\n') + 1;
}
}
void JsonClientConnection::getWsFrameHeader(WebSocketHeader* header)
{
char fin_rsv_opcode, mask_length;
_socket->getChar(&fin_rsv_opcode);
_socket->getChar(&mask_length);
header->fin = (fin_rsv_opcode & BHB0_FIN) == BHB0_FIN;
header->opCode = fin_rsv_opcode & BHB0_OPCODE;
header->masked = (mask_length & BHB1_MASK) == BHB1_MASK;
header->payloadLength = mask_length & BHB1_PAYLOAD;
// get size of payload
switch (header->payloadLength)
{
case payload_size_code_16bit:
{
QByteArray buf = _socket->read(2);
header->payloadLength = ((buf.at(2) << 8) & 0xFF00) | (buf.at(3) & 0xFF);
}
break;
case payload_size_code_64bit:
{
QByteArray buf = _socket->read(8);
header->payloadLength = 0;
for (uint i=0; i < 8; i++)
{
header->payloadLength |= ((quint64)(buf.at(i) & 0xFF)) << (8*(7-i));
}
}
break;
}
// if the data is masked we need to get the key for unmasking
if (header->masked)
{
_socket->read(header->key, 4);
}
}
void JsonClientConnection::handleWebSocketFrame()
{
if ((_receiveBuffer.at(0) & BHB0_FIN) == BHB0_FIN)
//printf("frame\n");
// we are on no continious reading from socket from call before
if (!_notEnoughData)
{
// final bit found, frame complete
quint8 * maskKey = NULL;
quint8 opCode = _receiveBuffer.at(0) & BHB0_OPCODE;
bool isMasked = (_receiveBuffer.at(1) & BHB0_FIN) == BHB0_FIN;
quint64 payloadLength = _receiveBuffer.at(1) & BHB1_PAYLOAD;
quint32 index = 2;
//printf("%ld\n", payloadLength);
switch (payloadLength)
{
case payload_size_code_16bit:
payloadLength = ((_receiveBuffer.at(2) << 8) & 0xFF00) | (_receiveBuffer.at(3) & 0xFF);
index += 2;
break;
case payload_size_code_64bit:
payloadLength = 0;
for (uint i=0; i < 8; i++)
{
payloadLength |= ((quint64)(_receiveBuffer.at(index+i) & 0xFF)) << (8*(7-i));
}
index += 8;
break;
default:
break;
}
getWsFrameHeader(&_wsh);
}
if (isMasked)
if(_socket->bytesAvailable() < (qint64)_wsh.payloadLength)
{
//printf("not enough data %llu %llu\n", _socket->bytesAvailable(), _wsh.payloadLength);
_notEnoughData=true;
return;
}
_notEnoughData = false;
QByteArray buf = _socket->read(_wsh.payloadLength);
//printf("opcode %x payload bytes %llu avail: %llu\n", _wsh.opCode, _wsh.payloadLength, _socket->bytesAvailable());
if (OPCODE::invalid((OPCODE::value)_wsh.opCode))
{
sendClose(CLOSECODE::INV_TYPE, "invalid opcode");
return;
}
// check the type of data frame
bool isContinuation=false;
switch (_wsh.opCode)
{
case OPCODE::CONTINUATION:
isContinuation = true;
// no break here, just jump over to opcode text
case OPCODE::BINARY:
case OPCODE::TEXT:
{
// if the data is masked we need to get the key for unmasking
maskKey = new quint8[4];
for (uint i=0; i < 4; i++)
// check for protocal violations
if (_onContinuation && !isContinuation)
{
maskKey[i] = _receiveBuffer.at(index + i);
sendClose(CLOSECODE::VIOLATION, "protocol violation, somebody sends frames in between continued frames");
return;
}
index += 4;
}
// check the type of data frame
switch (opCode)
{
case OPCODE::TEXT:
if (!_wsh.masked && _wsh.opCode == OPCODE::TEXT)
{
// frame contains text, extract it
QByteArray result = _receiveBuffer.mid(index, payloadLength);
_receiveBuffer.clear();
// unmask data if necessary
if (isMasked)
{
for (uint i=0; i < payloadLength; i++)
{
result[i] = (result[i] ^ maskKey[i % 4]);
}
if (maskKey != NULL)
{
delete[] maskKey;
maskKey = NULL;
}
}
_jsonProcessor->handleMessage(QString(result));
sendClose(CLOSECODE::VIOLATION, "protocol violation, unmasked text frames not allowed");
return;
}
break;
// unmask data
for (int i=0; i < buf.size(); i++)
{
buf[i] = buf[i] ^ _wsh.key[i % 4];
}
_onContinuation = !_wsh.fin || isContinuation;
// frame contains text, extract it, append data if this is a continuation
if (_wsh.fin && ! isContinuation) // one frame
{
_wsReceiveBuffer.clear();
}
_wsReceiveBuffer.append(buf);
// this is the final frame, decode and handle data
if (_wsh.fin)
{
_onContinuation = false;
if (_wsh.opCode == OPCODE::TEXT)
{
_jsonProcessor->handleMessage(QString(_wsReceiveBuffer));
}
else
{
handleBinaryMessage(_wsReceiveBuffer);
}
_wsReceiveBuffer.clear();
}
}
break;
case OPCODE::CLOSE:
{
// close request, confirm
quint8 close[] = {0x88, 0};
_socket->write((const char*)close, 2);
_socket->flush();
_socket->close();
sendClose(CLOSECODE::NORMAL);
}
break;
case OPCODE::PING:
{
// ping received, send pong
@ -152,16 +222,47 @@ void JsonClientConnection::handleWebSocketFrame()
_socket->flush();
}
break;
case OPCODE::PONG:
{
Error(_log, "pong recievied, protocol violation!");
}
default:
Warning(_log, "strange %d\n%s\n", _wsh.opCode, QSTRING_CSTR(QString(buf)));
}
}
/// See http://tools.ietf.org/html/rfc6455#section-5.2 for more information
void JsonClientConnection::sendClose(int status, QString reason)
{
Info(_log, "send close: %d %s", status, QSTRING_CSTR(reason));
ErrorIf(!reason.isEmpty(), _log, QSTRING_CSTR(reason));
_receiveBuffer.clear();
QByteArray sendBuffer;
sendBuffer.append(136+(status-1000));
int length = reason.size();
if(length >= 126)
{
sendBuffer.append( (length > 0xffff) ? 127 : 126);
int num_bytes = (length > 0xffff) ? 8 : 2;
for(int c = num_bytes - 1; c != -1; c--)
{
sendBuffer.append( quint8((static_cast<unsigned long long>(length) >> (8 * c)) % 256));
}
}
else
{
Error(_log, "Someone is sending very big messages over several frames... it's not supported yet");
quint8 close[] = {0x88, 0};
_socket->write((const char*)close, 2);
_socket->flush();
_socket->close();
sendBuffer.append(quint8(length));
}
sendBuffer.append(reason);
_socket->write(sendBuffer);
_socket->flush();
_socket->close();
}
void JsonClientConnection::doWebSocketHandshake()
@ -178,16 +279,14 @@ void JsonClientConnection::doWebSocketHandshake()
value += "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
// generate sha1 hash
QByteArray hash = QCryptographicHash::hash(value, QCryptographicHash::Sha1);
QByteArray hashB64 = hash.toBase64();
QByteArray hash = QCryptographicHash::hash(value, QCryptographicHash::Sha1).toBase64();
// prepare an answer
QString data
= QString("HTTP/1.1 101 Switching Protocols\r\n")
+ QString("Upgrade: websocket\r\n")
+ QString("Connection: Upgrade\r\n")
+ QString("Sec-WebSocket-Accept: ")
+ QString(hashB64.data()) + "\r\n\r\n";
+ QString("Sec-WebSocket-Accept: ")+QString(hash.data()) + "\r\n\r\n";
_socket->write(QSTRING_CSTR(data), data.size());
_socket->flush();
@ -202,12 +301,11 @@ void JsonClientConnection::socketClosed()
emit connectionClosed(this);
}
QByteArray JsonClientConnection::getFrameHeader(quint8 opCode, quint64 payloadLength, bool lastFrame)
QByteArray JsonClientConnection::makeFrameHeader(quint8 opCode, quint64 payloadLength, bool lastFrame)
{
QByteArray header;
bool ok = payloadLength <= 0x7FFFFFFFFFFFFFFFULL;
if (ok)
if (payloadLength <= 0x7FFFFFFFFFFFFFFFULL)
{
//FIN, RSV1-3, opcode (RSV-1, RSV-2 and RSV-3 are zero)
quint8 byte = static_cast<quint8>((opCode & 0x0F) | (lastFrame ? 0x80 : 0x00));
@ -226,7 +324,7 @@ QByteArray JsonClientConnection::getFrameHeader(quint8 opCode, quint64 payloadLe
quint16 swapped = qToBigEndian<quint16>(static_cast<quint16>(payloadLength));
header.append(static_cast<const char *>(static_cast<const void *>(&swapped)), 2);
}
else if (payloadLength <= 0x7FFFFFFFFFFFFFFFULL)
else
{
byte |= 127;
header.append(static_cast<char>(byte));
@ -258,7 +356,7 @@ qint64 JsonClientConnection::sendMessage_Raw(const char* data, quint64 size)
return _socket->write(data, size);
}
qint64 JsonClientConnection::sendMessage_Raw(QByteArray data)
qint64 JsonClientConnection::sendMessage_Raw(QByteArray &data)
{
return _socket->write(data.data(), data.size());
}
@ -278,7 +376,8 @@ qint64 JsonClientConnection::sendMessage_Websockets(QByteArray &data)
quint64 position = i * FRAME_SIZE_IN_BYTES;
quint32 frameSize = (payloadSize-position >= FRAME_SIZE_IN_BYTES) ? FRAME_SIZE_IN_BYTES : (payloadSize-position);
sendMessage_Raw(getFrameHeader(OPCODE::TEXT, frameSize, isLastFrame));
QByteArray buf = makeFrameHeader(OPCODE::TEXT, frameSize, isLastFrame);
sendMessage_Raw(buf);
qint64 written = sendMessage_Raw(payload+position,frameSize);
if (written > 0)
{
@ -300,4 +399,24 @@ qint64 JsonClientConnection::sendMessage_Websockets(QByteArray &data)
return payloadWritten;
}
void JsonClientConnection::handleBinaryMessage(QByteArray &data)
{
uint8_t priority = data.at(0);
unsigned duration_s = data.at(1);
unsigned imgSize = data.size() - 4;
unsigned width = ((data.at(2) << 8) & 0xFF00) | (data.at(3) & 0xFF);
unsigned height = imgSize / width;
if ( ! (imgSize) % width)
{
Error(_log, "data size is not multiple of width");
return;
}
Image<ColorRgb> image;
image.resize(width, height);
memcpy(image.memptr(), data.data()+4, imgSize);
_hyperion->setImage(priority, image, duration_s*1000);
}

View File

@ -69,6 +69,28 @@ namespace OPCODE {
}
}
namespace CLOSECODE {
enum value {
NORMAL = 1000,
AWAY = 1001,
TERM = 1002,
INV_TYPE = 1003,
INV_DATA = 1007,
VIOLATION = 1008,
BIG_MSG = 1009,
UNEXPECTED= 1011
};
}
namespace CON_MODE {
enum value {
INIT = 0,
RAW = 1,
WEBSOCKET = 2
};
}
///
/// The Connection object created by \a JsonServer when a new connection is establshed
///
@ -88,6 +110,14 @@ public:
///
~JsonClientConnection();
struct WebSocketHeader
{
bool fin;
quint8 opCode;
bool masked;
quint64 payloadLength;
char key[4];
};
public slots:
qint64 sendMessage(QJsonObject);
@ -124,11 +154,29 @@ private:
///
void handleWebSocketFrame();
QByteArray getFrameHeader(quint8 opCode, quint64 payloadLength, bool lastFrame);
///
/// Handle incoming raw data frame
///
void handleRawJsonData();
///
/// create ws header from socket and decode it
///
QByteArray makeFrameHeader(quint8 opCode, quint64 payloadLength, bool lastFrame);
///
/// handle binary message
///
/// This function should be placed elsewhere ....
///
void handleBinaryMessage(QByteArray &data);
qint64 sendMessage_Raw(const char* data, quint64 size);
qint64 sendMessage_Raw(QByteArray data);
qint64 sendMessage_Raw(QByteArray &data);
qint64 sendMessage_Websockets(QByteArray &data);
void sendClose(int status, QString reason = "");
void getWsFrameHeader(WebSocketHeader* header);
/// The TCP-Socket that is connected tot the Json-client
QTcpSocket * _socket;
@ -139,15 +187,25 @@ private:
/// The buffer used for reading data from the socket
QByteArray _receiveBuffer;
/// buffer for websockets multi frame receive
QByteArray _wsReceiveBuffer;
quint8 _maskKey[4];
/// used for WebSocket detection and connection handling
bool _webSocketHandshakeDone;
bool _onContinuation;
/// The logger instance
Logger * _log;
WebSocketHeader _wsh;
bool _notEnoughData;
/// address of client
QHostAddress _clientAddress;
CON_MODE::value _connectionMode;
// masks for fields in the basic header
static uint8_t const BHB0_OPCODE = 0x0F;
static uint8_t const BHB0_RSV3 = 0x10;

View File

@ -10,7 +10,7 @@
"subcommand": {
"type" : "string",
"required" : true,
"enum" : ["getconfig","getschema","reload"]
"enum" : ["setconfig","getconfig","getschema","reload"]
},
"tan" : {
"type" : "integer"

View File

@ -795,10 +795,15 @@ void JsonProcessor::handleConfigCommand(const QJsonObject& message, const QStrin
{
QString subcommand = message["subcommand"].toString("");
QString full_command = command + "-" + subcommand;
if (subcommand == "getschema")
{
handleSchemaGetCommand(message, full_command, tan);
}
else if (subcommand == "setconfig")
{
handleConfigSetCommand(message, full_command, tan);
}
else if (subcommand == "getconfig")
{
handleConfigGetCommand(message, full_command, tan);
@ -815,6 +820,67 @@ void JsonProcessor::handleConfigCommand(const QJsonObject& message, const QStrin
}
}
void JsonProcessor::handleConfigSetCommand(const QJsonObject& message, const QString &command, const int tan)
{
if(message.size() > 0)
{
if (message.contains("config"))
{
QJsonObject hyperionConfigJsonObj = message["config"].toObject();
try
{
Q_INIT_RESOURCE(resource);
QJsonObject schemaJson = QJsonFactory::readSchema(":/hyperion-schema");
QJsonSchemaChecker schemaChecker;
schemaChecker.setSchema(schemaJson);
QPair<bool, bool> validate = schemaChecker.validate(hyperionConfigJsonObj);
if (validate.first && validate.second)
{
QJsonFactory::writeJson(_hyperion->getConfigFileName(), hyperionConfigJsonObj);
}
else if (!validate.first && validate.second)
{
Warning(_log,"Errors have been found in the configuration file. Automatic correction is applied");
QStringList schemaErrors = schemaChecker.getMessages();
for (auto & schemaError : schemaErrors)
Info(_log, QSTRING_CSTR(schemaError));
hyperionConfigJsonObj = schemaChecker.getAutoCorrectedConfig(hyperionConfigJsonObj);
if (!QJsonFactory::writeJson(_hyperion->getConfigFileName(), hyperionConfigJsonObj))
throw std::runtime_error("ERROR: can not save configuration file, aborting");
}
else //Error in Schema
{
QString errorMsg = "ERROR: Json validation failed: \n";
QStringList schemaErrors = schemaChecker.getMessages();
for (auto & schemaError: schemaErrors)
{
Error(_log, "config write validation: %s", QSTRING_CSTR(schemaError));
errorMsg += schemaError + "\n";
}
throw std::runtime_error(errorMsg.toStdString());
}
sendSuccessReply(command, tan);
}
catch(const std::runtime_error& validate_error)
{
sendErrorReply("Error while validating json: " + QString(validate_error.what()), command, tan);
}
}
}
else
{
sendErrorReply("Error while parsing json: Message size " + QString(message.size()), command, tan);
}
}
void JsonProcessor::handleConfigGetCommand(const QJsonObject& message, const QString& command, const int tan)
{
// create result

View File

@ -37,15 +37,13 @@ void CgiHandler::exec(const QStringList & args, QtHttpRequest * request, QtHttpR
_request = request;
_reply = reply;
cmd_cfg_jsonserver();
cmd_cfg_get();
cmd_cfg_set();
// cmd_cfg_set();
cmd_runscript();
throw 1;
}
catch(int e)
{
if (e != 0)
throw 1;
if (e != 0) throw 1;
}
}
@ -68,108 +66,6 @@ void CgiHandler::cmd_cfg_jsonserver()
}
}
void CgiHandler::cmd_cfg_get()
{
if ( _args.at(0) == "cfg_get" )
{
QFile file ( _hyperion->getConfigFileName() );
if (file.exists ())
{
if (file.open (QFile::ReadOnly)) {
QByteArray data = file.readAll ();
_reply->addHeader ("Content-Type", "text/plain");
_reply->appendRawData (data);
file.close ();
}
}
throw 0;
}
}
void CgiHandler::cmd_cfg_set()
{
_reply->addHeader ("Content-Type", "text/plain");
if ( _args.at(0) == "cfg_set" )
{
QtHttpPostData data = _request->getPostData();
QJsonParseError error;
if (data.contains("cfg"))
{
QJsonDocument hyperionConfig = QJsonDocument::fromJson(QByteArray::fromPercentEncoding(data["cfg"]), &error);
if (error.error == QJsonParseError::NoError)
{
QJsonObject hyperionConfigJsonObj = hyperionConfig.object();
try
{
// make sure the resources are loaded (they may be left out after static linking)
Q_INIT_RESOURCE(resource);
QString schemaFile = ":/hyperion-schema";
QJsonObject schemaJson;
try
{
schemaJson = QJsonFactory::readSchema(schemaFile);
}
catch(const std::runtime_error& error)
{
throw std::runtime_error(error.what());
}
QJsonSchemaChecker schemaChecker;
schemaChecker.setSchema(schemaJson);
QPair<bool, bool> validate = schemaChecker.validate(hyperionConfigJsonObj);
if (validate.first && validate.second)
{
QJsonFactory::writeJson(_hyperion->getConfigFileName(), hyperionConfigJsonObj);
}
else if (!validate.first && validate.second)
{
Warning(_log,"Errors have been found in the configuration file. Automatic correction is applied");
QStringList schemaErrors = schemaChecker.getMessages();
foreach (auto & schemaError, schemaErrors)
Info(_log, schemaError.toUtf8().constData());
hyperionConfigJsonObj = schemaChecker.getAutoCorrectedConfig(hyperionConfigJsonObj);
if (!QJsonFactory::writeJson(_hyperion->getConfigFileName(), hyperionConfigJsonObj))
throw std::runtime_error("ERROR: can not save configuration file, aborting ");
}
else //Error in Schema
{
QString errorMsg = "ERROR: Json validation failed: \n";
QStringList schemaErrors = schemaChecker.getMessages();
foreach (auto & schemaError, schemaErrors)
{
Error(_log, "config write validation: %s", QSTRING_CSTR(schemaError));
errorMsg += schemaError + "\n";
}
throw std::runtime_error(errorMsg.toStdString());
}
}
catch(const std::runtime_error& validate_error)
{
_reply->appendRawData (QString(validate_error.what()).toUtf8());
}
}
else
{
//Debug(_log, "error while saving: %s", error.errorString()).toLocal8bit.constData());
_reply->appendRawData (QString("Error while validating json: "+error.errorString()).toUtf8());
}
}
throw 0;
}
}
void CgiHandler::cmd_runscript()
{
if ( _args.at(0) == "run" )

View File

@ -22,8 +22,6 @@ public:
// cgi commands
void cmd_cfg_jsonserver();
void cmd_cfg_get ();
void cmd_cfg_set ();
void cmd_runscript ();
private: