summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAGaliuzov <AGaliuzov@luxoft.com>2015-09-25 17:01:00 +0300
committerAGaliuzov <AGaliuzov@luxoft.com>2015-09-25 17:01:00 +0300
commita2f5a1c6de476ec8d4fa11b873c16d25a109231a (patch)
tree116005c283ff2ab563f90123d38dc0a196eb3477
parent1c997837f63833c13026def2f4c6a468f2553a7e (diff)
parent1182cde5e1e8a878029a68a2dc333add5e8257a7 (diff)
downloadsmartdevicelink-a2f5a1c6de476ec8d4fa11b873c16d25a109231a.tar.gz
Merge pull request #215 from LuxoftSDL/hotfix/Remove_invalid_messages_from_MessageBrokers_buffer
Remove parsed JSON and invalid messages from MessageBroker's buffer
-rw-r--r--src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp5
-rw-r--r--src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp222
-rw-r--r--src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp93
3 files changed, 263 insertions, 57 deletions
diff --git a/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp b/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp
index 952b250ae..c54204379 100644
--- a/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp
+++ b/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp
@@ -61,9 +61,10 @@ namespace NsMessageBroker
* \brief Receive data from TCP server (from client).
* \param fd FileDescriptor of socket.
* \param aJSONData JSON string.
+ * \param tryHard give up on first JSON parse error or try to workaround it.
*/
- void onMessageReceived(int fd, std::string& aJSONData);
-
+ void onMessageReceived(int fd, std::string& aJSONData, bool tryHard);
+
/**
* \brief Test of buffer parsing.
*/
diff --git a/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp b/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp
index 07f01ea12..3b14489a8 100644
--- a/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp
+++ b/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp
@@ -4,6 +4,7 @@
* \author AKara
*/
+#include <cassert>
#include <stdio.h>
#include <vector>
@@ -216,6 +217,31 @@ class CMessageBroker_Private {
int popMessageFromWaitQue(CMessage* pMessage);
/**
+ * \brief Tries to remove the parsed part of the buffer
+ * \param root Parsed JSON value
+ * \param aJSONData The string buffer
+ * \return true on success, false on failure
+ */
+ bool cutParsedJSON(const Json::Value& root, std::string& aJSONData);
+
+ /**
+ * \brief Finds the position just after a JSON object or array in a buffer
+ * \param isObject Must be true for object, false for array
+ * \param aJSONData The string buffer
+ * \return The position in the buffer after the object or array on success,
+ * std::strin::npos on failure
+ */
+ size_t jumpOverJSONObjectOrArray(bool isObject, const std::string& aJSONData);
+
+ /**
+ * \brief Finds the position just after a JSON string in a buffer
+ * \param aJSONData The string buffer
+ * \return The position in the buffer after the string on success,
+ * std::strin::npos on failure
+ */
+ size_t jumpOverJSONString(const std::string& aJSONData);
+
+ /**
* \brief Que of messages.
*/
std::deque<CMessage*> mMessagesQueue;
@@ -288,35 +314,183 @@ CMessageBroker* CMessageBroker::getInstance() {
return &instance;
}
-void CMessageBroker::onMessageReceived(int fd, std::string& aJSONData) {
- DBG_MSG(("CMessageBroker::onMessageReceived()\n"));
- while (!aJSONData.empty()) {
- Json::Value root;
- if (!p->m_reader.parse(aJSONData, root)) {
- DBG_MSG(("Received not JSON string! %s\n", aJSONData.c_str()));
- return;
+
+size_t CMessageBroker_Private::jumpOverJSONObjectOrArray(bool isObject,
+ const std::string& aJSONData) {
+ const char openBracket = isObject? '{' : '[';
+ const char closeBracket = isObject? '}' : ']';
+ int open_minus_close_brackets(1);
+ size_t position = aJSONData.find(openBracket); // Find the beginning of the object
+
+ while ((position != std::string::npos) && (open_minus_close_brackets > 0)) {
+ position = aJSONData.find_first_of(std::string("\"")+openBracket+closeBracket,
+ position+1);
+ if (std::string::npos == position) {
+ break;
}
- if(root["jsonrpc"]!="2.0") {
- DBG_MSG(("\t Json::Reader::parce didn't set up jsonrpc! jsonrpc = '%s'\n", root["jsonrpc"].asString().c_str()));
- return;
+ if ('"' == aJSONData[position]) {
+ // Ignore string interior, which might contain brackets and escaped "-s
+ do {
+ position = aJSONData.find('"', position+1); // Find the closing quote
+ } while ((std::string::npos != position) && ('\\' == aJSONData[position-1]));
+ } else if (openBracket == aJSONData[position]) {
+ ++open_minus_close_brackets;
+ } else if (closeBracket == aJSONData[position]) {
+ --open_minus_close_brackets;
}
- std::string wmes = p->m_recieverWriter.write(root);
- DBG_MSG(("Parsed JSON string %d : %s\n", wmes.length(),
- wmes.c_str()));
- DBG_MSG(("Buffer is:%s\n", aJSONData.c_str()));
- if (aJSONData.length() > wmes.length()) {
- // wmes string length can differ from buffer substr length
- size_t offset = wmes.length();
- char msg_begin = '{';
- if (aJSONData.at(offset) != msg_begin) {
- offset -= 1; // wmes can contain redudant \n in the tail.
+ }
+
+ if ((0 == open_minus_close_brackets) && (std::string::npos != position)) {
+ ++position; // Move after the closing bracket
+ } else {
+ position = std::string::npos;
+ }
+
+ return position;
+}
+
+
+size_t CMessageBroker_Private::jumpOverJSONString(const std::string& aJSONData) {
+ size_t position = aJSONData.find('"'); // Find the beginning of the string
+
+ do {
+ position = aJSONData.find('"', position+1); // Find the closing quote
+ } while ((std::string::npos != position) && ('\\' == aJSONData[position-1]));
+
+ if (std::string::npos != position) {
+ ++position; // Move after the closing quote
+ }
+
+ return position;
+}
+
+
+bool CMessageBroker_Private::cutParsedJSON(const Json::Value& root,
+ std::string& aJSONData) {
+ if (root.isNull() || aJSONData.empty()) {
+ DBG_MSG_ERROR(("JSON is null or the buffer is empty!\n"));
+ return false;
+ }
+
+ std::string parsed_json_str = m_recieverWriter.write(root);
+ DBG_MSG(("Parsed JSON string: '%s'\n", parsed_json_str.c_str()));
+
+ // Trim front spaces (if any)
+ const size_t nonempty_position = aJSONData.find_first_not_of(" \t\n\v\f\r");
+ aJSONData.erase(0, nonempty_position);
+ if (std::string::npos == nonempty_position) {
+ DBG_MSG_ERROR(("Buffer contains only blanks!\n"));
+ return false;
+ }
+
+ // JSON writer puts '\n' at the end. Remove it.
+ const size_t final_lf_pos = parsed_json_str.rfind('\n');
+ if (final_lf_pos == parsed_json_str.length()-1) {
+ parsed_json_str.erase(final_lf_pos, 1);
+ }
+
+ /* RFC 4627: "A JSON value MUST be an object, array, number, or string, or
+ * one of the following three literal names: false null true"
+ * So we will try to find the borders of the parsed part based on its type. */
+
+ size_t position(std::string::npos);
+
+ if (0 == aJSONData.find(parsed_json_str)) {
+ // If by chance parsed JSON is the same in the buffer and is at the beginning
+ position = parsed_json_str.length();
+ } else if (root.isObject() || root.isArray()) {
+ position = jumpOverJSONObjectOrArray(root.isObject(), aJSONData);
+ } else if (root.isString()) {
+ position = jumpOverJSONString(aJSONData);
+ } else if (root.isNumeric()) {
+ position = aJSONData.find_first_not_of("+-0123456789.eE");
+ } else if (root.isBool() || ("null" == parsed_json_str)) {
+ position = aJSONData.find(parsed_json_str);
+ if (std::string::npos != position) {
+ position += parsed_json_str.length();
+ }
+ } else {
+ DBG_MSG_ERROR(("Unknown JSON type!\n"));
+ }
+
+ if (std::string::npos == position) {
+ DBG_MSG_ERROR(("Error finding JSON object boundaries!\n"));
+ /* This should not happen, because the string is already parsed as a
+ * valid JSON. If this happens then above code is wrong. It is better
+ * to assert() than just return here, because otherwise we may enter an
+ * endless cycle - fail to process one and the same message again and
+ * again. Or we may clear the buffer and return, but in this way we will
+ * loose the next messages, miss a bug here, and create another bug. */
+ assert(std::string::npos != position);
+ return false; // For release version
+ }
+
+ if ((position >= aJSONData.length()) ||
+ ((position == aJSONData.length()-1) && isspace(aJSONData[position]))) {
+ // No next object. Clear entire aJSONData.
+ aJSONData = "";
+ } else {
+ // There is another object. Clear the current one.
+ aJSONData.erase(0, position);
+ }
+
+ return true;
+}
+
+
+void CMessageBroker::onMessageReceived(int fd, std::string& aJSONData, bool tryHard) {
+ DBG_MSG(("CMessageBroker::onMessageReceived(%d, '%s')\n", fd, aJSONData.c_str()));
+
+ while (! aJSONData.empty()) {
+ Json::Value root;
+ if ((! p->m_reader.parse(aJSONData, root)) || root.isNull()) {
+ DBG_MSG_ERROR(("Unable to parse JSON!"));
+ if (! tryHard) {
+ return;
+ }
+ uint8_t first_byte = static_cast<uint8_t>(aJSONData[0]);
+ if ((first_byte <= 0x08) || ((first_byte >= 0x80) && (first_byte <= 0x88))) {
+ DBG_MSG((" There is an unparsed websocket header probably.\n"));
+ /* Websocket headers can have FIN flag set in the first byte (0x80).
+ * Then there are 3 zero bits and 4 bits for opcode (from 0x00 to 0x0A).
+ * But actually we don't use opcodes above 0x08.
+ * Use this fact to distinguish websocket header from payload text data.
+ * It can be a coincidence of course, but we have to give it a try. */
+ return;
+ } else if ('{' == aJSONData[0]) {
+ DBG_MSG_ERROR((" Incomplete JSON object probably.\n"));
+ return;
+ } else {
+ DBG_MSG_ERROR((" Step in the buffer and try again...\n"));
+ aJSONData.erase(0, 1);
+ DBG_MSG_ERROR(("Buffer after cut is: '%s'\n", aJSONData.c_str()));
+ continue;
}
- aJSONData.erase(aJSONData.begin(), aJSONData.begin() + offset);
- DBG_MSG(("Buffer after cut is:%s\n", aJSONData.c_str()));
+
+ } else if (! root.isObject()) {
+ /* JSON RPC 2.0 messages are objects. Batch calls must be pre-rpocessed,
+ * so no need for "and !root.isArray()" */
+ DBG_MSG_ERROR(("Parsed JSON is not an object!\n"));
+ if (! tryHard) {
+ return;
+ }
+ // Cut parsed data from the buffer below and continue
+
+ } else if ((!root.isMember("jsonrpc")) || (root["jsonrpc"]!="2.0")) {
+ DBG_MSG_ERROR(("'jsonrpc' is not set correctly in parsed JSON!\n"));
+ if (! tryHard) {
+ return;
+ }
+ // Cut parsed object from the buffer below and continue
+
} else {
- aJSONData = "";
+ // Parsing successful. Pass the message up.
+ p->pushMessage(new CMessage(fd, root));
}
- p->pushMessage(new CMessage(fd, root));
+
+ p->cutParsedJSON(root, aJSONData);
+
+ DBG_MSG(("Buffer after cut is: '%s'\n", aJSONData.c_str()));
}
}
diff --git a/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp b/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp
index ca27502b9..bdd7b2bfd 100644
--- a/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp
+++ b/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp
@@ -58,9 +58,10 @@ ssize_t TcpServer::Send(int fd, const std::string& data) {
bool TcpServer::Recv(int fd) {
DBG_MSG(("TcpServer::Recv(%d)\n", fd));
- ssize_t nb = -1;
std::string* pReceivingBuffer = getBufferFor(fd);
+ bool buffer_was_not_empty = pReceivingBuffer->size() > 0;
+
std::vector<char> buf;
buf.reserve(RECV_BUFFER_LENGTH + pReceivingBuffer->size());
DBG_MSG(("Left in pReceivingBuffer: %d \n",
@@ -68,50 +69,87 @@ bool TcpServer::Recv(int fd) {
buf.assign(pReceivingBuffer->c_str(),
pReceivingBuffer->c_str() + pReceivingBuffer->size());
buf.resize(RECV_BUFFER_LENGTH + pReceivingBuffer->size());
- ssize_t received_bytes = recv(fd, &buf[pReceivingBuffer->size()], MAX_RECV_DATA, 0);
- nb = received_bytes;
+
+ int received_bytes = recv(fd, &buf[pReceivingBuffer->size()], MAX_RECV_DATA, 0);
+ if (received_bytes <= 0) {
+ DBG_MSG(("Received %d bytes from %d; error = %d\n",
+ received_bytes, fd, errno));
+ m_purge.push_back(fd);
+ return false;
+ }
+
+ unsigned int nb = received_bytes;
+ std::vector<char> last_msg_buf(buf.begin()+pReceivingBuffer->size(),
+ buf.begin()+pReceivingBuffer->size()+nb);
DBG_MSG(("Recieved %d from %d\n", nb, fd));
- nb += pReceivingBuffer->size();
+ nb += static_cast<unsigned int>(pReceivingBuffer->size());
DBG_MSG(("Recieved with buffer %d from %d\n", nb, fd));
- if (received_bytes > 0) {
- unsigned int recieved_data = nb;
+ if (nb > 0) { // This is redundant
if (isWebSocket(fd)) {
const unsigned int data_length =
- mWebSocketHandler.parseWebSocketDataLength(&buf[0], recieved_data);
+ mWebSocketHandler.parseWebSocketDataLength(&buf[0], nb);
- DBG_MSG(("Received %d actual data length %d\n",
- recieved_data, data_length));
+ DBG_MSG(("Received %d actual data length %d\n", nb, data_length));
- if (data_length > recieved_data) {
- DBG_MSG_ERROR(("Received %d actual data length %d\n",
- recieved_data, data_length));
+ if (data_length > nb) {
+ DBG_MSG_ERROR(("Received %d actual data length %d\n", nb, data_length));
DBG_MSG_ERROR(("Incomplete message"));
*pReceivingBuffer = std::string(&buf[0], nb);
return false;
}
- unsigned int b_size = static_cast<unsigned int>(nb);
- mWebSocketHandler.parseWebSocketData(&buf[0], b_size);
- nb = b_size;
+ mWebSocketHandler.parseWebSocketData(&buf[0], nb);
}
*pReceivingBuffer = std::string(&buf[0], nb);
- DBG_MSG(("pReceivingBuffer before onMessageReceived:%d : %s",
+ DBG_MSG(("pReceivingBuffer before onMessageReceived:%d : %s\n",
pReceivingBuffer->size(), pReceivingBuffer->c_str()));
- // we need to check websocket clients here
+
+ // we need to check for websocket handshake
if (!checkWebSocketHandShake(fd, pReceivingBuffer))
{ //JSON MESSAGE received. Send data in CMessageBroker.
if (mpMessageBroker) {
- mpMessageBroker->onMessageReceived(fd, *pReceivingBuffer);
+ size_t buffer_size_before = pReceivingBuffer->size();
+ mpMessageBroker->onMessageReceived(fd, *pReceivingBuffer, true);
+
+ if (buffer_was_not_empty && (pReceivingBuffer->size() == buffer_size_before)) {
+ /* We couldn't parse the buffer (with the last message at the end)
+ * Try to parse ONLY the last message */
+ DBG_MSG_ERROR(("Couldn't parse the whole buffer! Try only the last message.\n"));
+
+ nb = static_cast<unsigned int>(last_msg_buf.size());
+ if (isWebSocket(fd)) {
+ const unsigned int data_length =
+ mWebSocketHandler.parseWebSocketDataLength(&last_msg_buf[0], nb);
+ if (data_length > nb) {
+ DBG_MSG_ERROR(("The last message may be incomplete. Don't do anything.\n"));
+ /* Should we replace the buffer with the last message?
+ * Probably not. It may not be a real websocket message.
+ * Wait for a full message. */
+ return false;
+ }
+ mWebSocketHandler.parseWebSocketData(&last_msg_buf[0], nb);
+ }
+
+ std::string last_message = std::string(&last_msg_buf[0], nb);
+ buffer_size_before = last_message.size();
+ mpMessageBroker->onMessageReceived(fd, last_message, false);
+ if ( last_message.size() < buffer_size_before ) {
+ /* Parsing last message successful! Discard the old data and
+ * keep only what is left from the last message */
+ DBG_MSG_ERROR(("Parsing last message successful! Discard the old data.\n"));
+ *pReceivingBuffer = last_message;
+ }
+ }
} else {
return false;
}
- } else { // client is a websocket
- std::string handshakeResponse =
- "HTTP/1.1 101 Switching Protocols\r\nUpgrade: WebSocket\r\n"
- "Connection: Upgrade\r\nSec-WebSocket-Accept: ";
+ } else { // message is a websocket handshake
ssize_t webSocketKeyPos = pReceivingBuffer->find("Sec-WebSocket-Key: ");
if (-1 != webSocketKeyPos) {
+ std::string handshakeResponse =
+ "HTTP/1.1 101 Switching Protocols\r\nUpgrade: WebSocket\r\n"
+ "Connection: Upgrade\r\nSec-WebSocket-Accept: ";
std::string wsKey = pReceivingBuffer->substr(webSocketKeyPos + 19, 24);
mWebSocketHandler.handshake_0405(wsKey);
handshakeResponse += wsKey;
@@ -126,15 +164,8 @@ bool TcpServer::Recv(int fd) {
m_WebSocketClients.push_back(fd);
}
}
-
- return true;
- }
- else {
- DBG_MSG(("Received %d bytes from %d; error = %d\n",
- received_bytes, fd, errno));
- m_purge.push_back(fd);
- return false;
}
+ return true;
}
bool TcpServer::checkWebSocketHandShake(int fd, std::string* pReceivingBuffer) {
@@ -228,9 +259,9 @@ void TcpServer::WaitMessage(uint32_t ms) {
itr = m_receivingBuffers.find((*it));
if (itr != m_receivingBuffers.end())
{ // delete receiving buffer of disconnected client
+ mpMessageBroker->OnSocketClosed(itr->first);
delete itr->second;
m_receivingBuffers.erase(itr);
- mpMessageBroker->OnSocketClosed(itr->first);
}
}