Move WebSocket to Webserver & HttpJsonRpc async (#486)

* Move WebSocket to Webserver and HttpJsonRpc is now async

* revert...
This commit is contained in:
brindosch 2017-11-20 00:06:45 +01:00 committed by GitHub
parent 9799fae7f9
commit 0f9f3a17e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 733 additions and 722 deletions

View File

@ -65,73 +65,71 @@ function initWebSocket()
{ {
if (websocket == null) if (websocket == null)
{ {
$.ajax({ url: "/cgi/cfg_jsonserver" }).done(function(data) { jsonPort = (document.location.port == '') ? '80' : document.location.port;
jsonPort = data.substr(1); websocket = new WebSocket('ws://'+document.location.hostname+":"+document.location.port);
websocket = new WebSocket('ws://'+document.location.hostname+data); console.log(jsonPort)
websocket.onopen = function (event) {
$(hyperion).trigger({type:"open"});
websocket.onopen = function (event) { $(hyperion).on("cmd-serverinfo", function(event) {
$(hyperion).trigger({type:"open"}); watchdog = 0;
});
};
$(hyperion).on("cmd-serverinfo", function(event) { websocket.onclose = function (event) {
watchdog = 0; // See http://tools.ietf.org/html/rfc6455#section-7.4.1
}); var reason;
}; switch(event.code)
{
case 1000: reason = "Normal closure, meaning that the purpose for which the connection was established has been fulfilled."; break;
case 1001: reason = "An endpoint is \"going away\", such as a server going down or a browser having navigated away from a page."; break;
case 1002: reason = "An endpoint is terminating the connection due to a protocol error"; break;
case 1003: reason = "An endpoint is terminating the connection because it has received a type of data it cannot accept (e.g., an endpoint that understands only text data MAY send this if it receives a binary message)."; break;
case 1004: reason = "Reserved. The specific meaning might be defined in the future."; break;
case 1005: reason = "No status code was actually present."; break;
case 1006: reason = "The connection was closed abnormally, e.g., without sending or receiving a Close control frame"; break;
case 1007: reason = "An endpoint is terminating the connection because it has received data within a message that was not consistent with the type of the message (e.g., non-UTF-8 [http://tools.ietf.org/html/rfc3629] data within a text message)."; break;
case 1008: reason = "An endpoint is terminating the connection because it has received a message that \"violates its policy\". This reason is given either if there is no other sutible reason, or if there is a need to hide specific details about the policy."; break;
case 1009: reason = "An endpoint is terminating the connection because it has received a message that is too big for it to process."; break;
case 1010: reason = "An endpoint (client) is terminating the connection because it has expected the server to negotiate one or more extension, but the server didn't return them in the response message of the WebSocket handshake. <br /> Specifically, the extensions that are needed are: " + event.reason; break;
case 1011: reason = "A server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request."; break;
case 1015: reason = "The connection was closed due to a failure to perform a TLS handshake (e.g., the server certificate can't be verified)."; break;
default: reason = "Unknown reason";
}
console.log("[websocket::onclose] "+reason)
$(hyperion).trigger({type:"close", reason:reason});
watchdog = 10;
connectionLostDetection();
};
websocket.onclose = function (event) { websocket.onmessage = function (event) {
// See http://tools.ietf.org/html/rfc6455#section-7.4.1 try
var reason; {
switch(event.code) response = JSON.parse(event.data);
success = response.success;
cmd = response.command;
if (success)
{ {
case 1000: reason = "Normal closure, meaning that the purpose for which the connection was established has been fulfilled."; break; $(hyperion).trigger({type:"cmd-"+cmd, response:response});
case 1001: reason = "An endpoint is \"going away\", such as a server going down or a browser having navigated away from a page."; break;
case 1002: reason = "An endpoint is terminating the connection due to a protocol error"; break;
case 1003: reason = "An endpoint is terminating the connection because it has received a type of data it cannot accept (e.g., an endpoint that understands only text data MAY send this if it receives a binary message)."; break;
case 1004: reason = "Reserved. The specific meaning might be defined in the future."; break;
case 1005: reason = "No status code was actually present."; break;
case 1006: reason = "The connection was closed abnormally, e.g., without sending or receiving a Close control frame"; break;
case 1007: reason = "An endpoint is terminating the connection because it has received data within a message that was not consistent with the type of the message (e.g., non-UTF-8 [http://tools.ietf.org/html/rfc3629] data within a text message)."; break;
case 1008: reason = "An endpoint is terminating the connection because it has received a message that \"violates its policy\". This reason is given either if there is no other sutible reason, or if there is a need to hide specific details about the policy."; break;
case 1009: reason = "An endpoint is terminating the connection because it has received a message that is too big for it to process."; break;
case 1010: reason = "An endpoint (client) is terminating the connection because it has expected the server to negotiate one or more extension, but the server didn't return them in the response message of the WebSocket handshake. <br /> Specifically, the extensions that are needed are: " + event.reason; break;
case 1011: reason = "A server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request."; break;
case 1015: reason = "The connection was closed due to a failure to perform a TLS handshake (e.g., the server certificate can't be verified)."; break;
default: reason = "Unknown reason";
} }
console.log("[websocket::onclose] "+reason) else
$(hyperion).trigger({type:"close", reason:reason});
watchdog = 10;
connectionLostDetection();
};
websocket.onmessage = function (event) {
try
{ {
response = JSON.parse(event.data); error = response.hasOwnProperty("error")? response.error : "unknown";
success = response.success; $(hyperion).trigger({type:"error",reason:error});
cmd = response.command; console.log("[websocket::onmessage] "+error)
if (success)
{
$(hyperion).trigger({type:"cmd-"+cmd, response:response});
}
else
{
error = response.hasOwnProperty("error")? response.error : "unknown";
$(hyperion).trigger({type:"error",reason:error});
console.log("[websocket::onmessage] "+error)
}
} }
catch(exception_error) }
{ catch(exception_error)
$(hyperion).trigger({type:"error",reason:exception_error}); {
console.log("[websocket::onmessage] "+exception_error) $(hyperion).trigger({type:"error",reason:exception_error});
} console.log("[websocket::onmessage] "+exception_error)
}; }
};
websocket.onerror = function (error) { websocket.onerror = function (error) {
$(hyperion).trigger({type:"error",reason:error}); $(hyperion).trigger({type:"error",reason:error});
console.log("[websocket::onerror] "+error) console.log("[websocket::onerror] "+error)
}; };
});
} }
} }
else else

View File

@ -1,8 +1,5 @@
#pragma once #pragma once
// system includes
#include <cstdint>
// Qt includes // Qt includes
#include <QTcpServer> #include <QTcpServer>
#include <QSet> #include <QSet>
@ -45,9 +42,8 @@ private slots:
/// ///
/// Slot which is called when a client closes a connection /// Slot which is called when a client closes a connection
/// @param connection The Connection object which is being closed
/// ///
void closedConnection(JsonClientConnection * connection); void closedConnection(void);
/// forward message to all json slaves /// forward message to all json slaves
void forwardJsonMessage(const QJsonObject &message); void forwardJsonMessage(const QJsonObject &message);

View File

@ -46,18 +46,17 @@ public:
/// ///
/// @param peerAddress provide the Address of the peer /// @param peerAddress provide the Address of the peer
/// @param log The Logger class of the creator /// @param log The Logger class of the creator
/// @param parent Parent QObject
/// @param noListener if true, this instance won't listen for hyperion push events /// @param noListener if true, this instance won't listen for hyperion push events
/// ///
JsonProcessor(QString peerAddress, Logger* log, bool noListener = false); JsonProcessor(QString peerAddress, Logger* log, QObject* parent, bool noListener = false);
~JsonProcessor();
/// ///
/// Handle an incoming JSON message /// Handle an incoming JSON message
/// ///
/// @param message the incoming message as string /// @param message the incoming message as string
/// @param peerAddress overwrite peerAddress of constructor
/// ///
void handleMessage(const QString & message, const QString peerAddress = NULL); void handleMessage(const QString & message);
/// ///
/// send a forced serverinfo to a client /// send a forced serverinfo to a client

View File

@ -1,69 +1,23 @@
// Qt includes
#include <QCryptographicHash>
#include <QtEndian>
#include <unistd.h>
// project includes // project includes
#include "JsonClientConnection.h" #include "JsonClientConnection.h"
#include <utils/JsonProcessor.h>
const quint64 FRAME_SIZE_IN_BYTES = 512 * 512 * 2; //maximum size of a frame when sending a message #include <QTcpSocket>
JsonClientConnection::JsonClientConnection(QTcpSocket *socket) JsonClientConnection::JsonClientConnection(QTcpSocket *socket)
: QObject() : QObject()
, _socket(socket) , _socket(socket)
, _hyperion(Hyperion::getInstance())
, _receiveBuffer() , _receiveBuffer()
, _webSocketHandshakeDone(false)
, _onContinuation(false)
, _log(Logger::getInstance("JSONCLIENTCONNECTION")) , _log(Logger::getInstance("JSONCLIENTCONNECTION"))
, _notEnoughData(false)
, _clientAddress(socket->peerAddress())
, _connectionMode(CON_MODE::INIT)
{ {
// connect internal signals and slots connect(_socket, &QTcpSocket::disconnected, this, &JsonClientConnection::disconnected);
connect(_socket, SIGNAL(disconnected()), this, SLOT(socketClosed())); connect(_socket, &QTcpSocket::readyRead, this, &JsonClientConnection::readRequest);
connect(_socket, SIGNAL(readyRead()), this, SLOT(readData()));
// create a new instance of JsonProcessor // create a new instance of JsonProcessor
_jsonProcessor = new JsonProcessor(_clientAddress.toString(), _log); _jsonProcessor = new JsonProcessor(socket->peerAddress().toString(), _log, this);
// get the callback messages from JsonProcessor and send it to the client // get the callback messages from JsonProcessor and send it to the client
connect(_jsonProcessor,SIGNAL(callbackMessage(QJsonObject)),this,SLOT(sendMessage(QJsonObject))); connect(_jsonProcessor,SIGNAL(callbackMessage(QJsonObject)),this,SLOT(sendMessage(QJsonObject)));
} }
JsonClientConnection::~JsonClientConnection() void JsonClientConnection::readRequest()
{
delete _socket;
delete _jsonProcessor;
}
void JsonClientConnection::readData()
{
switch(_connectionMode)
{
case CON_MODE::INIT:
_receiveBuffer = _socket->readAll(); // initial read to determine connection
_connectionMode = (_receiveBuffer.contains("Upgrade: websocket")) ? CON_MODE::WEBSOCKET : CON_MODE::RAW;
// init websockets
if (_connectionMode == CON_MODE::WEBSOCKET)
{
doWebSocketHandshake();
break;
}
// if no ws, hand over the data to raw handling
case CON_MODE::RAW:
handleRawJsonData();
break;
case CON_MODE::WEBSOCKET:
handleWebSocketFrame();
break;
}
}
void JsonClientConnection::handleRawJsonData()
{ {
_receiveBuffer += _socket->readAll(); _receiveBuffer += _socket->readAll();
// raw socket data, handling as usual // raw socket data, handling as usual
@ -84,339 +38,16 @@ void JsonClientConnection::handleRawJsonData()
} }
} }
void JsonClientConnection::getWsFrameHeader(WebSocketHeader* header)
{
char fin_rsv_opcode, mask_length;
_socket->getChar(&fin_rsv_opcode);
_socket->getChar(&mask_length);
header->fin = (fin_rsv_opcode & BHB0_FIN) == BHB0_FIN;
header->opCode = fin_rsv_opcode & BHB0_OPCODE;
header->masked = (mask_length & BHB1_MASK) == BHB1_MASK;
header->payloadLength = mask_length & BHB1_PAYLOAD;
// get size of payload
switch (header->payloadLength)
{
case payload_size_code_16bit:
{
QByteArray buf = _socket->read(2);
header->payloadLength = ((buf.at(0) << 8) & 0xFF00) | (buf.at(1) & 0xFF);
}
break;
case payload_size_code_64bit:
{
QByteArray buf = _socket->read(8);
header->payloadLength = 0;
for (uint i=0; i < 8; i++)
{
header->payloadLength |= ((quint64)(buf.at(i) & 0xFF)) << (8*(7-i));
}
}
break;
}
// if the data is masked we need to get the key for unmasking
if (header->masked)
{
_socket->read(header->key, 4);
}
}
void JsonClientConnection::handleWebSocketFrame()
{
//printf("frame\n");
// we are on no continious reading from socket from call before
if (!_notEnoughData)
{
getWsFrameHeader(&_wsh);
}
if(_socket->bytesAvailable() < (qint64)_wsh.payloadLength)
{
//printf("not enough data %llu %llu\n", _socket->bytesAvailable(), _wsh.payloadLength);
_notEnoughData=true;
return;
}
_notEnoughData = false;
QByteArray buf = _socket->read(_wsh.payloadLength);
//printf("opcode %x payload bytes %llu avail: %llu\n", _wsh.opCode, _wsh.payloadLength, _socket->bytesAvailable());
if (OPCODE::invalid((OPCODE::value)_wsh.opCode))
{
sendClose(CLOSECODE::INV_TYPE, "invalid opcode");
return;
}
// check the type of data frame
bool isContinuation=false;
switch (_wsh.opCode)
{
case OPCODE::CONTINUATION:
isContinuation = true;
// no break here, just jump over to opcode text
case OPCODE::BINARY:
case OPCODE::TEXT:
{
// check for protocal violations
if (_onContinuation && !isContinuation)
{
sendClose(CLOSECODE::VIOLATION, "protocol violation, somebody sends frames in between continued frames");
return;
}
if (!_wsh.masked && _wsh.opCode == OPCODE::TEXT)
{
sendClose(CLOSECODE::VIOLATION, "protocol violation, unmasked text frames not allowed");
return;
}
// unmask data
for (int i=0; i < buf.size(); i++)
{
buf[i] = buf[i] ^ _wsh.key[i % 4];
}
_onContinuation = !_wsh.fin || isContinuation;
// frame contains text, extract it, append data if this is a continuation
if (_wsh.fin && ! isContinuation) // one frame
{
_wsReceiveBuffer.clear();
}
_wsReceiveBuffer.append(buf);
// this is the final frame, decode and handle data
if (_wsh.fin)
{
_onContinuation = false;
if (_wsh.opCode == OPCODE::TEXT)
{
_jsonProcessor->handleMessage(QString(_wsReceiveBuffer));
}
else
{
handleBinaryMessage(_wsReceiveBuffer);
}
_wsReceiveBuffer.clear();
}
}
break;
case OPCODE::CLOSE:
{
sendClose(CLOSECODE::NORMAL);
}
break;
case OPCODE::PING:
{
// ping received, send pong
quint8 pong[] = {OPCODE::PONG, 0};
_socket->write((const char*)pong, 2);
_socket->flush();
}
break;
case OPCODE::PONG:
{
Error(_log, "pong recievied, protocol violation!");
}
default:
Warning(_log, "strange %d\n%s\n", _wsh.opCode, QSTRING_CSTR(QString(buf)));
}
}
/// See http://tools.ietf.org/html/rfc6455#section-5.2 for more information
void JsonClientConnection::sendClose(int status, QString reason)
{
Info(_log, "send close: %d %s", status, QSTRING_CSTR(reason));
ErrorIf(!reason.isEmpty(), _log, QSTRING_CSTR(reason));
_receiveBuffer.clear();
QByteArray sendBuffer;
sendBuffer.append(136+(status-1000));
int length = reason.size();
if(length >= 126)
{
sendBuffer.append( (length > 0xffff) ? 127 : 126);
int num_bytes = (length > 0xffff) ? 8 : 2;
for(int c = num_bytes - 1; c != -1; c--)
{
sendBuffer.append( quint8((static_cast<unsigned long long>(length) >> (8 * c)) % 256));
}
}
else
{
sendBuffer.append(quint8(length));
}
sendBuffer.append(reason);
_socket->write(sendBuffer);
_socket->flush();
_socket->close();
}
void JsonClientConnection::doWebSocketHandshake()
{
// http header, might not be a very reliable check...
Debug(_log, "Websocket handshake");
// get the key to prepare an answer
int start = _receiveBuffer.indexOf("Sec-WebSocket-Key") + 19;
QByteArray value = _receiveBuffer.mid(start, _receiveBuffer.indexOf("\r\n", start) - start);
_receiveBuffer.clear();
// must be always appended
value += "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
// generate sha1 hash
QByteArray hash = QCryptographicHash::hash(value, QCryptographicHash::Sha1).toBase64();
// prepare an answer
QString data
= QString("HTTP/1.1 101 Switching Protocols\r\n")
+ QString("Upgrade: websocket\r\n")
+ QString("Connection: Upgrade\r\n")
+ QString("Sec-WebSocket-Accept: ")+QString(hash.data()) + "\r\n\r\n";
_socket->write(QSTRING_CSTR(data), data.size());
_socket->flush();
// we are in WebSocket mode, data frames should follow next
_webSocketHandshakeDone = true;
}
void JsonClientConnection::socketClosed()
{
_webSocketHandshakeDone = false;
emit connectionClosed(this);
}
QByteArray JsonClientConnection::makeFrameHeader(quint8 opCode, quint64 payloadLength, bool lastFrame)
{
QByteArray header;
if (payloadLength <= 0x7FFFFFFFFFFFFFFFULL)
{
//FIN, RSV1-3, opcode (RSV-1, RSV-2 and RSV-3 are zero)
quint8 byte = static_cast<quint8>((opCode & 0x0F) | (lastFrame ? 0x80 : 0x00));
header.append(static_cast<char>(byte));
byte = 0x00;
if (payloadLength <= 125)
{
byte |= static_cast<quint8>(payloadLength);
header.append(static_cast<char>(byte));
}
else if (payloadLength <= 0xFFFFU)
{
byte |= 126;
header.append(static_cast<char>(byte));
quint16 swapped = qToBigEndian<quint16>(static_cast<quint16>(payloadLength));
header.append(static_cast<const char *>(static_cast<const void *>(&swapped)), 2);
}
else
{
byte |= 127;
header.append(static_cast<char>(byte));
quint64 swapped = qToBigEndian<quint64>(payloadLength);
header.append(static_cast<const char *>(static_cast<const void *>(&swapped)), 8);
}
}
else
{
Error(_log, "JsonClientConnection::getHeader: payload too big!");
}
return header;
}
qint64 JsonClientConnection::sendMessage(QJsonObject message) qint64 JsonClientConnection::sendMessage(QJsonObject message)
{ {
QJsonDocument writer(message); QJsonDocument writer(message);
QByteArray serializedReply = writer.toJson(QJsonDocument::Compact) + "\n"; QByteArray data = writer.toJson(QJsonDocument::Compact) + "\n";
if (!_socket || (_socket->state() != QAbstractSocket::ConnectedState)) return 0; if (!_socket || (_socket->state() != QAbstractSocket::ConnectedState)) return 0;
if (_webSocketHandshakeDone) return sendMessage_Websockets(serializedReply);
return sendMessage_Raw(serializedReply);
}
qint64 JsonClientConnection::sendMessage_Raw(const char* data, quint64 size)
{
return _socket->write(data, size);
}
qint64 JsonClientConnection::sendMessage_Raw(QByteArray &data)
{
return _socket->write(data.data(), data.size()); return _socket->write(data.data(), data.size());
} }
qint64 JsonClientConnection::sendMessage_Websockets(QByteArray &data) void JsonClientConnection::disconnected(void)
{ {
qint64 payloadWritten = 0; emit connectionClosed();
quint32 payloadSize = data.size();
const char * payload = data.data();
qint32 numFrames = payloadSize / FRAME_SIZE_IN_BYTES + ((quint64(payloadSize) % FRAME_SIZE_IN_BYTES) > 0 ? 1 : 0);
for (int i = 0; i < numFrames; ++i)
{
const bool isLastFrame = (i == (numFrames - 1));
quint64 position = i * FRAME_SIZE_IN_BYTES;
quint32 frameSize = (payloadSize-position >= FRAME_SIZE_IN_BYTES) ? FRAME_SIZE_IN_BYTES : (payloadSize-position);
QByteArray buf = makeFrameHeader(OPCODE::TEXT, frameSize, isLastFrame);
sendMessage_Raw(buf);
qint64 written = sendMessage_Raw(payload+position,frameSize);
if (written > 0)
{
payloadWritten += written;
}
else
{
_socket->flush();
Error(_log, "Error writing bytes to socket: %s", QSTRING_CSTR(_socket->errorString()));
break;
}
}
if (payloadSize != payloadWritten)
{
Error(_log, "Error writing bytes to socket %d bytes from %d writte", payloadWritten, payloadSize);
return -1;
}
return payloadWritten;
}
void JsonClientConnection::handleBinaryMessage(QByteArray &data)
{
uint8_t priority = data.at(0);
unsigned duration_s = data.at(1);
unsigned imgSize = data.size() - 4;
unsigned width = ((data.at(2) << 8) & 0xFF00) | (data.at(3) & 0xFF);
unsigned height = imgSize / width;
if ( ! (imgSize) % width)
{
Error(_log, "data size is not multiple of width");
return;
}
Image<ColorRgb> image;
image.resize(width, height);
memcpy(image.memptr(), data.data()+4, imgSize);
_hyperion->setImage(priority, image, duration_s*1000);
} }

View File

@ -1,95 +1,14 @@
#pragma once #pragma once
#include <map>
// Qt includes // Qt includes
#include <QByteArray>
#include <QTcpSocket>
#include <QHostAddress>
#include <QString> #include <QString>
#include <QByteArray>
// Hyperion includes
#include <hyperion/Hyperion.h>
// util includes // util includes
#include <utils/Logger.h> #include <utils/Logger.h>
#include <utils/JsonProcessor.h>
/// Constants and utility functions related to WebSocket opcodes
/**
* WebSocket Opcodes are 4 bits. See RFC6455 section 5.2.
*/
namespace OPCODE {
enum value {
CONTINUATION = 0x0,
TEXT = 0x1,
BINARY = 0x2,
RSV3 = 0x3,
RSV4 = 0x4,
RSV5 = 0x5,
RSV6 = 0x6,
RSV7 = 0x7,
CLOSE = 0x8,
PING = 0x9,
PONG = 0xA,
CONTROL_RSVB = 0xB,
CONTROL_RSVC = 0xC,
CONTROL_RSVD = 0xD,
CONTROL_RSVE = 0xE,
CONTROL_RSVF = 0xF
};
/// Check if an opcode is reserved
/**
* @param v The opcode to test.
* @return Whether or not the opcode is reserved.
*/
inline bool reserved(value v) {
return (v >= RSV3 && v <= RSV7) || (v >= CONTROL_RSVB && v <= CONTROL_RSVF);
}
/// Check if an opcode is invalid
/**
* Invalid opcodes are negative or require greater than 4 bits to store.
*
* @param v The opcode to test.
* @return Whether or not the opcode is invalid.
*/
inline bool invalid(value v) {
return (v > 0xF || v < 0);
}
/// Check if an opcode is for a control frame
/**
* @param v The opcode to test.
* @return Whether or not the opcode is a control opcode.
*/
inline bool is_control(value v) {
return v >= 0x8;
}
}
namespace CLOSECODE {
enum value {
NORMAL = 1000,
AWAY = 1001,
TERM = 1002,
INV_TYPE = 1003,
INV_DATA = 1007,
VIOLATION = 1008,
BIG_MSG = 1009,
UNEXPECTED= 1011
};
}
namespace CON_MODE {
enum value {
INIT = 0,
RAW = 1,
WEBSOCKET = 2
};
}
class JsonProcessor;
class QTcpSocket;
/// ///
/// The Connection object created by \a JsonServer when a new connection is establshed /// The Connection object created by \a JsonServer when a new connection is establshed
@ -105,117 +24,28 @@ public:
/// ///
JsonClientConnection(QTcpSocket * socket); JsonClientConnection(QTcpSocket * socket);
/// signals:
/// Destructor void connectionClosed();
///
~JsonClientConnection();
struct WebSocketHeader
{
bool fin;
quint8 opCode;
bool masked;
quint64 payloadLength;
char key[4];
};
public slots: public slots:
qint64 sendMessage(QJsonObject); qint64 sendMessage(QJsonObject);
signals:
///
/// Signal which is emitted when the connection is being closed
/// @param connection This connection object
///
void connectionClosed(JsonClientConnection * connection);
private slots: private slots:
/// ///
/// Slot called when new data has arrived /// Slot called when new data has arrived
/// ///
void readData(); void readRequest();
///
/// Slot called when this connection is being closed
///
void socketClosed();
void disconnected();
private: private:
QTcpSocket* _socket;
/// new instance of JsonProcessor /// new instance of JsonProcessor
JsonProcessor * _jsonProcessor; JsonProcessor * _jsonProcessor;
///
/// Do handshake for a websocket connection
///
void doWebSocketHandshake();
///
/// Handle incoming websocket data frame
///
void handleWebSocketFrame();
///
/// Handle incoming raw data frame
///
void handleRawJsonData();
///
/// create ws header from socket and decode it
///
QByteArray makeFrameHeader(quint8 opCode, quint64 payloadLength, bool lastFrame);
///
/// handle binary message
///
/// This function should be placed elsewhere ....
///
void handleBinaryMessage(QByteArray &data);
qint64 sendMessage_Raw(const char* data, quint64 size);
qint64 sendMessage_Raw(QByteArray &data);
qint64 sendMessage_Websockets(QByteArray &data);
void sendClose(int status, QString reason = "");
void getWsFrameHeader(WebSocketHeader* header);
/// The TCP-Socket that is connected tot the Json-client
QTcpSocket * _socket;
/// Link to Hyperion for writing led-values to a priority channel
Hyperion * _hyperion;
/// The buffer used for reading data from the socket /// The buffer used for reading data from the socket
QByteArray _receiveBuffer; QByteArray _receiveBuffer;
/// buffer for websockets multi frame receive
QByteArray _wsReceiveBuffer;
quint8 _maskKey[4];
/// used for WebSocket detection and connection handling
bool _webSocketHandshakeDone;
bool _onContinuation;
/// The logger instance /// The logger instance
Logger * _log; Logger * _log;
WebSocketHeader _wsh;
bool _notEnoughData;
/// address of client
QHostAddress _clientAddress;
CON_MODE::value _connectionMode;
// masks for fields in the basic header
static uint8_t const BHB0_OPCODE = 0x0F;
static uint8_t const BHB0_RSV3 = 0x10;
static uint8_t const BHB0_RSV2 = 0x20;
static uint8_t const BHB0_RSV1 = 0x40;
static uint8_t const BHB0_FIN = 0x80;
static uint8_t const BHB1_PAYLOAD = 0x7F;
static uint8_t const BHB1_MASK = 0x80;
static uint8_t const payload_size_code_16bit = 0x7E; // 126
static uint8_t const payload_size_code_64bit = 0x7F; // 127
}; };

View File

@ -57,21 +57,23 @@ uint16_t JsonServer::getPort() const
void JsonServer::newConnection() void JsonServer::newConnection()
{ {
QTcpSocket * socket = _server.nextPendingConnection(); while(_server.hasPendingConnections())
if (socket != nullptr)
{ {
Debug(_log, "New connection from: %s ",socket->localAddress().toString().toStdString().c_str()); if (QTcpSocket * socket = _server.nextPendingConnection())
JsonClientConnection * connection = new JsonClientConnection(socket); {
_openConnections.insert(connection); Debug(_log, "New connection from: %s ",socket->localAddress().toString().toStdString().c_str());
JsonClientConnection * connection = new JsonClientConnection(socket);
_openConnections.insert(connection);
// register slot for cleaning up after the connection closed // register slot for cleaning up after the connection closed
connect(connection, SIGNAL(connectionClosed(JsonClientConnection*)), this, SLOT(closedConnection(JsonClientConnection*))); connect(connection, &JsonClientConnection::connectionClosed, this, &JsonServer::closedConnection);
}
} }
} }
void JsonServer::closedConnection(JsonClientConnection *connection) void JsonServer::closedConnection(void)
{ {
JsonClientConnection* connection = qobject_cast<JsonClientConnection*>(sender());
Debug(_log, "Connection closed"); Debug(_log, "Connection closed");
_openConnections.remove(connection); _openConnections.remove(connection);

View File

@ -35,8 +35,8 @@ using namespace hyperion;
std::map<hyperion::Components, bool> JsonProcessor::_componentsPrevState; std::map<hyperion::Components, bool> JsonProcessor::_componentsPrevState;
JsonProcessor::JsonProcessor(QString peerAddress, Logger* log, bool noListener) JsonProcessor::JsonProcessor(QString peerAddress, Logger* log, QObject* parent, bool noListener)
: QObject() : QObject(parent)
, _peerAddress(peerAddress) , _peerAddress(peerAddress)
, _log(log) , _log(log)
, _hyperion(Hyperion::getInstance()) , _hyperion(Hyperion::getInstance())
@ -61,16 +61,8 @@ JsonProcessor::JsonProcessor(QString peerAddress, Logger* log, bool noListener)
_image_stream_mutex.unlock(); _image_stream_mutex.unlock();
} }
JsonProcessor::~JsonProcessor() void JsonProcessor::handleMessage(const QString& messageString)
{ {
}
void JsonProcessor::handleMessage(const QString& messageString, const QString peerAddress)
{
if(!peerAddress.isNull())
_peerAddress = peerAddress;
const QString ident = "JsonRpc@"+_peerAddress; const QString ident = "JsonRpc@"+_peerAddress;
Q_INIT_RESOURCE(JSONRPC_schemas); Q_INIT_RESOURCE(JSONRPC_schemas);
QJsonObject message; QJsonObject message;
@ -808,7 +800,7 @@ void JsonProcessor::handleConfigSetCommand(const QJsonObject& message, const QSt
else if (!validate.first && validate.second) else if (!validate.first && validate.second)
{ {
Warning(_log,"Errors have been found in the configuration file. Automatic correction is applied"); Warning(_log,"Errors have been found in the configuration file. Automatic correction is applied");
QStringList schemaErrors = schemaChecker.getMessages(); QStringList schemaErrors = schemaChecker.getMessages();
for (auto & schemaError : schemaErrors) for (auto & schemaError : schemaErrors)
Info(_log, QSTRING_CSTR(schemaError)); Info(_log, QSTRING_CSTR(schemaError));

View File

@ -4,6 +4,8 @@
#include "QtHttpReply.h" #include "QtHttpReply.h"
#include "QtHttpServer.h" #include "QtHttpServer.h"
#include "QtHttpHeader.h" #include "QtHttpHeader.h"
#include "WebSocketClient.h"
#include "WebJsonRpc.h"
#include <QCryptographicHash> #include <QCryptographicHash>
#include <QTcpSocket> #include <QTcpSocket>
@ -109,7 +111,18 @@ void QtHttpClientWrapper::onClientDataReceived (void) {
} }
switch (m_parsingStatus) { // handle parsing status end/error switch (m_parsingStatus) { // handle parsing status end/error
case RequestParsed: { // a valid request has ben fully parsed case RequestParsed: { // a valid request has ben fully parsed
// add post data to request // Catch websocket header "Upgrade"
if(m_currentRequest->getHeader(QtHttpHeader::Upgrade) == "websocket")
{
if(m_websocketClient == Q_NULLPTR)
{
// disconnect this slot from socket for further requests
disconnect(m_sockClient, &QTcpSocket::readyRead, this, &QtHttpClientWrapper::onClientDataReceived);
m_websocketClient = new WebSocketClient(m_currentRequest, m_sockClient, this);
}
break;
}
// add post data to request and catch /jsonrpc subroute url
if ( m_currentRequest->getCommand() == "POST") if ( m_currentRequest->getCommand() == "POST")
{ {
QtHttpPostData postData; QtHttpPostData postData;
@ -126,12 +139,27 @@ void QtHttpClientWrapper::onClientDataReceived (void) {
postData.insert(QString::fromUtf8(keyValue.at(0)),value); postData.insert(QString::fromUtf8(keyValue.at(0)),value);
} }
m_currentRequest->setPostData(postData); m_currentRequest->setPostData(postData);
// catch /jsonrpc in url, we need async callback, StaticFileServing is sync
QString path = m_currentRequest->getUrl ().path ();
QStringList uri_parts = path.split('/', QString::SkipEmptyParts);
if ( ! uri_parts.empty() && uri_parts.at(0) == "json-rpc" )
{
if(m_webJsonRpc == Q_NULLPTR)
{
m_webJsonRpc = new WebJsonRpc(m_currentRequest, m_serverHandle, this);
}
m_webJsonRpc->handleMessage(m_currentRequest);
break;
}
} }
QtHttpReply reply (m_serverHandle); QtHttpReply reply (m_serverHandle);
connect (&reply, &QtHttpReply::requestSendHeaders, connect (&reply, &QtHttpReply::requestSendHeaders,
this, &QtHttpClientWrapper::onReplySendHeadersRequested); this, &QtHttpClientWrapper::onReplySendHeadersRequested);
connect (&reply, &QtHttpReply::requestSendData, connect (&reply, &QtHttpReply::requestSendData,
this, &QtHttpClientWrapper::onReplySendDataRequested); this, &QtHttpClientWrapper::onReplySendDataRequested);
emit m_serverHandle->requestNeedsReply (m_currentRequest, &reply); // allow app to handle request emit m_serverHandle->requestNeedsReply (m_currentRequest, &reply); // allow app to handle request
m_parsingStatus = sendReplyToClient (&reply); m_parsingStatus = sendReplyToClient (&reply);
break; break;
@ -201,8 +229,16 @@ void QtHttpClientWrapper::onReplySendDataRequested (void) {
} }
} }
void QtHttpClientWrapper::sendToClientWithReply(QtHttpReply * reply) {
connect (reply, &QtHttpReply::requestSendHeaders,
this, &QtHttpClientWrapper::onReplySendHeadersRequested);
connect (reply, &QtHttpReply::requestSendData,
this, &QtHttpClientWrapper::onReplySendDataRequested);
m_parsingStatus = sendReplyToClient (reply);
}
QtHttpClientWrapper::ParsingStatus QtHttpClientWrapper::sendReplyToClient (QtHttpReply * reply) { QtHttpClientWrapper::ParsingStatus QtHttpClientWrapper::sendReplyToClient (QtHttpReply * reply) {
if (reply != Q_NULLPTR) { if (reply != Q_NULLPTR) {
if (!reply->useChunked ()) { if (!reply->useChunked ()) {
//reply->appendRawData (CRLF); //reply->appendRawData (CRLF);
// send all headers and all data in one shot // send all headers and all data in one shot

View File

@ -9,6 +9,8 @@ class QTcpSocket;
class QtHttpRequest; class QtHttpRequest;
class QtHttpReply; class QtHttpReply;
class QtHttpServer; class QtHttpServer;
class WebSocketClient;
class WebJsonRpc;
class QtHttpClientWrapper : public QObject { class QtHttpClientWrapper : public QObject {
Q_OBJECT Q_OBJECT
@ -29,6 +31,8 @@ public:
}; };
QString getGuid (void); QString getGuid (void);
/// @brief Wrapper for sendReplyToClient(), handles m_parsingStatus and signal connect
void sendToClientWithReply (QtHttpReply * reply);
private slots: private slots:
void onClientDataReceived (void); void onClientDataReceived (void);
@ -41,11 +45,13 @@ protected slots:
void onReplySendDataRequested (void); void onReplySendDataRequested (void);
private: private:
QString m_guid; QString m_guid;
ParsingStatus m_parsingStatus; ParsingStatus m_parsingStatus;
QTcpSocket * m_sockClient; QTcpSocket * m_sockClient;
QtHttpRequest * m_currentRequest; QtHttpRequest * m_currentRequest;
QtHttpServer * m_serverHandle; QtHttpServer * m_serverHandle;
WebSocketClient * m_websocketClient = nullptr;
WebJsonRpc * m_webJsonRpc = nullptr;
}; };
#endif // QTHTTPCLIENTWRAPPER_H #endif // QTHTTPCLIENTWRAPPER_H

View File

@ -3,30 +3,34 @@
#include <QByteArray> #include <QByteArray>
const QByteArray & QtHttpHeader::Server = QByteArrayLiteral ("Server"); const QByteArray & QtHttpHeader::Server = QByteArrayLiteral ("Server");
const QByteArray & QtHttpHeader::Date = QByteArrayLiteral ("Date"); const QByteArray & QtHttpHeader::Date = QByteArrayLiteral ("Date");
const QByteArray & QtHttpHeader::Host = QByteArrayLiteral ("Host"); const QByteArray & QtHttpHeader::Host = QByteArrayLiteral ("Host");
const QByteArray & QtHttpHeader::Accept = QByteArrayLiteral ("Accept"); const QByteArray & QtHttpHeader::Accept = QByteArrayLiteral ("Accept");
const QByteArray & QtHttpHeader::Cookie = QByteArrayLiteral ("Cookie"); const QByteArray & QtHttpHeader::Cookie = QByteArrayLiteral ("Cookie");
const QByteArray & QtHttpHeader::ContentType = QByteArrayLiteral ("Content-Type"); const QByteArray & QtHttpHeader::ContentType = QByteArrayLiteral ("Content-Type");
const QByteArray & QtHttpHeader::ContentLength = QByteArrayLiteral ("Content-Length"); const QByteArray & QtHttpHeader::ContentLength = QByteArrayLiteral ("Content-Length");
const QByteArray & QtHttpHeader::Connection = QByteArrayLiteral ("Connection"); const QByteArray & QtHttpHeader::Connection = QByteArrayLiteral ("Connection");
const QByteArray & QtHttpHeader::UserAgent = QByteArrayLiteral ("User-Agent"); const QByteArray & QtHttpHeader::UserAgent = QByteArrayLiteral ("User-Agent");
const QByteArray & QtHttpHeader::AcceptCharset = QByteArrayLiteral ("Accept-Charset"); const QByteArray & QtHttpHeader::AcceptCharset = QByteArrayLiteral ("Accept-Charset");
const QByteArray & QtHttpHeader::AcceptEncoding = QByteArrayLiteral ("Accept-Encoding"); const QByteArray & QtHttpHeader::AcceptEncoding = QByteArrayLiteral ("Accept-Encoding");
const QByteArray & QtHttpHeader::AcceptLanguage = QByteArrayLiteral ("Accept-Language"); const QByteArray & QtHttpHeader::AcceptLanguage = QByteArrayLiteral ("Accept-Language");
const QByteArray & QtHttpHeader::Authorization = QByteArrayLiteral ("Authorization"); const QByteArray & QtHttpHeader::Authorization = QByteArrayLiteral ("Authorization");
const QByteArray & QtHttpHeader::CacheControl = QByteArrayLiteral ("Cache-Control"); const QByteArray & QtHttpHeader::CacheControl = QByteArrayLiteral ("Cache-Control");
const QByteArray & QtHttpHeader::ContentMD5 = QByteArrayLiteral ("Content-MD5"); const QByteArray & QtHttpHeader::ContentMD5 = QByteArrayLiteral ("Content-MD5");
const QByteArray & QtHttpHeader::ProxyAuthorization = QByteArrayLiteral ("Proxy-Authorization"); const QByteArray & QtHttpHeader::ProxyAuthorization = QByteArrayLiteral ("Proxy-Authorization");
const QByteArray & QtHttpHeader::Range = QByteArrayLiteral ("Range"); const QByteArray & QtHttpHeader::Range = QByteArrayLiteral ("Range");
const QByteArray & QtHttpHeader::ContentEncoding = QByteArrayLiteral ("Content-Encoding"); const QByteArray & QtHttpHeader::ContentEncoding = QByteArrayLiteral ("Content-Encoding");
const QByteArray & QtHttpHeader::ContentLanguage = QByteArrayLiteral ("Content-Language"); const QByteArray & QtHttpHeader::ContentLanguage = QByteArrayLiteral ("Content-Language");
const QByteArray & QtHttpHeader::ContentLocation = QByteArrayLiteral ("Content-Location"); const QByteArray & QtHttpHeader::ContentLocation = QByteArrayLiteral ("Content-Location");
const QByteArray & QtHttpHeader::ContentRange = QByteArrayLiteral ("Content-Range"); const QByteArray & QtHttpHeader::ContentRange = QByteArrayLiteral ("Content-Range");
const QByteArray & QtHttpHeader::Expires = QByteArrayLiteral ("Expires"); const QByteArray & QtHttpHeader::Expires = QByteArrayLiteral ("Expires");
const QByteArray & QtHttpHeader::LastModified = QByteArrayLiteral ("Last-Modified"); const QByteArray & QtHttpHeader::LastModified = QByteArrayLiteral ("Last-Modified");
const QByteArray & QtHttpHeader::Location = QByteArrayLiteral ("Location"); const QByteArray & QtHttpHeader::Location = QByteArrayLiteral ("Location");
const QByteArray & QtHttpHeader::SetCookie = QByteArrayLiteral ("Set-Cookie"); const QByteArray & QtHttpHeader::SetCookie = QByteArrayLiteral ("Set-Cookie");
const QByteArray & QtHttpHeader::TransferEncoding = QByteArrayLiteral ("Transfer-Encoding"); const QByteArray & QtHttpHeader::TransferEncoding = QByteArrayLiteral ("Transfer-Encoding");
const QByteArray & QtHttpHeader::ContentDisposition = QByteArrayLiteral ("Content-Disposition"); const QByteArray & QtHttpHeader::ContentDisposition = QByteArrayLiteral ("Content-Disposition");
const QByteArray & QtHttpHeader::Upgrade = QByteArrayLiteral ("Upgrade");
const QByteArray & QtHttpHeader::SecWebSocketKey = QByteArrayLiteral ("Sec-WebSocket-Key");
const QByteArray & QtHttpHeader::SecWebSocketProtocol = QByteArrayLiteral ("Sec-WebSocket-Protocol");
const QByteArray & QtHttpHeader::SecWebSocketVersion = QByteArrayLiteral ("Sec-WebSocket-Version");

View File

@ -32,6 +32,11 @@ public:
static const QByteArray & SetCookie; static const QByteArray & SetCookie;
static const QByteArray & TransferEncoding; static const QByteArray & TransferEncoding;
static const QByteArray & ContentDisposition; static const QByteArray & ContentDisposition;
// Websocket specific headers
static const QByteArray & Upgrade;
static const QByteArray & SecWebSocketKey;
static const QByteArray & SecWebSocketProtocol;
static const QByteArray & SecWebSocketVersion;
}; };
#endif // QTHTTPHEADER_H #endif // QTHTTPHEADER_H

View File

@ -4,7 +4,6 @@
#include "QtHttpReply.h" #include "QtHttpReply.h"
#include "QtHttpClientWrapper.h" #include "QtHttpClientWrapper.h"
#include <QDebug>
#include <QUrlQuery> #include <QUrlQuery>
const QString & QtHttpServer::HTTP_VERSION = QStringLiteral ("HTTP/1.1"); const QString & QtHttpServer::HTTP_VERSION = QStringLiteral ("HTTP/1.1");

View File

@ -34,7 +34,6 @@ StaticFileServing::StaticFileServing (Hyperion *hyperion, QString baseUrl, quint
connect (_server, &QtHttpServer::requestNeedsReply, this, &StaticFileServing::onRequestNeedsReply); connect (_server, &QtHttpServer::requestNeedsReply, this, &StaticFileServing::onRequestNeedsReply);
_server->start (port); _server->start (port);
} }
StaticFileServing::~StaticFileServing () StaticFileServing::~StaticFileServing ()
@ -60,14 +59,10 @@ void StaticFileServing::onServerStarted (quint16 port)
txtRecord txtRecord
); );
Debug(_log, "Web Config mDNS responder started"); Debug(_log, "Web Config mDNS responder started");
// json-rpc for http
_jsonProcessor = new JsonProcessor(QString("HTTP-API"), _log, true);
} }
void StaticFileServing::onServerStopped () { void StaticFileServing::onServerStopped () {
Info(_log, "stopped %s", _server->getServerName().toStdString().c_str()); Info(_log, "stopped %s", _server->getServerName().toStdString().c_str());
delete _jsonProcessor;
} }
void StaticFileServing::onServerError (QString msg) void StaticFileServing::onServerError (QString msg)
@ -113,50 +108,30 @@ void StaticFileServing::printErrorToReply (QtHttpReply * reply, QtHttpReply::Sta
void StaticFileServing::onRequestNeedsReply (QtHttpRequest * request, QtHttpReply * reply) void StaticFileServing::onRequestNeedsReply (QtHttpRequest * request, QtHttpReply * reply)
{ {
QString command = request->getCommand (); QString command = request->getCommand ();
if (command == QStringLiteral ("GET") || command == QStringLiteral ("POST")) if (command == QStringLiteral ("GET"))
{ {
QString path = request->getUrl ().path (); QString path = request->getUrl ().path ();
QStringList uri_parts = path.split('/', QString::SkipEmptyParts); QStringList uri_parts = path.split('/', QString::SkipEmptyParts);
// special uri handling for server commands // special uri handling for server commands
if ( ! uri_parts.empty() ) if ( ! uri_parts.empty() && uri_parts.at(0) == "cgi" )
{ {
if(uri_parts.at(0) == "cgi") uri_parts.removeAt(0);
try
{ {
uri_parts.removeAt(0); _cgi.exec(uri_parts, request, reply);
try
{
_cgi.exec(uri_parts, request, reply);
}
catch(int err)
{
Error(_log,"Exception while executing cgi %s : %d", path.toStdString().c_str(), err);
printErrorToReply (reply, QtHttpReply::InternalError, "script failed (" % path % ")");
}
catch(std::exception &e)
{
Error(_log,"Exception while executing cgi %s : %s", path.toStdString().c_str(), e.what());
printErrorToReply (reply, QtHttpReply::InternalError, "script failed (" % path % ")");
}
return;
} }
else if ( uri_parts.at(0) == "json-rpc" ) catch(int err)
{ {
QMetaObject::Connection m_connection; Error(_log,"Exception while executing cgi %s : %d", path.toStdString().c_str(), err);
QByteArray data = request->getRawData(); printErrorToReply (reply, QtHttpReply::InternalError, "script failed (" % path % ")");
QtHttpRequest::ClientInfo info = request->getClientInfo();
m_connection = QObject::connect(_jsonProcessor, &JsonProcessor::callbackMessage,
[reply](QJsonObject result) {
QJsonDocument doc(result);
reply->addHeader ("Content-Type", "application/json");
reply->appendRawData (doc.toJson());
});
_jsonProcessor->handleMessage(data,info.clientAddress.toString());
QObject::disconnect( m_connection );
return;
} }
catch(std::exception &e)
{
Error(_log,"Exception while executing cgi %s : %s", path.toStdString().c_str(), e.what());
printErrorToReply (reply, QtHttpReply::InternalError, "script failed (" % path % ")");
}
return;
} }
Q_INIT_RESOURCE(WebConfig); Q_INIT_RESOURCE(WebConfig);

View File

@ -12,8 +12,6 @@
#include <hyperion/Hyperion.h> #include <hyperion/Hyperion.h>
#include <utils/Logger.h> #include <utils/Logger.h>
#include <utils/JsonProcessor.h>
class StaticFileServing : public QObject { class StaticFileServing : public QObject {
Q_OBJECT Q_OBJECT
@ -35,7 +33,6 @@ private:
QMimeDatabase * _mimeDb; QMimeDatabase * _mimeDb;
CgiHandler _cgi; CgiHandler _cgi;
Logger * _log; Logger * _log;
JsonProcessor * _jsonProcessor;
void printErrorToReply (QtHttpReply * reply, QtHttpReply::StatusCode code, QString errorMessage); void printErrorToReply (QtHttpReply * reply, QtHttpReply::StatusCode code, QString errorMessage);

View File

@ -0,0 +1,38 @@
#include "WebJsonRpc.h"
#include "QtHttpReply.h"
#include "QtHttpRequest.h"
#include "QtHttpServer.h"
#include "QtHttpClientWrapper.h"
#include <utils/JsonProcessor.h>
WebJsonRpc::WebJsonRpc(QtHttpRequest* request, QtHttpServer* server, QtHttpClientWrapper* parent)
: QObject(parent)
, _server(server)
, _wrapper(parent)
, _log(Logger::getInstance("HTTPJSONRPC"))
{
const QString client = request->getClientInfo().clientAddress.toString();
_jsonProcessor = new JsonProcessor(client, _log, this, true);
connect(_jsonProcessor, &JsonProcessor::callbackMessage, this, &WebJsonRpc::handleCallback);
}
void WebJsonRpc::handleMessage(QtHttpRequest* request)
{
QByteArray data = request->getRawData();
_unlocked = true;
_jsonProcessor->handleMessage(data);
}
void WebJsonRpc::handleCallback(QJsonObject obj)
{
// guard against wrong callbacks; TODO: Remove when JsonProcessor is more solid
if(!_unlocked) return;
_unlocked = false;
// construct reply with headers timestamp and server name
QtHttpReply reply(_server);
QJsonDocument doc(obj);
reply.addHeader ("Content-Type", "application/json");
reply.appendRawData (doc.toJson());
_wrapper->sendToClientWithReply(&reply);
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <utils/Logger.h>
class QtHttpServer;
class QtHttpRequest;
class QtHttpClientWrapper;
class JsonProcessor;
class WebJsonRpc : public QObject {
Q_OBJECT
public:
WebJsonRpc(QtHttpRequest* request, QtHttpServer* server, QtHttpClientWrapper* parent);
void handleMessage(QtHttpRequest* request);
private:
QtHttpServer* _server;
QtHttpClientWrapper* _wrapper;
Logger* _log;
JsonProcessor* _jsonProcessor;
bool _unlocked = false;
private slots:
void handleCallback(QJsonObject obj);
};

View File

@ -0,0 +1,336 @@
#include "WebSocketClient.h"
#include "QtHttpRequest.h"
#include "QtHttpHeader.h"
#include <hyperion/Hyperion.h>
#include <utils/JsonProcessor.h>
#include <QTcpSocket>
#include <QtEndian>
#include <QCryptographicHash>
WebSocketClient::WebSocketClient(QtHttpRequest* request, QTcpSocket* sock, QObject* parent)
: QObject(parent)
, _socket(sock)
, _log(Logger::getInstance("WEBSOCKET"))
, _hyperion(Hyperion::getInstance())
{
// connect socket; disconnect handled from QtHttpServer
connect(_socket, &QTcpSocket::readyRead , this, &WebSocketClient::handleWebSocketFrame);
// QtHttpRequest contains all headers for handshake
QByteArray secWebSocketKey = request->getHeader(QtHttpHeader::SecWebSocketKey);
const QString client = request->getClientInfo().clientAddress.toString();
// Json processor
_jsonProcessor = new JsonProcessor(client, _log, this);
connect(_jsonProcessor, &JsonProcessor::callbackMessage, this, &WebSocketClient::sendMessage);
Debug(_log, "New connection from %s", QSTRING_CSTR(client));
// do handshake
secWebSocketKey += "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
QByteArray hash = QCryptographicHash::hash(secWebSocketKey, QCryptographicHash::Sha1).toBase64();
QString data
= QString("HTTP/1.1 101 Switching Protocols\r\n")
+ QString("Upgrade: websocket\r\n")
+ QString("Connection: Upgrade\r\n")
+ QString("Sec-WebSocket-Accept: ")+QString(hash.data()) + "\r\n\r\n";
_socket->write(QSTRING_CSTR(data), data.size());
_socket->flush();
}
void WebSocketClient::handleWebSocketFrame(void)
{
// we are on no continious reading from socket from call before
if (!_notEnoughData)
{
getWsFrameHeader(&_wsh);
}
if(_socket->bytesAvailable() < (qint64)_wsh.payloadLength)
{
//printf("not enough data %llu %llu\n", _socket->bytesAvailable(), _wsh.payloadLength);
_notEnoughData=true;
return;
}
_notEnoughData = false;
QByteArray buf = _socket->read(_wsh.payloadLength);
//printf("opcode %x payload bytes %llu avail: %llu\n", _wsh.opCode, _wsh.payloadLength, _socket->bytesAvailable());
if (OPCODE::invalid((OPCODE::value)_wsh.opCode))
{
sendClose(CLOSECODE::INV_TYPE, "invalid opcode");
return;
}
// check the type of data frame
bool isContinuation=false;
switch (_wsh.opCode)
{
case OPCODE::CONTINUATION:
isContinuation = true;
// no break here, just jump over to opcode text
case OPCODE::BINARY:
case OPCODE::TEXT:
{
// check for protocal violations
if (_onContinuation && !isContinuation)
{
sendClose(CLOSECODE::VIOLATION, "protocol violation, somebody sends frames in between continued frames");
return;
}
if (!_wsh.masked && _wsh.opCode == OPCODE::TEXT)
{
sendClose(CLOSECODE::VIOLATION, "protocol violation, unmasked text frames not allowed");
return;
}
// unmask data
for (int i=0; i < buf.size(); i++)
{
buf[i] = buf[i] ^ _wsh.key[i % 4];
}
_onContinuation = !_wsh.fin || isContinuation;
// frame contains text, extract it, append data if this is a continuation
if (_wsh.fin && ! isContinuation) // one frame
{
_wsReceiveBuffer.clear();
}
_wsReceiveBuffer.append(buf);
// this is the final frame, decode and handle data
if (_wsh.fin)
{
_onContinuation = false;
if (_wsh.opCode == OPCODE::TEXT)
{
_jsonProcessor->handleMessage(QString(_wsReceiveBuffer));
}
else
{
handleBinaryMessage(_wsReceiveBuffer);
}
_wsReceiveBuffer.clear();
}
}
break;
case OPCODE::CLOSE:
{
sendClose(CLOSECODE::NORMAL);
}
break;
case OPCODE::PING:
{
// ping received, send pong
quint8 pong[] = {OPCODE::PONG, 0};
_socket->write((const char*)pong, 2);
_socket->flush();
}
break;
case OPCODE::PONG:
{
Error(_log, "pong received, protocol violation!");
}
default:
Warning(_log, "strange %d\n%s\n", _wsh.opCode, QSTRING_CSTR(QString(buf)));
}
}
void WebSocketClient::getWsFrameHeader(WebSocketHeader* header)
{
char fin_rsv_opcode, mask_length;
_socket->getChar(&fin_rsv_opcode);
_socket->getChar(&mask_length);
header->fin = (fin_rsv_opcode & BHB0_FIN) == BHB0_FIN;
header->opCode = fin_rsv_opcode & BHB0_OPCODE;
header->masked = (mask_length & BHB1_MASK) == BHB1_MASK;
header->payloadLength = mask_length & BHB1_PAYLOAD;
// get size of payload
switch (header->payloadLength)
{
case payload_size_code_16bit:
{
QByteArray buf = _socket->read(2);
header->payloadLength = ((buf.at(0) << 8) & 0xFF00) | (buf.at(1) & 0xFF);
}
break;
case payload_size_code_64bit:
{
QByteArray buf = _socket->read(8);
header->payloadLength = 0;
for (uint i=0; i < 8; i++)
{
header->payloadLength |= ((quint64)(buf.at(i) & 0xFF)) << (8*(7-i));
}
}
break;
}
// if the data is masked we need to get the key for unmasking
if (header->masked)
{
_socket->read(header->key, 4);
}
}
/// See http://tools.ietf.org/html/rfc6455#section-5.2 for more information
void WebSocketClient::sendClose(int status, QString reason)
{
Debug(_log, "send close: %d %s", status, QSTRING_CSTR(reason));
ErrorIf(!reason.isEmpty(), _log, QSTRING_CSTR(reason));
_receiveBuffer.clear();
QByteArray sendBuffer;
sendBuffer.append(136+(status-1000));
int length = reason.size();
if(length >= 126)
{
sendBuffer.append( (length > 0xffff) ? 127 : 126);
int num_bytes = (length > 0xffff) ? 8 : 2;
for(int c = num_bytes - 1; c != -1; c--)
{
sendBuffer.append( quint8((static_cast<unsigned long long>(length) >> (8 * c)) % 256));
}
}
else
{
sendBuffer.append(quint8(length));
}
sendBuffer.append(reason);
_socket->write(sendBuffer);
_socket->flush();
_socket->close();
}
void WebSocketClient::handleBinaryMessage(QByteArray &data)
{
uint8_t priority = data.at(0);
unsigned duration_s = data.at(1);
unsigned imgSize = data.size() - 4;
unsigned width = ((data.at(2) << 8) & 0xFF00) | (data.at(3) & 0xFF);
unsigned height = imgSize / width;
if ( ! (imgSize) % width)
{
Error(_log, "data size is not multiple of width");
return;
}
Image<ColorRgb> image;
image.resize(width, height);
memcpy(image.memptr(), data.data()+4, imgSize);
_hyperion->setImage(priority, image, duration_s*1000);
}
qint64 WebSocketClient::sendMessage(QJsonObject obj)
{
QJsonDocument writer(obj);
QByteArray data = writer.toJson(QJsonDocument::Compact) + "\n";
if (!_socket || (_socket->state() != QAbstractSocket::ConnectedState)) return 0;
qint64 payloadWritten = 0;
quint32 payloadSize = data.size();
const char * payload = data.data();
qint32 numFrames = payloadSize / FRAME_SIZE_IN_BYTES + ((quint64(payloadSize) % FRAME_SIZE_IN_BYTES) > 0 ? 1 : 0);
for (int i = 0; i < numFrames; ++i)
{
const bool isLastFrame = (i == (numFrames - 1));
quint64 position = i * FRAME_SIZE_IN_BYTES;
quint32 frameSize = (payloadSize-position >= FRAME_SIZE_IN_BYTES) ? FRAME_SIZE_IN_BYTES : (payloadSize-position);
QByteArray buf = makeFrameHeader(OPCODE::TEXT, frameSize, isLastFrame);
sendMessage_Raw(buf);
qint64 written = sendMessage_Raw(payload+position,frameSize);
if (written > 0)
{
payloadWritten += written;
}
else
{
_socket->flush();
Error(_log, "Error writing bytes to socket: %s", QSTRING_CSTR(_socket->errorString()));
break;
}
}
if (payloadSize != payloadWritten)
{
Error(_log, "Error writing bytes to socket %d bytes from %d written", payloadWritten, payloadSize);
return -1;
}
return payloadWritten;
}
qint64 WebSocketClient::sendMessage_Raw(const char* data, quint64 size)
{
return _socket->write(data, size);
}
qint64 WebSocketClient::sendMessage_Raw(QByteArray &data)
{
return _socket->write(data.data(), data.size());
}
QByteArray WebSocketClient::makeFrameHeader(quint8 opCode, quint64 payloadLength, bool lastFrame)
{
QByteArray header;
if (payloadLength <= 0x7FFFFFFFFFFFFFFFULL)
{
//FIN, RSV1-3, opcode (RSV-1, RSV-2 and RSV-3 are zero)
quint8 byte = static_cast<quint8>((opCode & 0x0F) | (lastFrame ? 0x80 : 0x00));
header.append(static_cast<char>(byte));
byte = 0x00;
if (payloadLength <= 125)
{
byte |= static_cast<quint8>(payloadLength);
header.append(static_cast<char>(byte));
}
else if (payloadLength <= 0xFFFFU)
{
byte |= 126;
header.append(static_cast<char>(byte));
quint16 swapped = qToBigEndian<quint16>(static_cast<quint16>(payloadLength));
header.append(static_cast<const char *>(static_cast<const void *>(&swapped)), 2);
}
else
{
byte |= 127;
header.append(static_cast<char>(byte));
quint64 swapped = qToBigEndian<quint64>(payloadLength);
header.append(static_cast<const char *>(static_cast<const void *>(&swapped)), 8);
}
}
else
{
Error(_log, "Payload too big!");
}
return header;
}

View File

@ -0,0 +1,72 @@
#pragma once
#include <utils/Logger.h>
#include "WebSocketUtils.h"
class QTcpSocket;
class QtHttpRequest;
class Hyperion;
class JsonProcessor;
class WebSocketClient : public QObject {
Q_OBJECT
public:
WebSocketClient(QtHttpRequest* request, QTcpSocket* sock, QObject* parent);
struct WebSocketHeader
{
bool fin;
quint8 opCode;
bool masked;
quint64 payloadLength;
char key[4];
};
private:
QTcpSocket* _socket;
Logger* _log;
Hyperion* _hyperion;
JsonProcessor* _jsonProcessor;
void getWsFrameHeader(WebSocketHeader* header);
void sendClose(int status, QString reason = "");
void handleBinaryMessage(QByteArray &data);
qint64 sendMessage_Raw(const char* data, quint64 size);
qint64 sendMessage_Raw(QByteArray &data);
QByteArray makeFrameHeader(quint8 opCode, quint64 payloadLength, bool lastFrame);
/// The buffer used for reading data from the socket
QByteArray _receiveBuffer;
/// buffer for websockets multi frame receive
QByteArray _wsReceiveBuffer;
quint8 _maskKey[4];
bool _onContinuation = false;
// true when data is missing for parsing
bool _notEnoughData = false;
// websocket header store
WebSocketHeader _wsh;
// masks for fields in the basic header
static uint8_t const BHB0_OPCODE = 0x0F;
static uint8_t const BHB0_RSV3 = 0x10;
static uint8_t const BHB0_RSV2 = 0x20;
static uint8_t const BHB0_RSV1 = 0x40;
static uint8_t const BHB0_FIN = 0x80;
static uint8_t const BHB1_PAYLOAD = 0x7F;
static uint8_t const BHB1_MASK = 0x80;
static uint8_t const payload_size_code_16bit = 0x7E; // 126
static uint8_t const payload_size_code_64bit = 0x7F; // 127
static const quint64 FRAME_SIZE_IN_BYTES = 512 * 512 * 2; //maximum size of a frame when sending a message
private slots:
void handleWebSocketFrame(void);
qint64 sendMessage(QJsonObject obj);
};

View File

@ -0,0 +1,68 @@
#pragma once
/// Constants and utility functions related to WebSocket opcodes
/**
* WebSocket Opcodes are 4 bits. See RFC6455 section 5.2.
*/
namespace OPCODE {
enum value {
CONTINUATION = 0x0,
TEXT = 0x1,
BINARY = 0x2,
RSV3 = 0x3,
RSV4 = 0x4,
RSV5 = 0x5,
RSV6 = 0x6,
RSV7 = 0x7,
CLOSE = 0x8,
PING = 0x9,
PONG = 0xA,
CONTROL_RSVB = 0xB,
CONTROL_RSVC = 0xC,
CONTROL_RSVD = 0xD,
CONTROL_RSVE = 0xE,
CONTROL_RSVF = 0xF
};
/// Check if an opcode is reserved
/**
* @param v The opcode to test.
* @return Whether or not the opcode is reserved.
*/
inline bool reserved(value v) {
return (v >= RSV3 && v <= RSV7) || (v >= CONTROL_RSVB && v <= CONTROL_RSVF);
}
/// Check if an opcode is invalid
/**
* Invalid opcodes are negative or require greater than 4 bits to store.
*
* @param v The opcode to test.
* @return Whether or not the opcode is invalid.
*/
inline bool invalid(value v) {
return (v > 0xF || v < 0);
}
/// Check if an opcode is for a control frame
/**
* @param v The opcode to test.
* @return Whether or not the opcode is a control opcode.
*/
inline bool is_control(value v) {
return v >= 0x8;
}
}
namespace CLOSECODE {
enum value {
NORMAL = 1000,
AWAY = 1001,
TERM = 1002,
INV_TYPE = 1003,
INV_DATA = 1007,
VIOLATION = 1008,
BIG_MSG = 1009,
UNEXPECTED= 1011
};
}