summaryrefslogtreecommitdiff
path: root/src/components
diff options
context:
space:
mode:
Diffstat (limited to 'src/components')
-rw-r--r--src/components/config_profile/include/config_profile/profile.h2
-rw-r--r--src/components/config_profile/src/profile.cc8
-rw-r--r--src/components/protocol_handler/CMakeLists.txt1
-rw-r--r--src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h19
-rw-r--r--src/components/protocol_handler/include/protocol_handler/multiframe_builder.h157
-rw-r--r--src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h54
-rw-r--r--src/components/protocol_handler/include/protocol_handler/protocol_packet.h12
-rw-r--r--src/components/protocol_handler/src/incoming_data_handler.cc38
-rw-r--r--src/components/protocol_handler/src/multiframe_builder.cc273
-rw-r--r--src/components/protocol_handler/src/protocol_handler_impl.cc189
-rw-r--r--src/components/protocol_handler/src/protocol_packet.cc8
-rw-r--r--src/components/protocol_handler/test/CMakeLists.txt1
-rw-r--r--src/components/protocol_handler/test/include/protocol_handler_mock.h2
-rw-r--r--src/components/protocol_handler/test/multiframe_builder_test.cc522
-rw-r--r--src/components/protocol_handler/test/protocol_handler_tm_test.cc9
-rw-r--r--src/components/time_tester/test/time_manager_test.cc2
16 files changed, 1105 insertions, 192 deletions
diff --git a/src/components/config_profile/include/config_profile/profile.h b/src/components/config_profile/include/config_profile/profile.h
index e3ef1a82a7..7c8f3c3958 100644
--- a/src/components/config_profile/include/config_profile/profile.h
+++ b/src/components/config_profile/include/config_profile/profile.h
@@ -545,6 +545,8 @@ class Profile : public utils::Singleton<Profile> {
size_t malformed_frequency_time() const;
+ uint32_t multiframe_waiting_timeout() const;
+
uint16_t attempts_to_open_policy_db() const;
uint16_t open_attempt_timeout_ms() const;
diff --git a/src/components/config_profile/src/profile.cc b/src/components/config_profile/src/profile.cc
index 92c139ce8d..153c89b0d2 100644
--- a/src/components/config_profile/src/profile.cc
+++ b/src/components/config_profile/src/profile.cc
@@ -196,6 +196,7 @@ const char* kFrequencyTime = "FrequencyTime";
const char* kMalformedMessageFiltering = "MalformedMessageFiltering";
const char* kMalformedFrequencyCount = "MalformedFrequencyCount";
const char* kMalformedFrequencyTime = "MalformedFrequencyTime";
+const char* kExpectedConsecutiveFramesTimeout = "ExpectedConsecutiveFramesTimeout";
const char* kHashStringSizeKey = "HashStringSize";
const char* kUseDBForResumptionKey = "UseDBForResumption";
const char* kAttemptsToOpenResumptionDBKey = "AttemptsToOpenResumptionDB";
@@ -273,6 +274,7 @@ const size_t kDefaultFrequencyTime = 1000;
const bool kDefaulMalformedMessageFiltering = true;
const size_t kDefaultMalformedFrequencyCount = 10;
const size_t kDefaultMalformedFrequencyTime = 1000;
+const uint32_t kDefaultExpectedConsecutiveFramesTimeout = 10000;
const uint16_t kDefaultAttemptsToOpenPolicyDB = 5;
const uint16_t kDefaultOpenAttemptTimeoutMs = 500;
const uint32_t kDefaultAppIconsFolderMaxSize = 104857600;
@@ -709,6 +711,12 @@ size_t Profile::malformed_frequency_time() const {
kProtocolHandlerSection, kMalformedFrequencyTime);
return malformed_frequency_time;
}
+uint32_t Profile::multiframe_waiting_timeout() const {
+ uint32_t multiframe_waiting_timeout = 0;
+ ReadUIntValue(&multiframe_waiting_timeout, kDefaultExpectedConsecutiveFramesTimeout,
+ kProtocolHandlerSection, kExpectedConsecutiveFramesTimeout);
+ return multiframe_waiting_timeout;
+}
uint16_t Profile::attempts_to_open_policy_db() const {
return attempts_to_open_policy_db_;
diff --git a/src/components/protocol_handler/CMakeLists.txt b/src/components/protocol_handler/CMakeLists.txt
index 10a18c48b6..81ce371001 100644
--- a/src/components/protocol_handler/CMakeLists.txt
+++ b/src/components/protocol_handler/CMakeLists.txt
@@ -43,6 +43,7 @@ set(SOURCES
${COMPONENTS_DIR}/protocol_handler/src/protocol_handler_impl.cc
${COMPONENTS_DIR}/protocol_handler/src/protocol_packet.cc
${COMPONENTS_DIR}/protocol_handler/src/protocol_payload.cc
+ ${COMPONENTS_DIR}/protocol_handler/src/multiframe_builder.cc
)
set(LIBRARIES
diff --git a/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h b/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h
index 7be82843c7..b02e7de3bf 100644
--- a/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h
+++ b/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h
@@ -32,7 +32,6 @@
#ifndef SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_INCOMING_DATA_HANDLER_H_
#define SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_INCOMING_DATA_HANDLER_H_
-#include <list>
#include <map>
#include <vector>
#include "utils/macro.h"
@@ -53,7 +52,7 @@ class IncomingDataHandler {
* @brief Setting additional validator for checking malformed packets
* \param validator pointer
*/
- void set_validator(const ProtocolPacket::ProtocolHeaderValidator *const validator);
+ void set_validator(const ProtocolPacket::ProtocolHeaderValidator* const validator);
/**
* @brief Concatenate TM messages to ford frames and validate ford header data
* \param TM messages for converting to frames
@@ -65,9 +64,9 @@ class IncomingDataHandler {
* - RESULT_FAIL - packet serialization or validation error occurs
* \return list of complete, correct packets
*/
- std::list<ProtocolFramePtr> ProcessData(const RawMessage &tm_message,
- RESULT_CODE *result,
- size_t *malformed_occurrence);
+ ProtocolFramePtrList ProcessData(const RawMessage& tm_message,
+ RESULT_CODE* result,
+ size_t* malformed_occurrence);
/**
* @brief Add connection for data handling and verification
*/
@@ -83,7 +82,7 @@ class IncomingDataHandler {
/**
* @brief Returns size of frame to be formed from raw bytes.
*/
- static uint32_t GetPacketSize(const ProtocolPacket::ProtocolHeader &header);
+ static uint32_t GetPacketSize(const ProtocolPacket::ProtocolHeader& header);
/**
* @brief Try to create frame from incoming data
* \param incommung_data raw stream
@@ -95,16 +94,16 @@ class IncomingDataHandler {
* - RESULT_OK - one or more frames successfully created
* - RESULT_FAIL - packet serialization or validation error occurs
*/
- RESULT_CODE CreateFrame(std::vector<uint8_t> &incoming_data,
- std::list<ProtocolFramePtr> &out_frames,
- size_t &malformed_occurrence,
+ RESULT_CODE CreateFrame(std::vector<uint8_t>& incoming_data,
+ ProtocolFramePtrList& out_frames,
+ size_t& malformed_occurrence,
const transport_manager::ConnectionUID connection_id);
typedef std::map<transport_manager::ConnectionUID, std::vector<uint8_t> >
ConnectionsDataMap;
ConnectionsDataMap connections_data_;
ProtocolPacket::ProtocolHeader header_;
- const ProtocolPacket::ProtocolHeaderValidator *validator_;
+ const ProtocolPacket::ProtocolHeaderValidator* validator_;
bool last_portion_of_data_was_malformed_;
DISALLOW_COPY_AND_ASSIGN(IncomingDataHandler);
};
diff --git a/src/components/protocol_handler/include/protocol_handler/multiframe_builder.h b/src/components/protocol_handler/include/protocol_handler/multiframe_builder.h
new file mode 100644
index 0000000000..6ccf891b38
--- /dev/null
+++ b/src/components/protocol_handler/include/protocol_handler/multiframe_builder.h
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2015, Ford Motor Company
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * Neither the name of the Ford Motor Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_MULTIFRAME_BUILDER_H_
+#define SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_MULTIFRAME_BUILDER_H_
+
+#include <map>
+#include <ostream> // std::basic_ostream
+#include <iterator> // std::ostream_iterator
+#include <algorithm> // std::copy
+
+#include "utils/date_time.h"
+#include "protocol_handler/protocol_packet.h"
+
+/**
+ *\namespace protocol_handlerHandler
+ *\brief Namespace for SmartDeviceLink ProtocolHandler related functionality.
+ */
+namespace protocol_handler {
+/**
+ * \brief Session identifier - contains connection identifier and
+ * session_id from protocol (can be used as hash)
+ */
+// TODO(EZamakhov): move SessionID to protocol_handler/protocol_packet.h
+typedef uint8_t SessionID;
+/**
+ * \brief Message identifier - unique to the session messages
+ */
+// TODO(EZamakhov): move MessageID to protocol_handler/session_observer.h
+typedef uint32_t MessageID;
+
+struct ProtocolFrameData {
+ ProtocolFramePtr frame;
+ TimevalStruct append_time;
+};
+/**
+ *\brief Map of frames with last frame data for messages received in multiple frames.
+ */
+typedef std::map<MessageID, ProtocolFrameData> MessageIDToFrameMap;
+/**
+ *\brief Map of frames with last frame data for messages received in multiple frames.
+ */
+typedef std::map<SessionID, MessageIDToFrameMap> SessionToFrameMap;
+/**
+ *\brief Map of frames with last frame data for messages received in multiple frames.
+ */
+typedef std::map<ConnectionID, SessionToFrameMap> MultiFrameMap;
+
+/**
+ * \class MultiFrameBuilder
+ * \brief Class for assembling consecutive frames according to
+ * messageID to complete multiframes.
+ */
+class MultiFrameBuilder {
+ public:
+ /**
+ * @brief Constructor
+ */
+ MultiFrameBuilder();
+
+ /**
+ *\brief Set timeout of waiting CONSECUTIVE frames
+ */
+ void set_waiting_timeout(const uint32_t consecutive_frame_wait_msecs);
+
+ /**
+ * @brief Add connection for pending data
+ * @return true on success
+ */
+ bool AddConnection(const ConnectionID connection_id);
+
+ /**
+ * @brief Clear all data related to connection_id
+ * @return true on success
+ */
+ bool RemoveConnection(const ConnectionID connection_id);
+
+ /**
+ *\brief Pop assembled and expired frames
+ */
+ ProtocolFramePtrList PopMultiframes();
+
+ /**
+ *\brief Handle Single or Consecutive frame
+ * @return RESULT_OK on success, or RESULT_FAIL in case of any error
+ */
+ RESULT_CODE AddFrame(const ProtocolFramePtr packet);
+
+ private:
+ RESULT_CODE HandleFirstFrame(const ProtocolFramePtr packet);
+ RESULT_CODE HandleConsecutiveFrame(const ProtocolFramePtr packet);
+
+ // Map of frames with last frame data for messages received in multiple frames.
+ MultiFrameMap multiframes_map_;
+ int64_t consecutive_frame_wait_msecs_;
+};
+
+template<typename _CharT>
+std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream,
+ const protocol_handler::MultiFrameMap& map) {
+ if (map.empty()) {
+ stream << "{empty}";
+ return stream;
+ }
+ for (MultiFrameMap::const_iterator connection_it = map.begin();
+ connection_it != map.end(); ++connection_it) {
+ const SessionToFrameMap& session_map = connection_it->second;
+
+ for (SessionToFrameMap::const_iterator session_it = session_map.begin();
+ session_it != session_map.end(); ++session_it) {
+ const MessageIDToFrameMap& messageId_map = session_it->second;
+
+ for (MessageIDToFrameMap::const_iterator messageId_it = messageId_map.begin();
+ messageId_it != messageId_map.end(); ++messageId_it) {
+ const ProtocolFrameData& frame_data = messageId_it->second;
+
+ stream << "ConnectionID: " << connection_it->first
+ << ", SessionID: " << static_cast<uint32_t>(session_it->first)
+ << ", MessageID: " << static_cast<uint32_t>(messageId_it->first)
+ << " msec, frame: " << frame_data.frame << std::endl;
+ }
+ }
+ }
+ return stream;
+}
+
+} // namespace protocol_handler
+#endif // SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_MULTIFRAME_BUILDER_H_
diff --git a/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h b/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h
index 260ad16ec7..fb10811f14 100644
--- a/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h
+++ b/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h
@@ -50,6 +50,7 @@
#include "protocol_handler/session_observer.h"
#include "protocol_handler/protocol_observer.h"
#include "protocol_handler/incoming_data_handler.h"
+#include "protocol_handler/multiframe_builder.h"
#include "transport_manager/common.h"
#include "transport_manager/transport_manager.h"
#include "transport_manager/transport_manager_listener_empty.h"
@@ -142,14 +143,17 @@ class ProtocolHandlerImpl
* \param malformed_message_frequency_time used as time for malformed flood filtering
* \param malformed_message_frequency_count used as maximum value of malformed
* messages per message_frequency_time period
+ * \param multiframe_waiting_timeout used as maximum time of consecutive
+ * frames handling
* message exchange.
*/
explicit ProtocolHandlerImpl(
- transport_manager::TransportManager *transport_manager_param,
- size_t message_frequency_time, size_t message_frequency_count,
- bool malformed_message_filtering,
- size_t malformed_message_frequency_time,
- size_t malformed_message_frequency_count);
+ transport_manager::TransportManager *transport_manager_param,
+ size_t message_frequency_time, size_t message_frequency_count,
+ bool malformed_message_filtering,
+ size_t malformed_message_frequency_time,
+ size_t malformed_message_frequency_count,
+ uint32_t multiframe_waiting_timeout);
/**
* \brief Destructor
@@ -402,59 +406,40 @@ class ProtocolHandlerImpl
/**
* \brief Handles received message.
- * \param connection_handle Identifier of connection through which message
- * is received.
* \param packet Received message with protocol header.
* \return \saRESULT_CODE Status of operation
*/
- RESULT_CODE HandleMessage(
- ConnectionID connection_id,
- const ProtocolFramePtr packet);
+ RESULT_CODE HandleMessage(const ProtocolFramePtr packet);
/**
* \brief Handles message received in single frame.
- * \param connection_handle Identifier of connection through which message
- * is received.
* \param packet Frame of message with protocol header.
* \return \saRESULT_CODE Status of operation
*/
- RESULT_CODE HandleSingleFrameMessage(
- ConnectionID connection_id,
- const ProtocolFramePtr packet);
+ RESULT_CODE HandleSingleFrameMessage(const ProtocolFramePtr packet);
/**
* \brief Handles message received in multiple frames. Collects all frames
* of message.
- * \param connection_handle Identifier of connection through which message
- * is received.
* \param packet Current frame of message with protocol header.
* \return \saRESULT_CODE Status of operation
*/
- RESULT_CODE HandleMultiFrameMessage(
- ConnectionID connection_id,
- const ProtocolFramePtr packet);
+ RESULT_CODE HandleMultiFrameMessage(const ProtocolFramePtr packet);
/**
* \brief Handles message received in single frame.
- * \param connection_handle Identifier of connection through which message
- * is received.
* \param packet Received message with protocol header.
* \return \saRESULT_CODE Status of operation
*/
RESULT_CODE HandleControlMessage(
- ConnectionID connection_id,
const ProtocolFramePtr packet);
- RESULT_CODE HandleControlMessageEndSession(
- ConnectionID connection_id,
- const ProtocolPacket &packet);
+ RESULT_CODE HandleControlMessageEndSession(const ProtocolPacket &packet);
+
+ RESULT_CODE HandleControlMessageStartSession(const ProtocolPacket &packet);
- RESULT_CODE HandleControlMessageStartSession(
- ConnectionID connection_id,
- const ProtocolPacket &packet);
+ RESULT_CODE HandleControlMessageHeartBeat(const ProtocolPacket &packet);
- RESULT_CODE HandleControlMessageHeartBeat(
- ConnectionID connection_id,
- const ProtocolPacket &packet);
+ void PopValideAndExpirateMultiframes();
// threads::MessageLoopThread<*>::Handler implementations
// CALLED ON raw_ford_messages_from_mobile_ thread!
@@ -495,10 +480,9 @@ class ProtocolHandlerImpl
transport_manager::TransportManager *transport_manager_;
/**
- *\brief Map of frames with last frame data for messages received in multiple frames.
+ *\brief Assembling support class.
*/
- typedef std::map<int32_t, ProtocolFramePtr> MultiFrameMap;
- MultiFrameMap incomplete_multi_frame_messages_;
+ MultiFrameBuilder multiframe_builder_;
/**
* \brief Map of messages (frames) received over mobile nave session
diff --git a/src/components/protocol_handler/include/protocol_handler/protocol_packet.h b/src/components/protocol_handler/include/protocol_handler/protocol_packet.h
index db0650cfd8..1b68e6c870 100644
--- a/src/components/protocol_handler/include/protocol_handler/protocol_packet.h
+++ b/src/components/protocol_handler/include/protocol_handler/protocol_packet.h
@@ -33,6 +33,7 @@
#ifndef SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_PROTOCOL_PACKET_H_
#define SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_PROTOCOL_PACKET_H_
+#include <list>
#include "utils/macro.h"
#include "protocol/common.h"
#include "transport_manager/common.h"
@@ -245,7 +246,7 @@ class ProtocolPacket {
/**
* \brief Getter for Connection Identifier
*/
- uint8_t connection_id() const;
+ ConnectionID connection_id() const;
/**
* \brief Getter for data payload size
@@ -286,6 +287,7 @@ class ProtocolPacket {
* @brief Type definition for variable that hold shared pointer to protocolol packet
*/
typedef utils::SharedPtr<protocol_handler::ProtocolPacket> ProtocolFramePtr;
+typedef std::list<ProtocolFramePtr> ProtocolFramePtrList;
template<typename _CharT>
std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream,
@@ -304,7 +306,7 @@ template<typename _CharT>
std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream,
const protocol_handler::ProtocolPacket& packet) {
stream << packet.packet_header() <<
- ", ConnectionID: " << (packet.connection_id()) <<
+ ", ConnectionID: " << static_cast<uint32_t>(packet.connection_id()) <<
", TotalDataBytes: " << (packet.total_data_bytes()) <<
", Data: " << static_cast<void*>(packet.data());
return stream;
@@ -312,7 +314,9 @@ std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream,
template<typename _CharT>
std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream,
const ProtocolFramePtr packet_ptr) {
- stream << *packet_ptr;
- return stream;
+ if(packet_ptr) {
+ return stream << *packet_ptr;
+ }
+ return stream << "empty smart pointer";
}
#endif // SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_PROTOCOL_PACKET_H_
diff --git a/src/components/protocol_handler/src/incoming_data_handler.cc b/src/components/protocol_handler/src/incoming_data_handler.cc
index 43b0898cc2..f1ceb18425 100644
--- a/src/components/protocol_handler/src/incoming_data_handler.cc
+++ b/src/components/protocol_handler/src/incoming_data_handler.cc
@@ -40,32 +40,32 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "ProtocolHandler")
IncomingDataHandler::IncomingDataHandler()
: header_(),
validator_(NULL),
- last_portion_of_data_was_malformed_(false)
- {}
+ last_portion_of_data_was_malformed_(false) {
+}
void IncomingDataHandler::set_validator(
- const ProtocolPacket::ProtocolHeaderValidator *const validator) {
+ const ProtocolPacket::ProtocolHeaderValidator* const validator) {
validator_ = validator;
}
static const size_t MIN_HEADER_SIZE = std::min(PROTOCOL_HEADER_V1_SIZE,
PROTOCOL_HEADER_V2_SIZE);
-std::list<ProtocolFramePtr> IncomingDataHandler::ProcessData(
- const RawMessage &tm_message,
- RESULT_CODE *result,
- size_t *malformed_occurrence) {
+ProtocolFramePtrList IncomingDataHandler::ProcessData(
+ const RawMessage& tm_message,
+ RESULT_CODE* result,
+ size_t* malformed_occurrence) {
LOG4CXX_AUTO_TRACE(logger_);
DCHECK(result);
DCHECK(malformed_occurrence);
const transport_manager::ConnectionUID connection_id =
tm_message.connection_key();
- const uint8_t *data = tm_message.data();
+ const uint8_t* data = tm_message.data();
const size_t tm_message_size = tm_message.data_size();
if (tm_message_size == 0 || data == NULL) {
LOG4CXX_WARN(logger_, "Wrong raw message " << tm_message_size << " bytes");
*result = RESULT_FAIL;
- return std::list<ProtocolFramePtr>();
+ return ProtocolFramePtrList();
}
LOG4CXX_DEBUG(logger_, "Processing incoming data of size "
<< tm_message_size << " for connection " << connection_id);
@@ -73,13 +73,13 @@ std::list<ProtocolFramePtr> IncomingDataHandler::ProcessData(
if (connections_data_.end() == it) {
LOG4CXX_WARN(logger_, "ProcessData requested for unknown connection");
*result = RESULT_FAIL;
- return std::list<ProtocolFramePtr>();
+ return ProtocolFramePtrList();
}
- std::vector<uint8_t> &connection_data = it->second;
+ std::vector<uint8_t>& connection_data = it->second;
connection_data.insert(connection_data.end(), data, data + tm_message_size);
LOG4CXX_DEBUG(logger_, "Total data size for connection "
<< connection_id << " is " << connection_data.size());
- std::list<ProtocolFramePtr> out_frames;
+ ProtocolFramePtrList out_frames;
*malformed_occurrence = 0;
*result = CreateFrame(connection_data, out_frames, *malformed_occurrence, connection_id);
LOG4CXX_DEBUG(logger_, "New data size for connection " << connection_id
@@ -117,7 +117,7 @@ void IncomingDataHandler::RemoveConnection(
}
uint32_t IncomingDataHandler::GetPacketSize(
- const ProtocolPacket::ProtocolHeader &header) {
+ const ProtocolPacket::ProtocolHeader& header) {
switch (header.version) {
case PROTOCOL_VERSION_1:
return header.dataSize + PROTOCOL_HEADER_V1_SIZE;
@@ -134,9 +134,9 @@ uint32_t IncomingDataHandler::GetPacketSize(
}
RESULT_CODE IncomingDataHandler::CreateFrame(
- std::vector<uint8_t> &incoming_data,
- std::list<ProtocolFramePtr> &out_frames,
- size_t &malformed_occurrence,
+ std::vector<uint8_t>& incoming_data,
+ ProtocolFramePtrList& out_frames,
+ size_t& malformed_occurrence,
const transport_manager::ConnectionUID connection_id) {
LOG4CXX_AUTO_TRACE(logger_);
std::vector<uint8_t>::iterator data_it = incoming_data.begin();
@@ -157,7 +157,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame(
++data_it;
--data_size;
LOG4CXX_DEBUG(logger_, "Moved to the next byte " << std::hex
- << static_cast<const void *>(&*data_it));
+ << static_cast<const void*>(&*data_it));
continue;
}
LOG4CXX_DEBUG(logger_, "Payload size " << header_.dataSize);
@@ -167,7 +167,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame(
++data_it;
--data_size;
LOG4CXX_DEBUG(logger_, "Moved to the next byte " << std::hex
- << static_cast<const void *>(&*data_it));
+ << static_cast<const void*>(&*data_it));
continue;
}
if (data_size < packet_size) {
@@ -179,7 +179,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame(
const RESULT_CODE deserialize_result =
frame->deserializePacket(&*data_it, packet_size);
LOG4CXX_DEBUG(
- logger_, "Deserialized frame " << frame);
+ logger_, "Deserialized frame " << frame);
if (deserialize_result != RESULT_OK) {
LOG4CXX_WARN(logger_, "Packet deserialization failed");
incoming_data.erase(incoming_data.begin(), data_it);
diff --git a/src/components/protocol_handler/src/multiframe_builder.cc b/src/components/protocol_handler/src/multiframe_builder.cc
new file mode 100644
index 0000000000..9dc2c08fa5
--- /dev/null
+++ b/src/components/protocol_handler/src/multiframe_builder.cc
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2015, Ford Motor Company
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * Neither the name of the Ford Motor Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "protocol_handler/multiframe_builder.h"
+
+#include <limits>
+
+#include "utils/logger.h"
+#include "utils/make_shared.h"
+#include "utils/lock.h"
+#include "utils/date_time.h"
+
+namespace protocol_handler {
+
+CREATE_LOGGERPTR_GLOBAL(logger_, "ProtocolHandler")
+
+MultiFrameBuilder::MultiFrameBuilder()
+ : consecutive_frame_wait_msecs_(0u) {
+}
+
+void MultiFrameBuilder::set_waiting_timeout(const uint32_t consecutive_frame_wait_msecs) {
+ consecutive_frame_wait_msecs_ = static_cast<int64_t>(consecutive_frame_wait_msecs);
+ if (consecutive_frame_wait_msecs == 0) {
+ LOG4CXX_WARN(logger_, "Waiting timout disabled");
+ } else {
+ LOG4CXX_DEBUG(logger_, "Waiting time in msec: " << consecutive_frame_wait_msecs_);
+ }
+}
+
+bool MultiFrameBuilder::AddConnection(const ConnectionID connection_id) {
+ LOG4CXX_DEBUG(logger_, "Adding connection_id: " << connection_id);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ const MultiFrameMap::const_iterator it = multiframes_map_.find(connection_id);
+ if (it != multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Exists connection_id: " << connection_id);
+ return false;
+ }
+ multiframes_map_[connection_id] = SessionToFrameMap();
+ return true;
+}
+
+bool MultiFrameBuilder::RemoveConnection(const ConnectionID connection_id) {
+ LOG4CXX_DEBUG(logger_, "Removing connection_id: " << connection_id);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ const MultiFrameMap::iterator it = multiframes_map_.find(connection_id);
+ if (it == multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Non-existent connection_id: " << connection_id);
+ return false;
+ }
+ const SessionToFrameMap& session_to_frame_map = it->second;
+ if (!session_to_frame_map.empty()) {
+ // FIXME(EZamakhov): Ask ReqManager - do we need to send GenericError
+ LOG4CXX_WARN(logger_, "For connection_id: " << connection_id
+ << " waiting: " << multiframes_map_);
+ }
+ multiframes_map_.erase(it);
+ return true;
+}
+
+ProtocolFramePtrList MultiFrameBuilder::PopMultiframes() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ ProtocolFramePtrList outpute_frame_list;
+ for (MultiFrameMap::iterator connection_it = multiframes_map_.begin();
+ connection_it != multiframes_map_.end(); ++connection_it) {
+ LOG4CXX_TRACE(logger_, "Step over connection: " << connection_it->first);
+ SessionToFrameMap& session_map = connection_it->second;
+
+ for (SessionToFrameMap::iterator session_it = session_map.begin();
+ session_it != session_map.end(); ++session_it) {
+ LOG4CXX_TRACE(logger_, "Step over session: "
+ << static_cast<int>(session_it->first));
+ MessageIDToFrameMap& messageId_map = session_it->second;
+
+ MessageIDToFrameMap::iterator messageId_it = messageId_map.begin();
+ while (messageId_it != messageId_map.end()) {
+ LOG4CXX_TRACE(logger_, "Step over messageId: " << messageId_it->first);
+ ProtocolFrameData& frame_data = messageId_it->second;
+ ProtocolFramePtr frame = frame_data.frame;
+
+ if (frame &&
+ frame->frame_data() == FRAME_DATA_LAST_CONSECUTIVE &&
+ frame->payload_size() > 0u ) {
+ LOG4CXX_DEBUG(logger_, "Ready frame: " << frame);
+ outpute_frame_list.push_back(frame);
+ messageId_map.erase(messageId_it++);
+ continue;
+ }
+ if (consecutive_frame_wait_msecs_ != 0) {
+ LOG4CXX_TRACE(logger_, "Expiration verification");
+ const int64_t time_left =
+ date_time::DateTime::calculateTimeSpan(frame_data.append_time);
+ LOG4CXX_DEBUG(logger_, "mSecs left: " << time_left);
+ if (time_left >= consecutive_frame_wait_msecs_) {
+ LOG4CXX_WARN(logger_, "Expired frame: " << frame);
+ outpute_frame_list.push_back(frame);
+ messageId_map.erase(messageId_it++);
+ continue;
+ }
+ }
+ ++messageId_it;
+ } // iteration over messageId_map
+ } // iteration over session_map
+ } // iteration over multiframes_map_
+ LOG4CXX_DEBUG(logger_, "Result frames count: " << outpute_frame_list.size());
+ return outpute_frame_list;
+}
+
+RESULT_CODE MultiFrameBuilder::AddFrame(const ProtocolFramePtr packet) {
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "Handling frame: " << packet);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ if (!packet) {
+ LOG4CXX_ERROR(logger_, "Skip empty frame");
+ return RESULT_FAIL;
+ }
+ switch (packet->frame_type()) {
+ case FRAME_TYPE_FIRST:
+ LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST");
+ return HandleFirstFrame(packet);
+ case FRAME_TYPE_CONSECUTIVE:
+ LOG4CXX_TRACE(logger_, "FRAME_TYPE_CONSECUTIVE");
+ return HandleConsecutiveFrame(packet);
+ default:
+ LOG4CXX_ERROR(logger_, "Frame is not FIRST or CONSECUTIVE :"
+ << packet);
+ break;
+ }
+ return RESULT_FAIL;
+}
+
+RESULT_CODE MultiFrameBuilder::HandleFirstFrame(const ProtocolFramePtr packet) {
+ DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_FIRST,
+ RESULT_FAIL);
+ LOG4CXX_DEBUG(logger_, "Waiting : " << multiframes_map_);
+ LOG4CXX_DEBUG(logger_, "Handling FIRST frame: " << packet);
+ if (packet->payload_size() != 0u) {
+ LOG4CXX_ERROR(logger_,
+ "First frame shall have no data:" << packet);
+ return RESULT_FAIL;
+ }
+
+ const ConnectionID connection_id = packet->connection_id();
+ MultiFrameMap::iterator connection_it = multiframes_map_.find(connection_id);
+ if (connection_it == multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Unknown connection_id: " << connection_id);
+ return RESULT_FAIL;
+ }
+ SessionToFrameMap& session_map = connection_it->second;
+
+ const SessionID session_id = packet->session_id();
+ // No need to verify session existance
+ MessageIDToFrameMap& messageId_map = session_map[session_id];
+
+ const MessageID message_id = packet->message_id();
+ MessageIDToFrameMap::iterator messageId_it = messageId_map.find(message_id);
+ if (messageId_it != messageId_map.end()) {
+ LOG4CXX_ERROR(logger_, "Already waiting message for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+ return RESULT_FAIL;
+ }
+
+ LOG4CXX_DEBUG(logger_, "Start waiting frames for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+ messageId_map[message_id] = {packet, date_time::DateTime::getCurrentTime()};
+ return RESULT_OK;
+}
+
+RESULT_CODE MultiFrameBuilder::HandleConsecutiveFrame(const ProtocolFramePtr packet) {
+ DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_CONSECUTIVE,
+ RESULT_FAIL);
+ LOG4CXX_DEBUG(logger_, "Handling CONSECUTIVE frame: " << packet);
+
+
+ const ConnectionID connection_id = packet->connection_id();
+ MultiFrameMap::iterator connection_it = multiframes_map_.find(connection_id);
+ if (connection_it == multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Unknown connection_id: " << connection_id);
+ return RESULT_FAIL;
+ }
+ SessionToFrameMap& session_map = connection_it->second;
+
+ const SessionID session_id = packet->session_id();
+ // No need to verify session existance
+ MessageIDToFrameMap& messageId_map = session_map[session_id];
+
+ const MessageID message_id = packet->message_id();
+ MessageIDToFrameMap::iterator messageId_it = messageId_map.find(message_id);
+ if (messageId_it == messageId_map.end()) {
+ LOG4CXX_ERROR(logger_, "No waiting message for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+ return RESULT_FAIL;
+ }
+
+ ProtocolFrameData& frame_data = messageId_it->second;
+ ProtocolFramePtr assembling_frame = frame_data.frame;
+ DCHECK_OR_RETURN(packet->message_id() == assembling_frame->message_id(),
+ RESULT_FAIL);
+
+ const uint8_t new_frame_data = packet->frame_data();
+ const bool is_last_consecutive = (new_frame_data ==
+ FRAME_DATA_LAST_CONSECUTIVE);
+
+ if (is_last_consecutive) {
+ // TODO(EZamakhov): implement count of frames and result size verification
+ LOG4CXX_DEBUG(logger_, "Last CONSECUTIVE frame");
+ } else {
+ uint8_t previous_frame_data = assembling_frame->frame_data();
+ if (previous_frame_data == std::numeric_limits<uint8_t>::max()) {
+ previous_frame_data = 0u;
+ }
+ // The next frame data is bigger at 1
+ if (new_frame_data != (previous_frame_data + 1)) {
+ LOG4CXX_ERROR(logger_,
+ "Unexpected CONSECUTIVE frame for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id
+ << ", frame: " << packet);
+ return RESULT_FAIL;
+ }
+ }
+
+ assembling_frame->set_frame_data(new_frame_data);
+
+ LOG4CXX_DEBUG(logger_,
+ "Appending " << packet->data_size() << " bytes "
+ << "; frame_data " << static_cast<int>(new_frame_data)
+ << "; for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+
+ if (assembling_frame->appendData(packet->data(),
+ packet->data_size()) != RESULT_OK) {
+ LOG4CXX_ERROR(logger_, "Failed to append frame for multiframe message.");
+ return RESULT_FAIL;
+ }
+ frame_data.append_time = date_time::DateTime::getCurrentTime();
+ return RESULT_OK;
+}
+
+} // namespace protocol_handler
diff --git a/src/components/protocol_handler/src/protocol_handler_impl.cc b/src/components/protocol_handler/src/protocol_handler_impl.cc
index dccfffdf04..b3fb337703 100644
--- a/src/components/protocol_handler/src/protocol_handler_impl.cc
+++ b/src/components/protocol_handler/src/protocol_handler_impl.cc
@@ -63,11 +63,12 @@ uint8_t SupportedSDLProtocolVersion();
const size_t kStackSize = 32768;
-ProtocolHandlerImpl::ProtocolHandlerImpl(
- transport_manager::TransportManager *transport_manager_param,
- size_t message_frequency_time, size_t message_frequency_count,
- bool malformed_message_filtering,
- size_t malformed_message_frequency_time, size_t malformed_message_frequency_count)
+ProtocolHandlerImpl::ProtocolHandlerImpl(transport_manager::TransportManager *transport_manager_param,
+ size_t message_frequency_time, size_t message_frequency_count,
+ bool malformed_message_filtering,
+ size_t malformed_message_frequency_time,
+ size_t malformed_message_frequency_count,
+ uint32_t multiframe_waiting_timeout)
: protocol_observers_(),
session_observer_(0),
transport_manager_(transport_manager_param),
@@ -117,6 +118,7 @@ ProtocolHandlerImpl::ProtocolHandlerImpl(
LOG4CXX_WARN(logger_, "Malformed message filtering is disabled."
<< "Connection will be close on first malformed message detection");
}
+ multiframe_builder_.set_waiting_timeout(multiframe_waiting_timeout);
}
ProtocolHandlerImpl::~ProtocolHandlerImpl() {
@@ -502,6 +504,11 @@ void ProtocolHandlerImpl::OnTMMessageReceiveFailed(
void ProtocolHandlerImpl::NotifySubscribers(const RawMessagePtr message) {
LOG4CXX_AUTO_TRACE(logger_);
sync_primitives::AutoLock lock(protocol_observers_lock_);
+ if (protocol_observers_.empty()) {
+ LOG4CXX_ERROR(
+ logger_,
+ "Cannot handle multiframe message: no IProtocolObserver is set.");
+ }
for (ProtocolObservers::iterator it = protocol_observers_.begin();
protocol_observers_.end() != it; ++it) {
(*it)->OnMessageReceived(message);
@@ -570,6 +577,7 @@ void ProtocolHandlerImpl::OnConnectionEstablished(
const transport_manager::DeviceInfo &device_info,
const transport_manager::ConnectionUID &connection_id) {
incoming_data_handler_.AddConnection(connection_id);
+ multiframe_builder_.AddConnection(connection_id);
}
void ProtocolHandlerImpl::OnConnectionClosed(
@@ -577,6 +585,7 @@ void ProtocolHandlerImpl::OnConnectionClosed(
incoming_data_handler_.RemoveConnection(connection_id);
message_meter_.ClearIdentifiers();
malformed_message_meter_.ClearIdentifiers();
+ multiframe_builder_.RemoveConnection(connection_id);
}
RESULT_CODE ProtocolHandlerImpl::SendFrame(const ProtocolFramePtr packet) {
@@ -705,21 +714,20 @@ RESULT_CODE ProtocolHandlerImpl::SendMultiFrameMessage(
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleMessage(ConnectionID connection_id,
- const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleMessage(const ProtocolFramePtr packet) {
DCHECK_OR_RETURN(packet, RESULT_UNKNOWN);
LOG4CXX_DEBUG(logger_, "Handling message " << packet);
switch (packet->frame_type()) {
case FRAME_TYPE_CONTROL:
LOG4CXX_TRACE(logger_, "FRAME_TYPE_CONTROL");
- return HandleControlMessage(connection_id, packet);
+ return HandleControlMessage(packet);
case FRAME_TYPE_SINGLE:
LOG4CXX_TRACE(logger_, "FRAME_TYPE_SINGLE");
- return HandleSingleFrameMessage(connection_id, packet);
+ return HandleSingleFrameMessage(packet);
case FRAME_TYPE_FIRST:
case FRAME_TYPE_CONSECUTIVE:
LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST or FRAME_TYPE_CONSECUTIVE");
- return HandleMultiFrameMessage(connection_id, packet);
+ return HandleMultiFrameMessage(packet);
default: {
LOG4CXX_WARN(logger_, "Unknown frame type"
<< packet->frame_type());
@@ -729,8 +737,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleMessage(ConnectionID connection_id,
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(
- ConnectionID connection_id, const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(const ProtocolFramePtr packet) {
LOG4CXX_AUTO_TRACE(logger_);
LOG4CXX_DEBUG(logger_,
@@ -745,7 +752,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(
}
const uint32_t connection_key =
- session_observer_->KeyFromPair(connection_id, packet->session_id());
+ session_observer_->KeyFromPair(packet->connection_id(), packet->session_id());
const RawMessagePtr rawMessage(
new RawMessage(connection_key,
@@ -773,8 +780,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage(
- ConnectionID connection_id, const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage(const ProtocolFramePtr packet) {
LOG4CXX_AUTO_TRACE(logger_);
if (!session_observer_) {
@@ -782,100 +788,14 @@ RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage(
return RESULT_FAIL;
}
- const uint32_t key = session_observer_->KeyFromPair(connection_id,
- packet->session_id());
- LOG4CXX_DEBUG(
- logger_,
- "Packet " << packet << "; session id " << static_cast<int32_t>(key));
-
- if (packet->frame_type() == FRAME_TYPE_FIRST) {
- LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST");
- // First frame has no data
- DCHECK_OR_RETURN(packet->frame_data() == 0u, RESULT_FAIL);
- // We can not handle more than one multiframe with the same key
- DCHECK_OR_RETURN(incomplete_multi_frame_messages_.count(key) == 0,
- RESULT_FAIL);
- incomplete_multi_frame_messages_[key] = packet;
- return RESULT_OK;
+ if (multiframe_builder_.AddFrame(packet) != RESULT_OK) {
+ LOG4CXX_WARN(logger_, "Frame assembling issue");
}
- DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_CONSECUTIVE, RESULT_FAIL)
-
- MultiFrameMap::iterator it = incomplete_multi_frame_messages_.find(key);
- if (it == incomplete_multi_frame_messages_.end()) {
- LOG4CXX_ERROR(logger_,
- "Frame of multiframe message for non-existing session id " << key);
- return RESULT_FAIL;
- }
-
- ProtocolFramePtr& assembling_frame = it->second;
- const uint8_t previous_frame_data = assembling_frame->frame_data();
- const uint8_t new_frame_data = packet->frame_data();
-
- const bool is_last_consecutive = (new_frame_data == FRAME_DATA_LAST_CONSECUTIVE);
- // The next frame data is bigger at 1
- DCHECK_OR_RETURN((new_frame_data == (previous_frame_data + 1)) ||
- // except the last consecutive frame
- is_last_consecutive, RESULT_FAIL);
-
- assembling_frame->set_frame_data(new_frame_data);
-
- LOG4CXX_DEBUG(logger_,
- "Appending " << packet->data_size() << " bytes "
- << "; frame_data " << static_cast<int>(new_frame_data)
- << "; connection key " << key);
-
- DCHECK_OR_RETURN(packet->message_id() == assembling_frame->message_id(), RESULT_FAIL);
- if (assembling_frame->appendData(packet->data(), packet->data_size()) != RESULT_OK) {
- LOG4CXX_ERROR(logger_, "Failed to append frame for multiframe message.");
- return RESULT_FAIL;
- }
-
- if (is_last_consecutive) {
- LOG4CXX_DEBUG(
- logger_,
- "Last frame of multiframe message size " << packet->data_size()
- << "; connection key " << key);
- {
- sync_primitives::AutoLock lock(protocol_observers_lock_);
- if (protocol_observers_.empty()) {
- LOG4CXX_ERROR(
- logger_,
- "Cannot handle multiframe message: no IProtocolObserver is set.");
- return RESULT_FAIL;
- }
- }
- const uint32_t connection_key =
- session_observer_->KeyFromPair(connection_id,
- assembling_frame->session_id());
- LOG4CXX_DEBUG(logger_, "Result frame" << assembling_frame <<
- "for connection "<< connection_key);
- const RawMessagePtr rawMessage(
- new RawMessage(connection_key,
- assembling_frame->protocol_version(),
- assembling_frame->data(),
- assembling_frame->total_data_bytes(),
- assembling_frame->service_type(),
- assembling_frame->payload_size()));
- DCHECK_OR_RETURN(rawMessage, RESULT_FAIL);
-
-#ifdef TIME_TESTER
- if (metric_observer_) {
- PHMetricObserver::MessageMetric *metric =
- new PHMetricObserver::MessageMetric();
- metric->raw_msg = rawMessage;
- metric_observer_->EndMessageProcess(metric);
- }
-#endif // TIME_TESTER
- // TODO(EZamakhov): check service in session
- NotifySubscribers(rawMessage);
- incomplete_multi_frame_messages_.erase(it);
- }
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessage(
- ConnectionID connection_id, const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessage(const ProtocolFramePtr packet) {
LOG4CXX_AUTO_TRACE(logger_);
if (!session_observer_) {
@@ -885,17 +805,16 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessage(
switch (packet->frame_data()) {
case FRAME_DATA_START_SERVICE:
- return HandleControlMessageStartSession(connection_id, *(packet.get()));
+ return HandleControlMessageStartSession(*packet);
case FRAME_DATA_END_SERVICE:
- return HandleControlMessageEndSession(connection_id, *(packet.get()));
+ return HandleControlMessageEndSession(*packet);
case FRAME_DATA_HEART_BEAT: {
- LOG4CXX_DEBUG(logger_,
- "Received heart beat for connection " << connection_id);
- return HandleControlMessageHeartBeat(connection_id, *(packet.get()));
+ LOG4CXX_TRACE(logger_, "FRAME_DATA_HEART_BEAT");
+ return HandleControlMessageHeartBeat(*packet);
}
case FRAME_DATA_HEART_BEAT_ACK: {
LOG4CXX_DEBUG(logger_, "Received heart beat ack from mobile app"
- " for connection " << connection_id);
+ " for connection " << packet->connection_id());
return RESULT_OK;
}
default:
@@ -922,14 +841,14 @@ uint32_t get_hash_id(const ProtocolPacket &packet) {
return hash_le == HASH_ID_NOT_SUPPORTED ? HASH_ID_WRONG : hash_le;
}
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessageEndSession(
- ConnectionID connection_id, const ProtocolPacket &packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessageEndSession(const ProtocolPacket &packet) {
LOG4CXX_AUTO_TRACE(logger_);
const uint8_t current_session_id = packet.session_id();
const uint32_t hash_id = get_hash_id(packet);
const ServiceType service_type = ServiceTypeFromByte(packet.service_type());
+ const ConnectionID connection_id = packet.connection_id();
const uint32_t session_key = session_observer_->OnSessionEndedCallback(
connection_id, current_session_id, hash_id, service_type);
@@ -1025,8 +944,7 @@ class StartSessionHandler : public security_manager::SecurityManagerListener {
} // namespace
#endif // ENABLE_SECURITY
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(
- ConnectionID connection_id, const ProtocolPacket &packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(const ProtocolPacket &packet) {
LOG4CXX_DEBUG(logger_,
"Protocol version:" <<
static_cast<int>(packet.protocol_version()));
@@ -1043,6 +961,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(
DCHECK(session_observer_);
uint32_t hash_id;
+ const ConnectionID connection_id = packet.connection_id();
const uint32_t session_id = session_observer_->OnSessionStartedCallback(
connection_id, packet.session_id(), service_type, protection, &hash_id);
@@ -1100,8 +1019,8 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat(
- ConnectionID connection_id, const ProtocolPacket &packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat(const ProtocolPacket &packet) {
+ const ConnectionID connection_id = packet.connection_id();
LOG4CXX_DEBUG(
logger_,
"Sending heart beat acknowledgment for connection " << connection_id);
@@ -1124,6 +1043,41 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat(
}
}
+void ProtocolHandlerImpl::PopValideAndExpirateMultiframes() {
+ const ProtocolFramePtrList& frame_list = multiframe_builder_.PopMultiframes();
+ for (ProtocolFramePtrList::const_iterator it = frame_list.begin();
+ it != frame_list.end(); ++it) {
+ const ProtocolFramePtr frame = *it;
+ DCHECK(frame);
+ if(!frame) {
+ continue;
+ }
+
+ const uint32_t connection_key =
+ session_observer_->KeyFromPair(frame->connection_id(), frame->session_id());
+ LOG4CXX_DEBUG(logger_, "Result frame" << frame <<
+ "for connection "<< connection_key);
+ const RawMessagePtr rawMessage(
+ new RawMessage(connection_key,
+ frame->protocol_version(),
+ frame->data(),
+ frame->total_data_bytes(),
+ frame->service_type(),
+ frame->payload_size()));
+ DCHECK(rawMessage);
+
+#ifdef TIME_TESTER
+ if (metric_observer_) {
+ PHMetricObserver::MessageMetric *metric =
+ new PHMetricObserver::MessageMetric();
+ metric->raw_msg = rawMessage;
+ metric_observer_->EndMessageProcess(metric);
+ }
+#endif // TIME_TESTER
+ NotifySubscribers(rawMessage);
+ }
+}
+
bool ProtocolHandlerImpl::TrackMessage(const uint32_t& connection_key) {
LOG4CXX_AUTO_TRACE(logger_);
if (message_frequency_time_ > 0u &&
@@ -1203,7 +1157,8 @@ void ProtocolHandlerImpl::Handle(
FRAME_TYPE_CONTROL == message->frame_type() ||
FRAME_TYPE_FIRST == message->frame_type()) {
LOG4CXX_DEBUG(logger_, "Packet: dataSize " << message->data_size());
- HandleMessage(message->connection_id(), message);
+ HandleMessage(message);
+ PopValideAndExpirateMultiframes();
} else {
LOG4CXX_WARN(logger_,
"handleMessagesFromMobileApp() - incorrect or NULL data");
diff --git a/src/components/protocol_handler/src/protocol_packet.cc b/src/components/protocol_handler/src/protocol_packet.cc
index 1ed6e05699..8675d7543e 100644
--- a/src/components/protocol_handler/src/protocol_packet.cc
+++ b/src/components/protocol_handler/src/protocol_packet.cc
@@ -33,6 +33,7 @@
#include <stdint.h>
#include <memory.h>
#include <new>
+#include <memory>
#include <cstring>
#include <limits>
@@ -247,7 +248,7 @@ RESULT_CODE ProtocolPacket::ProtocolHeaderValidator::validate(
}
ProtocolPacket::ProtocolPacket()
- : payload_size_(0), connection_id_(0) {
+ : payload_size_(0u), connection_id_(0u) {
}
ProtocolPacket::ProtocolPacket(ConnectionID connection_id,
@@ -382,7 +383,7 @@ RESULT_CODE ProtocolPacket::deserializePacket(
dataPayloadSize = messageSize - offset;
}
- uint8_t *data = NULL;
+ uint8_t* data = NULL;
if (dataPayloadSize) {
data = new (std::nothrow) uint8_t[dataPayloadSize];
if (!data) {
@@ -401,6 +402,7 @@ RESULT_CODE ProtocolPacket::deserializePacket(
total_data_bytes |= data[3];
set_total_data_bytes(total_data_bytes);
if (0 == packet_data_.data) {
+ delete[] data;
return RESULT_FAIL;
}
} else {
@@ -482,7 +484,7 @@ uint32_t ProtocolPacket::total_data_bytes() const {
return packet_data_.totalDataBytes;
}
-uint8_t ProtocolPacket::connection_id() const {
+ConnectionID ProtocolPacket::connection_id() const {
return connection_id_;
}
diff --git a/src/components/protocol_handler/test/CMakeLists.txt b/src/components/protocol_handler/test/CMakeLists.txt
index 6e12eeb581..3cb0dfc106 100644
--- a/src/components/protocol_handler/test/CMakeLists.txt
+++ b/src/components/protocol_handler/test/CMakeLists.txt
@@ -52,6 +52,7 @@ set(SOURCES
protocol_handler_tm_test.cc
protocol_packet_test.cc
protocol_payload_test.cc
+ multiframe_builder_test.cc
)
create_test("protocol_handler_test" "${SOURCES}" "${LIBRARIES}")
diff --git a/src/components/protocol_handler/test/include/protocol_handler_mock.h b/src/components/protocol_handler/test/include/protocol_handler_mock.h
index 41b7c491a4..9ad3545ceb 100644
--- a/src/components/protocol_handler/test/include/protocol_handler_mock.h
+++ b/src/components/protocol_handler/test/include/protocol_handler_mock.h
@@ -163,8 +163,10 @@ class SessionObserverMock : public protocol_handler::SessionObserver {
MOCK_METHOD3(ProtocolVersionUsed,
bool( uint32_t connection_id,
uint8_t session_id, uint8_t& protocol_version));
+#ifdef ENABLE_SECURITY
MOCK_CONST_METHOD1(GetHandshakeContext,
security_manager::SSLContext::HandshakeContext (const uint32_t key) );
+#endif // ENABLE_SECURITY
};
#ifdef ENABLE_SECURITY
diff --git a/src/components/protocol_handler/test/multiframe_builder_test.cc b/src/components/protocol_handler/test/multiframe_builder_test.cc
new file mode 100644
index 0000000000..d1910c18e1
--- /dev/null
+++ b/src/components/protocol_handler/test/multiframe_builder_test.cc
@@ -0,0 +1,522 @@
+/*
+ * Copyright (c) 2015, Ford Motor Company
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * Neither the name of the Ford Motor Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#include <gtest/gtest.h>
+#include <vector>
+#include <utility>
+#include <limits>
+#include "utils/make_shared.h"
+#include "protocol_handler/multiframe_builder.h"
+
+namespace test {
+namespace components {
+namespace protocol_handler_test {
+
+using namespace protocol_handler;
+
+typedef std::vector<ConnectionID> ConnectionList;
+typedef std::vector<uint8_t> UCharDataVector;
+
+struct MutiframeData {
+ UCharDataVector binary_data;
+ ProtocolFramePtrList multiframes;
+};
+
+/**
+ *\brief Map of MutiframeData by MessageID key
+ */
+typedef std::map<MessageID, MutiframeData> MessageIDToMutiframeDataTestMap;
+/**
+ *\brief Map of MessageIDToMutiframeDataMap by SessionID key
+ */
+typedef std::map<SessionID, MessageIDToMutiframeDataTestMap> SessionToMutiframeDataTestMap;
+/**
+ *\brief Map of SessionToMutiframeDataMap by ConnectionID key
+ */
+typedef std::map<ConnectionID, SessionToMutiframeDataTestMap> MultiFrameTestMap;
+
+template<typename IntegerType>
+std::vector<IntegerType> getTestVector() {
+ // Prepare array with a few minimals, middle and a few maximum values
+ const IntegerType array[] = {
+ std::numeric_limits<IntegerType>::min(),
+ std::numeric_limits<IntegerType>::min() + 1,
+ std::numeric_limits<IntegerType>::max() / 2,
+ std::numeric_limits<IntegerType>::max() - 1 ,
+ std::numeric_limits<IntegerType>::max()
+ };
+ return std::vector<IntegerType>(array,
+ array + sizeof(array) / sizeof(array[0]) );
+}
+template<typename IntegerType>
+struct Incrementor {
+ IntegerType value;
+ Incrementor(const IntegerType value = 0u)
+ : value(value) {
+ }
+ IntegerType operator() () {
+ return ++value;
+ }
+};
+
+
+class MultiFrameBuilderTest : public ::testing::Test {
+ protected:
+ void SetUp() OVERRIDE {
+
+ const std::vector<ConnectionID> connections = getTestVector<ConnectionID>();
+ const std::vector<SessionID> sessions = getTestVector<SessionID>();
+ const std::vector<MessageID> messages = getTestVector<MessageID>();
+
+ MutiframeData some_data;
+
+ const uint8_t protocol_version = PROTOCOL_VERSION_2;
+ const uint8_t service_type = SERVICE_TYPE_RPC;
+
+ // We need 255+ messages for rolling over max uint8_t value
+ int multi_frames_count = std::numeric_limits<uint8_t>::max() * 2;
+
+ // Prepare C connections with S sessions with M messages data
+ for (size_t c = 0; c < connections.size(); ++c) {
+ const ConnectionID connection_id = connections[c];
+
+ SessionToMutiframeDataTestMap sessions_map;
+ for (size_t s = 0; s < sessions.size(); ++s) {
+ const SessionID session_id = sessions[s];
+
+ MessageIDToMutiframeDataTestMap messages_map;
+ for (size_t m = 0; m < messages.size(); ++m) {
+ const MessageID message_id = messages[m];
+
+ UCharDataVector& data_vector = some_data.binary_data;
+ // Sahll not be 1 consecutive frame
+ ASSERT_GT(multi_frames_count, 1);
+ data_vector.resize(++multi_frames_count * mtu_);
+
+ std::generate(data_vector.begin(), data_vector.end(), Incrementor<uint8_t>(0u));
+
+ PrepareMultiFrames(connection_id,
+ protocol_version,
+ service_type,
+ session_id,
+ message_id,
+ mtu_,
+ data_vector,
+ some_data.multiframes);
+ messages_map.insert(std::make_pair(message_id, some_data));
+ }
+ sessions_map.insert(std::make_pair(session_id, messages_map));
+ }
+ test_data_map_.insert(std::make_pair(connection_id, sessions_map));
+ }
+ }
+
+ // Support method for first and consecutive frame disassembling
+ static void PrepareMultiFrames(
+ const ConnectionID connection_id,
+ const uint8_t protocol_version,
+ const uint8_t service_type,
+ const uint8_t session_id,
+ const uint32_t message_id,
+ const size_t max_frame_size,
+ const UCharDataVector& data,
+ ProtocolFramePtrList& out_frames);
+
+ void AddConnection(const ConnectionID connection_id);
+
+ void AddConnections();
+
+ void VerifyConsecutiveAdd(const MutiframeData& multiframe_data);
+
+ MultiFrameBuilder multiframe_builder_;
+ MultiFrameTestMap test_data_map_;
+ static size_t mtu_;
+};
+
+size_t MultiFrameBuilderTest::mtu_ = 10;
+
+TEST_F(MultiFrameBuilderTest, Pop_Frames_From_Empty_builder) {
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes());
+}
+
+TEST_F(MultiFrameBuilderTest, Pop_Frames_with_existing_connections) {
+ AddConnections();
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes());
+}
+
+TEST_F(MultiFrameBuilderTest, Add_EmptyFrame) {
+ EXPECT_EQ(RESULT_FAIL,
+ multiframe_builder_.AddFrame(ProtocolFramePtr()));
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes());
+}
+
+TEST_F(MultiFrameBuilderTest, Add_NonSingleOrConsecutive_Frames) {
+ UCharDataVector types;
+ types.reserve(std::numeric_limits<uint8_t>::max());
+ for (uint8_t type = std::numeric_limits<uint8_t>::min();
+ type < std::numeric_limits<uint8_t>::max(); ++type) {
+ if (type != FRAME_TYPE_FIRST &&
+ type != FRAME_TYPE_CONSECUTIVE) {
+ types.push_back(type);
+ }
+ }
+
+ for (UCharDataVector::iterator it = types.begin(); it != types.end(); ++it) {
+ const uint8_t frame_type = *it;
+ const ProtocolFramePtr unexpected_frame(
+ new ProtocolPacket( 0u, PROTOCOL_VERSION_3, PROTECTION_OFF, frame_type,
+ SERVICE_TYPE_RPC, FRAME_DATA_FIRST, 0u, 0u, 0u));
+ EXPECT_EQ(RESULT_FAIL,
+ multiframe_builder_.AddFrame(unexpected_frame))
+ << "Unexpected frame: " << unexpected_frame;
+
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes())
+ << "Unexpected frame: " << unexpected_frame;
+ }
+}
+
+TEST_F(MultiFrameBuilderTest, Add_FirstFrames_NoConnections) {
+ for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin();
+ connection_it != test_data_map_.end(); ++connection_it) {
+ SessionToMutiframeDataTestMap& session_map = connection_it->second;
+ const ConnectionID connection_id = connection_it->first;
+
+ for (SessionToMutiframeDataTestMap::iterator session_it = session_map.begin();
+ session_it != session_map.end(); ++session_it) {
+ MessageIDToMutiframeDataTestMap& messageId_map = session_it->second;
+
+ for (MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin();
+ messageId_it != messageId_map.end(); ++messageId_it) {
+ const MutiframeData& multiframe_data = messageId_it->second;
+
+ const ProtocolFramePtrList& multiframes = multiframe_data.multiframes;
+ ASSERT_FALSE(multiframes.empty());
+ const ProtocolFramePtr first_frame = multiframes.front();
+ ASSERT_TRUE(first_frame);
+ EXPECT_EQ(RESULT_FAIL,
+ multiframe_builder_.AddFrame(first_frame))
+ << "Unexisting connection " << connection_id
+ << "- to be skipped first frame: " << first_frame;
+
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes())
+ << "First frame: " << first_frame;
+ }
+ }
+ }
+}
+
+TEST_F(MultiFrameBuilderTest, Add_FirstFrames_only) {
+ AddConnections();
+ for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin();
+ connection_it != test_data_map_.end(); ++connection_it) {
+ SessionToMutiframeDataTestMap& session_map = connection_it->second;
+
+ for (SessionToMutiframeDataTestMap::iterator session_it = session_map.begin();
+ session_it != session_map.end(); ++session_it) {
+ MessageIDToMutiframeDataTestMap& messageId_map = session_it->second;
+
+ for (MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin();
+ messageId_it != messageId_map.end(); ++messageId_it) {
+ const MutiframeData& multiframe_data = messageId_it->second;
+
+ const ProtocolFramePtrList& multiframes = multiframe_data.multiframes;
+ ASSERT_FALSE(multiframes.empty());
+ const ProtocolFramePtr first_frame = multiframes.front();
+ ASSERT_TRUE(first_frame);
+ EXPECT_EQ(RESULT_OK,
+ multiframe_builder_.AddFrame(first_frame))
+ << "First frame: " << first_frame;
+
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes())
+ << "First frame: " << first_frame;
+ }
+ }
+ }
+}
+
+TEST_F(MultiFrameBuilderTest, Add_ConsecutiveFrame) {
+ ASSERT_FALSE(test_data_map_.empty());
+ const ConnectionID& connection_id = test_data_map_.begin()->first;
+ SessionToMutiframeDataTestMap& session_map = test_data_map_.begin()->second;
+
+ AddConnection(connection_id);
+
+ ASSERT_FALSE(session_map.empty());
+ MessageIDToMutiframeDataTestMap& messageId_map = session_map.begin()->second;
+
+ ASSERT_FALSE(messageId_map.empty());
+ const MutiframeData& multiframe_data = messageId_map.begin()->second;
+
+ VerifyConsecutiveAdd(multiframe_data);
+}
+
+TEST_F(MultiFrameBuilderTest, Add_ConsecutiveFrames_OneByOne) {
+ AddConnections();
+ for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin();
+ connection_it != test_data_map_.end(); ++connection_it) {
+ SessionToMutiframeDataTestMap& session_map = connection_it->second;
+
+ for (SessionToMutiframeDataTestMap::iterator session_it = session_map.begin();
+ session_it != session_map.end(); ++session_it) {
+ MessageIDToMutiframeDataTestMap& messageId_map = session_it->second;
+
+ for (MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin();
+ messageId_it != messageId_map.end(); ++messageId_it) {
+ const MutiframeData& multiframe_data = messageId_it->second;
+
+ VerifyConsecutiveAdd(multiframe_data);
+ }
+ }
+ }
+}
+
+TEST_F(MultiFrameBuilderTest, Add_ConsecutiveFrames_per1) {
+ AddConnections();
+ ASSERT_FALSE(test_data_map_.empty());
+ // After processing each frame we remove it from messageId_it
+ // After processing all session data - it removes from session_map
+ // and so on
+ // TODO(Ezamakhov): optimize speed of test by skipping erasing data
+ while (!test_data_map_.empty()) {
+ MultiFrameTestMap::iterator connection_it = test_data_map_.begin();
+ while (connection_it != test_data_map_.end()) {
+ SessionToMutiframeDataTestMap& session_map = connection_it->second;
+
+ SessionToMutiframeDataTestMap::iterator session_it = session_map.begin();
+ while (session_it != session_map.end()) {
+ MessageIDToMutiframeDataTestMap& messageId_map = session_it->second;
+
+ MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin();
+ while (messageId_it != messageId_map.end()) {
+
+ MutiframeData& multiframe_data = messageId_it->second;
+ ProtocolFramePtrList& multiframes = multiframe_data.multiframes;
+ ASSERT_FALSE(multiframes.empty());
+
+ const ProtocolFramePtr frame = multiframes.front();
+ ASSERT_TRUE(frame);
+
+ EXPECT_EQ(RESULT_OK,
+ multiframe_builder_.AddFrame(frame)) << "Frame: " << frame;
+
+ multiframes.pop_front();
+
+ // If all frames are assembled
+ if (multiframes.empty()) {
+ const ProtocolFramePtrList& multiframe_list
+ = multiframe_builder_.PopMultiframes();
+ ASSERT_EQ(multiframe_list.size(), 1u);
+
+ const ProtocolFramePtr result_multiframe = multiframe_list.front();
+ const UCharDataVector& binary_data = multiframe_data.binary_data;
+ EXPECT_EQ(binary_data,
+ UCharDataVector(result_multiframe->data(),
+ result_multiframe->data() +
+ result_multiframe->payload_size()));
+ messageId_map.erase(messageId_it++);
+ } else {
+ // Multiframe is not completed
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes()) << "Frame: " << frame;
+ ++messageId_it;
+ }
+ }
+ if (messageId_map.empty()) {
+ session_map.erase(session_it++);
+ } else {
+ ++session_it;
+ }
+ }
+ if (session_map.empty()) {
+ test_data_map_.erase(connection_it++);
+ } else {
+ ++connection_it;
+ }
+ }
+ }
+}
+
+TEST_F(MultiFrameBuilderTest, FrameExpired_OneMSec) {
+ multiframe_builder_.set_waiting_timeout(1);
+
+ ASSERT_FALSE(test_data_map_.empty());
+ const ConnectionID& connection_id = test_data_map_.begin()->first;
+ SessionToMutiframeDataTestMap& session_map = test_data_map_.begin()->second;
+
+ AddConnection(connection_id);
+
+ ASSERT_FALSE(session_map.empty());
+ MessageIDToMutiframeDataTestMap& messageId_map = session_map.begin()->second;
+
+ ASSERT_FALSE(messageId_map.empty());
+ const MutiframeData& multiframe_data = messageId_map.begin()->second;
+
+ const ProtocolFramePtrList& multiframes = multiframe_data.multiframes;
+ ASSERT_FALSE(multiframes.empty());
+ const ProtocolFramePtr first_frame = multiframes.front();
+ ASSERT_TRUE(first_frame);
+ EXPECT_EQ(RESULT_OK,
+ multiframe_builder_.AddFrame(first_frame))
+ << "First frame: " << first_frame;
+
+ // Wait frame expire
+ usleep(1000);
+ const ProtocolFramePtrList& list = multiframe_builder_.PopMultiframes();
+ ASSERT_FALSE(list.empty());
+ EXPECT_EQ(first_frame,
+ list.front());
+}
+
+/*
+ * Testing support methods
+ */
+
+void MultiFrameBuilderTest::VerifyConsecutiveAdd(const MutiframeData& multiframe_data) {
+ const ProtocolFramePtrList& multiframes = multiframe_data.multiframes;
+ const UCharDataVector& binary_data = multiframe_data.binary_data;
+ ASSERT_FALSE(multiframes.empty());
+
+ // Frame of multiframe loop
+ ProtocolFramePtrList::const_iterator it = multiframes.begin();
+ // Skip last final frame
+ const ProtocolFramePtrList::const_iterator it_last = --(multiframes.end());
+ while (it != it_last) {
+ const ProtocolFramePtr frame = *it;
+ ASSERT_TRUE(frame);
+ EXPECT_EQ(RESULT_OK,
+ multiframe_builder_.AddFrame(frame))
+ << "Non final CONSECUTIVE frame: " << frame;
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes())
+ << "Non final CONSECUTIVE frame: " << frame;
+ ++it;
+ // Skip last final frame
+ }
+
+ const ProtocolFramePtr final_frame = multiframes.back();
+
+ EXPECT_EQ(RESULT_OK,
+ multiframe_builder_.AddFrame(final_frame))
+ << "Final CONSECUTIVE frame: " << final_frame;
+
+ const ProtocolFramePtrList& multiframe_list
+ = multiframe_builder_.PopMultiframes();
+ ASSERT_EQ(multiframe_list.size(), 1u);
+
+ const ProtocolFramePtr result_multiframe = multiframe_list.front();
+ EXPECT_EQ(binary_data,
+ UCharDataVector(result_multiframe->data(),
+ result_multiframe->data() + result_multiframe->payload_size()));
+}
+
+void MultiFrameBuilderTest::PrepareMultiFrames(const ConnectionID connection_id,
+ const uint8_t protocol_version,
+ const uint8_t service_type,
+ const uint8_t session_id,
+ const uint32_t message_id,
+ const size_t max_payload_size,
+ const UCharDataVector& data,
+ ProtocolFramePtrList& out_frames) {
+ ASSERT_GT(max_payload_size, FIRST_FRAME_DATA_SIZE);
+ ASSERT_EQ(FIRST_FRAME_DATA_SIZE, 0x08);
+
+ // TODO(EZamakhov): move to the separate class
+ const size_t data_size = data.size();
+ // remainder of last frame
+ const size_t lastframe_remainder = data_size % max_payload_size;
+ // size of last frame (full fill or not)
+ const size_t lastframe_size =
+ lastframe_remainder > 0 ? lastframe_remainder : max_payload_size;
+
+ const size_t frames_count = data_size / max_payload_size +
+ // add last frame if not empty
+ (lastframe_remainder > 0 ? 1 : 0);
+
+ uint8_t out_data[FIRST_FRAME_DATA_SIZE];
+ out_data[0] = data_size >> 24;
+ out_data[1] = data_size >> 16;
+ out_data[2] = data_size >> 8;
+ out_data[3] = data_size;
+
+ out_data[4] = frames_count >> 24;
+ out_data[5] = frames_count >> 16;
+ out_data[6] = frames_count >> 8;
+ out_data[7] = frames_count;
+
+ ProtocolFramePtr first_frame(
+ new ProtocolPacket(
+ connection_id, protocol_version, PROTECTION_OFF, FRAME_TYPE_FIRST,
+ service_type, FRAME_DATA_FIRST, session_id, FIRST_FRAME_DATA_SIZE,
+ message_id, out_data));
+ // Note: PHIMpl already prepare First frames the total_data_bytes on desirialization
+ first_frame->set_total_data_bytes(data_size);
+
+ out_frames.clear();
+ out_frames.push_back(first_frame);
+
+ for (size_t i = 0; i < frames_count; ++i) {
+ const bool is_last_frame = (i == (frames_count - 1));
+ const size_t frame_size = is_last_frame ? lastframe_size : max_payload_size;
+ const uint8_t data_type =
+ is_last_frame
+ ? FRAME_DATA_LAST_CONSECUTIVE
+ : (i % FRAME_DATA_MAX_CONSECUTIVE + 1);
+
+ const ProtocolFramePtr consecutive_frame(
+ new ProtocolPacket(
+ connection_id, protocol_version, PROTECTION_OFF, FRAME_TYPE_CONSECUTIVE,
+ service_type, data_type, session_id, frame_size, message_id,
+ &data[max_payload_size * i]));
+ out_frames.push_back(consecutive_frame);
+ }
+}
+
+void MultiFrameBuilderTest::AddConnection(const ConnectionID connection_id) {
+ ASSERT_TRUE(multiframe_builder_.AddConnection(connection_id));
+}
+
+void MultiFrameBuilderTest::AddConnections() {
+ for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin();
+ connection_it != test_data_map_.end(); ++connection_it) {
+ const ConnectionID connection_id = connection_it->first;
+ ASSERT_TRUE(multiframe_builder_.AddConnection(connection_id));
+ }
+}
+
+} // namespace protocol_handler_test
+} // namespace components
+} // namespace test
diff --git a/src/components/protocol_handler/test/protocol_handler_tm_test.cc b/src/components/protocol_handler/test/protocol_handler_tm_test.cc
index 4ef070c52f..ea8f7b122e 100644
--- a/src/components/protocol_handler/test/protocol_handler_tm_test.cc
+++ b/src/components/protocol_handler/test/protocol_handler_tm_test.cc
@@ -64,12 +64,15 @@ class ProtocolHandlerImplTest : public ::testing::Test {
const size_t period_msec, const size_t max_messages,
bool malformed_message_filtering = false,
const size_t malformd_period_msec = 0u,
- const size_t malformd_max_messages = 0u) {
+ const size_t malformd_max_messages = 0u,
+ const int32_t multiframe_waiting_timeout = 0) {
protocol_handler_impl.reset(
new ProtocolHandlerImpl(&transport_manager_mock,
period_msec, max_messages,
malformed_message_filtering,
- malformd_period_msec, malformd_max_messages));
+ malformd_period_msec,
+ malformd_max_messages,
+ multiframe_waiting_timeout));
protocol_handler_impl->set_session_observer(&session_observer_mock);
tm_listener = protocol_handler_impl.get();
}
@@ -249,7 +252,7 @@ TEST_F(ProtocolHandlerImplTest, StartSession_Protected_SessionObserverReject) {
// For enabled protection callback shall use protection ON
const bool callback_protection_flag = PROTECTION_ON;
#else
- // For disabled protection callback shall ignore protection income flad and use protection OFF
+ // For disabled protection callback shall ignore protection income flag and use protection OFF
const bool callback_protection_flag = PROTECTION_OFF;
#endif // ENABLE_SECURITY
// expect ConnectionHandler check
diff --git a/src/components/time_tester/test/time_manager_test.cc b/src/components/time_tester/test/time_manager_test.cc
index 60f3bd6814..c5ad607a97 100644
--- a/src/components/time_tester/test/time_manager_test.cc
+++ b/src/components/time_tester/test/time_manager_test.cc
@@ -55,7 +55,7 @@ class StreamerMock : public Streamer {
TEST(TimeManagerTest, DISABLED_MessageProcess) {
//TODO(AK) APPLINK-13351 Disable due to refactor TimeTester
protocol_handler_test::TransportManagerMock transport_manager_mock;
- protocol_handler::ProtocolHandlerImpl protocol_handler_mock(&transport_manager_mock, 0, 0, 0, 0, 0);
+ protocol_handler::ProtocolHandlerImpl protocol_handler_mock(&transport_manager_mock, 0, 0, 0, 0, 0, 0);
TimeManager * time_manager = new TimeManager();
// Streamer will be deleted by Thread
StreamerMock* streamer_mock = new StreamerMock(time_manager);