diff options
author | AGaliuzov <AGaliuzov@luxoft.com> | 2015-09-25 17:01:00 +0300 |
---|---|---|
committer | AGaliuzov <AGaliuzov@luxoft.com> | 2015-09-25 17:01:00 +0300 |
commit | a2f5a1c6de476ec8d4fa11b873c16d25a109231a (patch) | |
tree | 116005c283ff2ab563f90123d38dc0a196eb3477 | |
parent | 1c997837f63833c13026def2f4c6a468f2553a7e (diff) | |
parent | 1182cde5e1e8a878029a68a2dc333add5e8257a7 (diff) | |
download | smartdevicelink-a2f5a1c6de476ec8d4fa11b873c16d25a109231a.tar.gz |
Merge pull request #215 from LuxoftSDL/hotfix/Remove_invalid_messages_from_MessageBrokers_buffer
Remove parsed JSON and invalid messages from MessageBroker's buffer
3 files changed, 263 insertions, 57 deletions
diff --git a/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp b/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp index 952b250ae..c54204379 100644 --- a/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp +++ b/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp @@ -61,9 +61,10 @@ namespace NsMessageBroker * \brief Receive data from TCP server (from client). * \param fd FileDescriptor of socket. * \param aJSONData JSON string. + * \param tryHard give up on first JSON parse error or try to workaround it. */ - void onMessageReceived(int fd, std::string& aJSONData); - + void onMessageReceived(int fd, std::string& aJSONData, bool tryHard); + /** * \brief Test of buffer parsing. */ diff --git a/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp b/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp index 07f01ea12..3b14489a8 100644 --- a/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp +++ b/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp @@ -4,6 +4,7 @@ * \author AKara */ +#include <cassert> #include <stdio.h> #include <vector> @@ -216,6 +217,31 @@ class CMessageBroker_Private { int popMessageFromWaitQue(CMessage* pMessage); /** + * \brief Tries to remove the parsed part of the buffer + * \param root Parsed JSON value + * \param aJSONData The string buffer + * \return true on success, false on failure + */ + bool cutParsedJSON(const Json::Value& root, std::string& aJSONData); + + /** + * \brief Finds the position just after a JSON object or array in a buffer + * \param isObject Must be true for object, false for array + * \param aJSONData The string buffer + * \return The position in the buffer after the object or array on success, + * std::strin::npos on failure + */ + size_t jumpOverJSONObjectOrArray(bool isObject, const std::string& aJSONData); + + /** + * \brief Finds the position just after a JSON string in a buffer + * \param aJSONData The string buffer + * \return The position in the buffer after the string on success, + * std::strin::npos on failure + */ + size_t jumpOverJSONString(const std::string& aJSONData); + + /** * \brief Que of messages. */ std::deque<CMessage*> mMessagesQueue; @@ -288,35 +314,183 @@ CMessageBroker* CMessageBroker::getInstance() { return &instance; } -void CMessageBroker::onMessageReceived(int fd, std::string& aJSONData) { - DBG_MSG(("CMessageBroker::onMessageReceived()\n")); - while (!aJSONData.empty()) { - Json::Value root; - if (!p->m_reader.parse(aJSONData, root)) { - DBG_MSG(("Received not JSON string! %s\n", aJSONData.c_str())); - return; + +size_t CMessageBroker_Private::jumpOverJSONObjectOrArray(bool isObject, + const std::string& aJSONData) { + const char openBracket = isObject? '{' : '['; + const char closeBracket = isObject? '}' : ']'; + int open_minus_close_brackets(1); + size_t position = aJSONData.find(openBracket); // Find the beginning of the object + + while ((position != std::string::npos) && (open_minus_close_brackets > 0)) { + position = aJSONData.find_first_of(std::string("\"")+openBracket+closeBracket, + position+1); + if (std::string::npos == position) { + break; } - if(root["jsonrpc"]!="2.0") { - DBG_MSG(("\t Json::Reader::parce didn't set up jsonrpc! jsonrpc = '%s'\n", root["jsonrpc"].asString().c_str())); - return; + if ('"' == aJSONData[position]) { + // Ignore string interior, which might contain brackets and escaped "-s + do { + position = aJSONData.find('"', position+1); // Find the closing quote + } while ((std::string::npos != position) && ('\\' == aJSONData[position-1])); + } else if (openBracket == aJSONData[position]) { + ++open_minus_close_brackets; + } else if (closeBracket == aJSONData[position]) { + --open_minus_close_brackets; } - std::string wmes = p->m_recieverWriter.write(root); - DBG_MSG(("Parsed JSON string %d : %s\n", wmes.length(), - wmes.c_str())); - DBG_MSG(("Buffer is:%s\n", aJSONData.c_str())); - if (aJSONData.length() > wmes.length()) { - // wmes string length can differ from buffer substr length - size_t offset = wmes.length(); - char msg_begin = '{'; - if (aJSONData.at(offset) != msg_begin) { - offset -= 1; // wmes can contain redudant \n in the tail. + } + + if ((0 == open_minus_close_brackets) && (std::string::npos != position)) { + ++position; // Move after the closing bracket + } else { + position = std::string::npos; + } + + return position; +} + + +size_t CMessageBroker_Private::jumpOverJSONString(const std::string& aJSONData) { + size_t position = aJSONData.find('"'); // Find the beginning of the string + + do { + position = aJSONData.find('"', position+1); // Find the closing quote + } while ((std::string::npos != position) && ('\\' == aJSONData[position-1])); + + if (std::string::npos != position) { + ++position; // Move after the closing quote + } + + return position; +} + + +bool CMessageBroker_Private::cutParsedJSON(const Json::Value& root, + std::string& aJSONData) { + if (root.isNull() || aJSONData.empty()) { + DBG_MSG_ERROR(("JSON is null or the buffer is empty!\n")); + return false; + } + + std::string parsed_json_str = m_recieverWriter.write(root); + DBG_MSG(("Parsed JSON string: '%s'\n", parsed_json_str.c_str())); + + // Trim front spaces (if any) + const size_t nonempty_position = aJSONData.find_first_not_of(" \t\n\v\f\r"); + aJSONData.erase(0, nonempty_position); + if (std::string::npos == nonempty_position) { + DBG_MSG_ERROR(("Buffer contains only blanks!\n")); + return false; + } + + // JSON writer puts '\n' at the end. Remove it. + const size_t final_lf_pos = parsed_json_str.rfind('\n'); + if (final_lf_pos == parsed_json_str.length()-1) { + parsed_json_str.erase(final_lf_pos, 1); + } + + /* RFC 4627: "A JSON value MUST be an object, array, number, or string, or + * one of the following three literal names: false null true" + * So we will try to find the borders of the parsed part based on its type. */ + + size_t position(std::string::npos); + + if (0 == aJSONData.find(parsed_json_str)) { + // If by chance parsed JSON is the same in the buffer and is at the beginning + position = parsed_json_str.length(); + } else if (root.isObject() || root.isArray()) { + position = jumpOverJSONObjectOrArray(root.isObject(), aJSONData); + } else if (root.isString()) { + position = jumpOverJSONString(aJSONData); + } else if (root.isNumeric()) { + position = aJSONData.find_first_not_of("+-0123456789.eE"); + } else if (root.isBool() || ("null" == parsed_json_str)) { + position = aJSONData.find(parsed_json_str); + if (std::string::npos != position) { + position += parsed_json_str.length(); + } + } else { + DBG_MSG_ERROR(("Unknown JSON type!\n")); + } + + if (std::string::npos == position) { + DBG_MSG_ERROR(("Error finding JSON object boundaries!\n")); + /* This should not happen, because the string is already parsed as a + * valid JSON. If this happens then above code is wrong. It is better + * to assert() than just return here, because otherwise we may enter an + * endless cycle - fail to process one and the same message again and + * again. Or we may clear the buffer and return, but in this way we will + * loose the next messages, miss a bug here, and create another bug. */ + assert(std::string::npos != position); + return false; // For release version + } + + if ((position >= aJSONData.length()) || + ((position == aJSONData.length()-1) && isspace(aJSONData[position]))) { + // No next object. Clear entire aJSONData. + aJSONData = ""; + } else { + // There is another object. Clear the current one. + aJSONData.erase(0, position); + } + + return true; +} + + +void CMessageBroker::onMessageReceived(int fd, std::string& aJSONData, bool tryHard) { + DBG_MSG(("CMessageBroker::onMessageReceived(%d, '%s')\n", fd, aJSONData.c_str())); + + while (! aJSONData.empty()) { + Json::Value root; + if ((! p->m_reader.parse(aJSONData, root)) || root.isNull()) { + DBG_MSG_ERROR(("Unable to parse JSON!")); + if (! tryHard) { + return; + } + uint8_t first_byte = static_cast<uint8_t>(aJSONData[0]); + if ((first_byte <= 0x08) || ((first_byte >= 0x80) && (first_byte <= 0x88))) { + DBG_MSG((" There is an unparsed websocket header probably.\n")); + /* Websocket headers can have FIN flag set in the first byte (0x80). + * Then there are 3 zero bits and 4 bits for opcode (from 0x00 to 0x0A). + * But actually we don't use opcodes above 0x08. + * Use this fact to distinguish websocket header from payload text data. + * It can be a coincidence of course, but we have to give it a try. */ + return; + } else if ('{' == aJSONData[0]) { + DBG_MSG_ERROR((" Incomplete JSON object probably.\n")); + return; + } else { + DBG_MSG_ERROR((" Step in the buffer and try again...\n")); + aJSONData.erase(0, 1); + DBG_MSG_ERROR(("Buffer after cut is: '%s'\n", aJSONData.c_str())); + continue; } - aJSONData.erase(aJSONData.begin(), aJSONData.begin() + offset); - DBG_MSG(("Buffer after cut is:%s\n", aJSONData.c_str())); + + } else if (! root.isObject()) { + /* JSON RPC 2.0 messages are objects. Batch calls must be pre-rpocessed, + * so no need for "and !root.isArray()" */ + DBG_MSG_ERROR(("Parsed JSON is not an object!\n")); + if (! tryHard) { + return; + } + // Cut parsed data from the buffer below and continue + + } else if ((!root.isMember("jsonrpc")) || (root["jsonrpc"]!="2.0")) { + DBG_MSG_ERROR(("'jsonrpc' is not set correctly in parsed JSON!\n")); + if (! tryHard) { + return; + } + // Cut parsed object from the buffer below and continue + } else { - aJSONData = ""; + // Parsing successful. Pass the message up. + p->pushMessage(new CMessage(fd, root)); } - p->pushMessage(new CMessage(fd, root)); + + p->cutParsedJSON(root, aJSONData); + + DBG_MSG(("Buffer after cut is: '%s'\n", aJSONData.c_str())); } } diff --git a/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp b/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp index ca27502b9..bdd7b2bfd 100644 --- a/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp +++ b/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp @@ -58,9 +58,10 @@ ssize_t TcpServer::Send(int fd, const std::string& data) { bool TcpServer::Recv(int fd) { DBG_MSG(("TcpServer::Recv(%d)\n", fd)); - ssize_t nb = -1; std::string* pReceivingBuffer = getBufferFor(fd); + bool buffer_was_not_empty = pReceivingBuffer->size() > 0; + std::vector<char> buf; buf.reserve(RECV_BUFFER_LENGTH + pReceivingBuffer->size()); DBG_MSG(("Left in pReceivingBuffer: %d \n", @@ -68,50 +69,87 @@ bool TcpServer::Recv(int fd) { buf.assign(pReceivingBuffer->c_str(), pReceivingBuffer->c_str() + pReceivingBuffer->size()); buf.resize(RECV_BUFFER_LENGTH + pReceivingBuffer->size()); - ssize_t received_bytes = recv(fd, &buf[pReceivingBuffer->size()], MAX_RECV_DATA, 0); - nb = received_bytes; + + int received_bytes = recv(fd, &buf[pReceivingBuffer->size()], MAX_RECV_DATA, 0); + if (received_bytes <= 0) { + DBG_MSG(("Received %d bytes from %d; error = %d\n", + received_bytes, fd, errno)); + m_purge.push_back(fd); + return false; + } + + unsigned int nb = received_bytes; + std::vector<char> last_msg_buf(buf.begin()+pReceivingBuffer->size(), + buf.begin()+pReceivingBuffer->size()+nb); DBG_MSG(("Recieved %d from %d\n", nb, fd)); - nb += pReceivingBuffer->size(); + nb += static_cast<unsigned int>(pReceivingBuffer->size()); DBG_MSG(("Recieved with buffer %d from %d\n", nb, fd)); - if (received_bytes > 0) { - unsigned int recieved_data = nb; + if (nb > 0) { // This is redundant if (isWebSocket(fd)) { const unsigned int data_length = - mWebSocketHandler.parseWebSocketDataLength(&buf[0], recieved_data); + mWebSocketHandler.parseWebSocketDataLength(&buf[0], nb); - DBG_MSG(("Received %d actual data length %d\n", - recieved_data, data_length)); + DBG_MSG(("Received %d actual data length %d\n", nb, data_length)); - if (data_length > recieved_data) { - DBG_MSG_ERROR(("Received %d actual data length %d\n", - recieved_data, data_length)); + if (data_length > nb) { + DBG_MSG_ERROR(("Received %d actual data length %d\n", nb, data_length)); DBG_MSG_ERROR(("Incomplete message")); *pReceivingBuffer = std::string(&buf[0], nb); return false; } - unsigned int b_size = static_cast<unsigned int>(nb); - mWebSocketHandler.parseWebSocketData(&buf[0], b_size); - nb = b_size; + mWebSocketHandler.parseWebSocketData(&buf[0], nb); } *pReceivingBuffer = std::string(&buf[0], nb); - DBG_MSG(("pReceivingBuffer before onMessageReceived:%d : %s", + DBG_MSG(("pReceivingBuffer before onMessageReceived:%d : %s\n", pReceivingBuffer->size(), pReceivingBuffer->c_str())); - // we need to check websocket clients here + + // we need to check for websocket handshake if (!checkWebSocketHandShake(fd, pReceivingBuffer)) { //JSON MESSAGE received. Send data in CMessageBroker. if (mpMessageBroker) { - mpMessageBroker->onMessageReceived(fd, *pReceivingBuffer); + size_t buffer_size_before = pReceivingBuffer->size(); + mpMessageBroker->onMessageReceived(fd, *pReceivingBuffer, true); + + if (buffer_was_not_empty && (pReceivingBuffer->size() == buffer_size_before)) { + /* We couldn't parse the buffer (with the last message at the end) + * Try to parse ONLY the last message */ + DBG_MSG_ERROR(("Couldn't parse the whole buffer! Try only the last message.\n")); + + nb = static_cast<unsigned int>(last_msg_buf.size()); + if (isWebSocket(fd)) { + const unsigned int data_length = + mWebSocketHandler.parseWebSocketDataLength(&last_msg_buf[0], nb); + if (data_length > nb) { + DBG_MSG_ERROR(("The last message may be incomplete. Don't do anything.\n")); + /* Should we replace the buffer with the last message? + * Probably not. It may not be a real websocket message. + * Wait for a full message. */ + return false; + } + mWebSocketHandler.parseWebSocketData(&last_msg_buf[0], nb); + } + + std::string last_message = std::string(&last_msg_buf[0], nb); + buffer_size_before = last_message.size(); + mpMessageBroker->onMessageReceived(fd, last_message, false); + if ( last_message.size() < buffer_size_before ) { + /* Parsing last message successful! Discard the old data and + * keep only what is left from the last message */ + DBG_MSG_ERROR(("Parsing last message successful! Discard the old data.\n")); + *pReceivingBuffer = last_message; + } + } } else { return false; } - } else { // client is a websocket - std::string handshakeResponse = - "HTTP/1.1 101 Switching Protocols\r\nUpgrade: WebSocket\r\n" - "Connection: Upgrade\r\nSec-WebSocket-Accept: "; + } else { // message is a websocket handshake ssize_t webSocketKeyPos = pReceivingBuffer->find("Sec-WebSocket-Key: "); if (-1 != webSocketKeyPos) { + std::string handshakeResponse = + "HTTP/1.1 101 Switching Protocols\r\nUpgrade: WebSocket\r\n" + "Connection: Upgrade\r\nSec-WebSocket-Accept: "; std::string wsKey = pReceivingBuffer->substr(webSocketKeyPos + 19, 24); mWebSocketHandler.handshake_0405(wsKey); handshakeResponse += wsKey; @@ -126,15 +164,8 @@ bool TcpServer::Recv(int fd) { m_WebSocketClients.push_back(fd); } } - - return true; - } - else { - DBG_MSG(("Received %d bytes from %d; error = %d\n", - received_bytes, fd, errno)); - m_purge.push_back(fd); - return false; } + return true; } bool TcpServer::checkWebSocketHandShake(int fd, std::string* pReceivingBuffer) { @@ -228,9 +259,9 @@ void TcpServer::WaitMessage(uint32_t ms) { itr = m_receivingBuffers.find((*it)); if (itr != m_receivingBuffers.end()) { // delete receiving buffer of disconnected client + mpMessageBroker->OnSocketClosed(itr->first); delete itr->second; m_receivingBuffers.erase(itr); - mpMessageBroker->OnSocketClosed(itr->first); } } |