summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorElisey Zamakhov <EZamakhov@luxoft.com>2015-11-03 16:26:16 +0300
committerElisey Zamakhov <EZamakhov@luxoft.com>2015-11-06 21:26:05 +0300
commitdc7d430a5f40c9f2b0e42cf5e29dfe1d3d98c1a4 (patch)
tree80e8f47e137199ba35daa4f88ac9d753fcff74f1
parent889599ab98c2ed4eac38a7832aabbf4a872e6005 (diff)
downloadsdl_core-dc7d430a5f40c9f2b0e42cf5e29dfe1d3d98c1a4.tar.gz
Implement Multiframe assembling by MessageID
Each Session resposible for assembling multiframes according to MessageID Implemented MultiframeBuilder support class Add MultiframeBuilder tests Integrated MultiframeBuilder usage in PHimpl with pop all expired frames on each handled frame on any session of any connection Added Configurable parameter in ConfigProfile for expirated time configuration Updated PASA and Genevi ini files Fixed crash on printing empty ProtocolFramePtr Added ProtocolFramePtrList, ConnectionID typedefs to protocol_packet.h Removed redundant ConnectionID usage in PHImpl Fixed memoty leak in ProtocolPacket Code style fixs in PHIMpl. IncomingDataHandler Issues:CRQ - APPLINK-17629, Defect fixed indirectly - APPLINK-17954
-rw-r--r--customer-specific/pasa/src/appMain/smartDeviceLink.ini2
-rw-r--r--src/appMain/life_cycle.cc3
-rw-r--r--src/appMain/smartDeviceLink.ini2
-rw-r--r--src/components/config_profile/include/config_profile/profile.h2
-rw-r--r--src/components/config_profile/src/profile.cc8
-rw-r--r--src/components/protocol_handler/CMakeLists.txt1
-rw-r--r--src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h19
-rw-r--r--src/components/protocol_handler/include/protocol_handler/multiframe_builder.h157
-rw-r--r--src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h54
-rw-r--r--src/components/protocol_handler/include/protocol_handler/protocol_packet.h12
-rw-r--r--src/components/protocol_handler/src/incoming_data_handler.cc38
-rw-r--r--src/components/protocol_handler/src/multiframe_builder.cc273
-rw-r--r--src/components/protocol_handler/src/protocol_handler_impl.cc189
-rw-r--r--src/components/protocol_handler/src/protocol_packet.cc8
-rw-r--r--src/components/protocol_handler/test/CMakeLists.txt1
-rw-r--r--src/components/protocol_handler/test/include/protocol_handler_mock.h2
-rw-r--r--src/components/protocol_handler/test/multiframe_builder_test.cc522
-rw-r--r--src/components/protocol_handler/test/protocol_handler_tm_test.cc9
-rw-r--r--src/components/time_tester/test/time_manager_test.cc2
19 files changed, 1111 insertions, 193 deletions
diff --git a/customer-specific/pasa/src/appMain/smartDeviceLink.ini b/customer-specific/pasa/src/appMain/smartDeviceLink.ini
index 3ceb4a2a7d..e473f0a9ae 100644
--- a/customer-specific/pasa/src/appMain/smartDeviceLink.ini
+++ b/customer-specific/pasa/src/appMain/smartDeviceLink.ini
@@ -163,6 +163,8 @@ MalformedMessageFiltering = true
; #MalformedFrequencyCount to Zero
MalformedFrequencyCount = 10
MalformedFrequencyTime = 1000
+; Timeout for waiting CONSECUTIVE frames of multiframe
+ExpectedConsecutiveFramesTimeout = 10000
[ApplicationManager]
ApplicationListUpdateTimeout = 2
diff --git a/src/appMain/life_cycle.cc b/src/appMain/life_cycle.cc
index 09095caff6..4888843588 100644
--- a/src/appMain/life_cycle.cc
+++ b/src/appMain/life_cycle.cc
@@ -99,7 +99,8 @@ bool LifeCycle::StartComponents() {
profile::Profile::instance()->message_frequency_count(),
profile::Profile::instance()->malformed_message_filtering(),
profile::Profile::instance()->malformed_frequency_time(),
- profile::Profile::instance()->malformed_frequency_count());
+ profile::Profile::instance()->malformed_frequency_count(),
+ profile::Profile::instance()->multiframe_waiting_timeout());
DCHECK(protocol_handler_ != NULL);
connection_handler_ =
diff --git a/src/appMain/smartDeviceLink.ini b/src/appMain/smartDeviceLink.ini
index c237ad7175..12d465c801 100644
--- a/src/appMain/smartDeviceLink.ini
+++ b/src/appMain/smartDeviceLink.ini
@@ -201,6 +201,8 @@ MalformedMessageFiltering = true
; #MalformedFrequencyCount to Zero
MalformedFrequencyCount = 10
MalformedFrequencyTime = 1000
+; Timeout for waiting CONSECUTIVE frames of multiframe
+ExpectedConsecutiveFramesTimeout = 10000
[ApplicationManager]
; Application list update timeout ms
diff --git a/src/components/config_profile/include/config_profile/profile.h b/src/components/config_profile/include/config_profile/profile.h
index e3ef1a82a7..7c8f3c3958 100644
--- a/src/components/config_profile/include/config_profile/profile.h
+++ b/src/components/config_profile/include/config_profile/profile.h
@@ -545,6 +545,8 @@ class Profile : public utils::Singleton<Profile> {
size_t malformed_frequency_time() const;
+ uint32_t multiframe_waiting_timeout() const;
+
uint16_t attempts_to_open_policy_db() const;
uint16_t open_attempt_timeout_ms() const;
diff --git a/src/components/config_profile/src/profile.cc b/src/components/config_profile/src/profile.cc
index 92c139ce8d..153c89b0d2 100644
--- a/src/components/config_profile/src/profile.cc
+++ b/src/components/config_profile/src/profile.cc
@@ -196,6 +196,7 @@ const char* kFrequencyTime = "FrequencyTime";
const char* kMalformedMessageFiltering = "MalformedMessageFiltering";
const char* kMalformedFrequencyCount = "MalformedFrequencyCount";
const char* kMalformedFrequencyTime = "MalformedFrequencyTime";
+const char* kExpectedConsecutiveFramesTimeout = "ExpectedConsecutiveFramesTimeout";
const char* kHashStringSizeKey = "HashStringSize";
const char* kUseDBForResumptionKey = "UseDBForResumption";
const char* kAttemptsToOpenResumptionDBKey = "AttemptsToOpenResumptionDB";
@@ -273,6 +274,7 @@ const size_t kDefaultFrequencyTime = 1000;
const bool kDefaulMalformedMessageFiltering = true;
const size_t kDefaultMalformedFrequencyCount = 10;
const size_t kDefaultMalformedFrequencyTime = 1000;
+const uint32_t kDefaultExpectedConsecutiveFramesTimeout = 10000;
const uint16_t kDefaultAttemptsToOpenPolicyDB = 5;
const uint16_t kDefaultOpenAttemptTimeoutMs = 500;
const uint32_t kDefaultAppIconsFolderMaxSize = 104857600;
@@ -709,6 +711,12 @@ size_t Profile::malformed_frequency_time() const {
kProtocolHandlerSection, kMalformedFrequencyTime);
return malformed_frequency_time;
}
+uint32_t Profile::multiframe_waiting_timeout() const {
+ uint32_t multiframe_waiting_timeout = 0;
+ ReadUIntValue(&multiframe_waiting_timeout, kDefaultExpectedConsecutiveFramesTimeout,
+ kProtocolHandlerSection, kExpectedConsecutiveFramesTimeout);
+ return multiframe_waiting_timeout;
+}
uint16_t Profile::attempts_to_open_policy_db() const {
return attempts_to_open_policy_db_;
diff --git a/src/components/protocol_handler/CMakeLists.txt b/src/components/protocol_handler/CMakeLists.txt
index 10a18c48b6..81ce371001 100644
--- a/src/components/protocol_handler/CMakeLists.txt
+++ b/src/components/protocol_handler/CMakeLists.txt
@@ -43,6 +43,7 @@ set(SOURCES
${COMPONENTS_DIR}/protocol_handler/src/protocol_handler_impl.cc
${COMPONENTS_DIR}/protocol_handler/src/protocol_packet.cc
${COMPONENTS_DIR}/protocol_handler/src/protocol_payload.cc
+ ${COMPONENTS_DIR}/protocol_handler/src/multiframe_builder.cc
)
set(LIBRARIES
diff --git a/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h b/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h
index 7be82843c7..b02e7de3bf 100644
--- a/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h
+++ b/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h
@@ -32,7 +32,6 @@
#ifndef SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_INCOMING_DATA_HANDLER_H_
#define SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_INCOMING_DATA_HANDLER_H_
-#include <list>
#include <map>
#include <vector>
#include "utils/macro.h"
@@ -53,7 +52,7 @@ class IncomingDataHandler {
* @brief Setting additional validator for checking malformed packets
* \param validator pointer
*/
- void set_validator(const ProtocolPacket::ProtocolHeaderValidator *const validator);
+ void set_validator(const ProtocolPacket::ProtocolHeaderValidator* const validator);
/**
* @brief Concatenate TM messages to ford frames and validate ford header data
* \param TM messages for converting to frames
@@ -65,9 +64,9 @@ class IncomingDataHandler {
* - RESULT_FAIL - packet serialization or validation error occurs
* \return list of complete, correct packets
*/
- std::list<ProtocolFramePtr> ProcessData(const RawMessage &tm_message,
- RESULT_CODE *result,
- size_t *malformed_occurrence);
+ ProtocolFramePtrList ProcessData(const RawMessage& tm_message,
+ RESULT_CODE* result,
+ size_t* malformed_occurrence);
/**
* @brief Add connection for data handling and verification
*/
@@ -83,7 +82,7 @@ class IncomingDataHandler {
/**
* @brief Returns size of frame to be formed from raw bytes.
*/
- static uint32_t GetPacketSize(const ProtocolPacket::ProtocolHeader &header);
+ static uint32_t GetPacketSize(const ProtocolPacket::ProtocolHeader& header);
/**
* @brief Try to create frame from incoming data
* \param incommung_data raw stream
@@ -95,16 +94,16 @@ class IncomingDataHandler {
* - RESULT_OK - one or more frames successfully created
* - RESULT_FAIL - packet serialization or validation error occurs
*/
- RESULT_CODE CreateFrame(std::vector<uint8_t> &incoming_data,
- std::list<ProtocolFramePtr> &out_frames,
- size_t &malformed_occurrence,
+ RESULT_CODE CreateFrame(std::vector<uint8_t>& incoming_data,
+ ProtocolFramePtrList& out_frames,
+ size_t& malformed_occurrence,
const transport_manager::ConnectionUID connection_id);
typedef std::map<transport_manager::ConnectionUID, std::vector<uint8_t> >
ConnectionsDataMap;
ConnectionsDataMap connections_data_;
ProtocolPacket::ProtocolHeader header_;
- const ProtocolPacket::ProtocolHeaderValidator *validator_;
+ const ProtocolPacket::ProtocolHeaderValidator* validator_;
bool last_portion_of_data_was_malformed_;
DISALLOW_COPY_AND_ASSIGN(IncomingDataHandler);
};
diff --git a/src/components/protocol_handler/include/protocol_handler/multiframe_builder.h b/src/components/protocol_handler/include/protocol_handler/multiframe_builder.h
new file mode 100644
index 0000000000..6ccf891b38
--- /dev/null
+++ b/src/components/protocol_handler/include/protocol_handler/multiframe_builder.h
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2015, Ford Motor Company
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * Neither the name of the Ford Motor Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_MULTIFRAME_BUILDER_H_
+#define SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_MULTIFRAME_BUILDER_H_
+
+#include <map>
+#include <ostream> // std::basic_ostream
+#include <iterator> // std::ostream_iterator
+#include <algorithm> // std::copy
+
+#include "utils/date_time.h"
+#include "protocol_handler/protocol_packet.h"
+
+/**
+ *\namespace protocol_handlerHandler
+ *\brief Namespace for SmartDeviceLink ProtocolHandler related functionality.
+ */
+namespace protocol_handler {
+/**
+ * \brief Session identifier - contains connection identifier and
+ * session_id from protocol (can be used as hash)
+ */
+// TODO(EZamakhov): move SessionID to protocol_handler/protocol_packet.h
+typedef uint8_t SessionID;
+/**
+ * \brief Message identifier - unique to the session messages
+ */
+// TODO(EZamakhov): move MessageID to protocol_handler/session_observer.h
+typedef uint32_t MessageID;
+
+struct ProtocolFrameData {
+ ProtocolFramePtr frame;
+ TimevalStruct append_time;
+};
+/**
+ *\brief Map of frames with last frame data for messages received in multiple frames.
+ */
+typedef std::map<MessageID, ProtocolFrameData> MessageIDToFrameMap;
+/**
+ *\brief Map of frames with last frame data for messages received in multiple frames.
+ */
+typedef std::map<SessionID, MessageIDToFrameMap> SessionToFrameMap;
+/**
+ *\brief Map of frames with last frame data for messages received in multiple frames.
+ */
+typedef std::map<ConnectionID, SessionToFrameMap> MultiFrameMap;
+
+/**
+ * \class MultiFrameBuilder
+ * \brief Class for assembling consecutive frames according to
+ * messageID to complete multiframes.
+ */
+class MultiFrameBuilder {
+ public:
+ /**
+ * @brief Constructor
+ */
+ MultiFrameBuilder();
+
+ /**
+ *\brief Set timeout of waiting CONSECUTIVE frames
+ */
+ void set_waiting_timeout(const uint32_t consecutive_frame_wait_msecs);
+
+ /**
+ * @brief Add connection for pending data
+ * @return true on success
+ */
+ bool AddConnection(const ConnectionID connection_id);
+
+ /**
+ * @brief Clear all data related to connection_id
+ * @return true on success
+ */
+ bool RemoveConnection(const ConnectionID connection_id);
+
+ /**
+ *\brief Pop assembled and expired frames
+ */
+ ProtocolFramePtrList PopMultiframes();
+
+ /**
+ *\brief Handle Single or Consecutive frame
+ * @return RESULT_OK on success, or RESULT_FAIL in case of any error
+ */
+ RESULT_CODE AddFrame(const ProtocolFramePtr packet);
+
+ private:
+ RESULT_CODE HandleFirstFrame(const ProtocolFramePtr packet);
+ RESULT_CODE HandleConsecutiveFrame(const ProtocolFramePtr packet);
+
+ // Map of frames with last frame data for messages received in multiple frames.
+ MultiFrameMap multiframes_map_;
+ int64_t consecutive_frame_wait_msecs_;
+};
+
+template<typename _CharT>
+std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream,
+ const protocol_handler::MultiFrameMap& map) {
+ if (map.empty()) {
+ stream << "{empty}";
+ return stream;
+ }
+ for (MultiFrameMap::const_iterator connection_it = map.begin();
+ connection_it != map.end(); ++connection_it) {
+ const SessionToFrameMap& session_map = connection_it->second;
+
+ for (SessionToFrameMap::const_iterator session_it = session_map.begin();
+ session_it != session_map.end(); ++session_it) {
+ const MessageIDToFrameMap& messageId_map = session_it->second;
+
+ for (MessageIDToFrameMap::const_iterator messageId_it = messageId_map.begin();
+ messageId_it != messageId_map.end(); ++messageId_it) {
+ const ProtocolFrameData& frame_data = messageId_it->second;
+
+ stream << "ConnectionID: " << connection_it->first
+ << ", SessionID: " << static_cast<uint32_t>(session_it->first)
+ << ", MessageID: " << static_cast<uint32_t>(messageId_it->first)
+ << " msec, frame: " << frame_data.frame << std::endl;
+ }
+ }
+ }
+ return stream;
+}
+
+} // namespace protocol_handler
+#endif // SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_MULTIFRAME_BUILDER_H_
diff --git a/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h b/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h
index 260ad16ec7..fb10811f14 100644
--- a/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h
+++ b/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h
@@ -50,6 +50,7 @@
#include "protocol_handler/session_observer.h"
#include "protocol_handler/protocol_observer.h"
#include "protocol_handler/incoming_data_handler.h"
+#include "protocol_handler/multiframe_builder.h"
#include "transport_manager/common.h"
#include "transport_manager/transport_manager.h"
#include "transport_manager/transport_manager_listener_empty.h"
@@ -142,14 +143,17 @@ class ProtocolHandlerImpl
* \param malformed_message_frequency_time used as time for malformed flood filtering
* \param malformed_message_frequency_count used as maximum value of malformed
* messages per message_frequency_time period
+ * \param multiframe_waiting_timeout used as maximum time of consecutive
+ * frames handling
* message exchange.
*/
explicit ProtocolHandlerImpl(
- transport_manager::TransportManager *transport_manager_param,
- size_t message_frequency_time, size_t message_frequency_count,
- bool malformed_message_filtering,
- size_t malformed_message_frequency_time,
- size_t malformed_message_frequency_count);
+ transport_manager::TransportManager *transport_manager_param,
+ size_t message_frequency_time, size_t message_frequency_count,
+ bool malformed_message_filtering,
+ size_t malformed_message_frequency_time,
+ size_t malformed_message_frequency_count,
+ uint32_t multiframe_waiting_timeout);
/**
* \brief Destructor
@@ -402,59 +406,40 @@ class ProtocolHandlerImpl
/**
* \brief Handles received message.
- * \param connection_handle Identifier of connection through which message
- * is received.
* \param packet Received message with protocol header.
* \return \saRESULT_CODE Status of operation
*/
- RESULT_CODE HandleMessage(
- ConnectionID connection_id,
- const ProtocolFramePtr packet);
+ RESULT_CODE HandleMessage(const ProtocolFramePtr packet);
/**
* \brief Handles message received in single frame.
- * \param connection_handle Identifier of connection through which message
- * is received.
* \param packet Frame of message with protocol header.
* \return \saRESULT_CODE Status of operation
*/
- RESULT_CODE HandleSingleFrameMessage(
- ConnectionID connection_id,
- const ProtocolFramePtr packet);
+ RESULT_CODE HandleSingleFrameMessage(const ProtocolFramePtr packet);
/**
* \brief Handles message received in multiple frames. Collects all frames
* of message.
- * \param connection_handle Identifier of connection through which message
- * is received.
* \param packet Current frame of message with protocol header.
* \return \saRESULT_CODE Status of operation
*/
- RESULT_CODE HandleMultiFrameMessage(
- ConnectionID connection_id,
- const ProtocolFramePtr packet);
+ RESULT_CODE HandleMultiFrameMessage(const ProtocolFramePtr packet);
/**
* \brief Handles message received in single frame.
- * \param connection_handle Identifier of connection through which message
- * is received.
* \param packet Received message with protocol header.
* \return \saRESULT_CODE Status of operation
*/
RESULT_CODE HandleControlMessage(
- ConnectionID connection_id,
const ProtocolFramePtr packet);
- RESULT_CODE HandleControlMessageEndSession(
- ConnectionID connection_id,
- const ProtocolPacket &packet);
+ RESULT_CODE HandleControlMessageEndSession(const ProtocolPacket &packet);
+
+ RESULT_CODE HandleControlMessageStartSession(const ProtocolPacket &packet);
- RESULT_CODE HandleControlMessageStartSession(
- ConnectionID connection_id,
- const ProtocolPacket &packet);
+ RESULT_CODE HandleControlMessageHeartBeat(const ProtocolPacket &packet);
- RESULT_CODE HandleControlMessageHeartBeat(
- ConnectionID connection_id,
- const ProtocolPacket &packet);
+ void PopValideAndExpirateMultiframes();
// threads::MessageLoopThread<*>::Handler implementations
// CALLED ON raw_ford_messages_from_mobile_ thread!
@@ -495,10 +480,9 @@ class ProtocolHandlerImpl
transport_manager::TransportManager *transport_manager_;
/**
- *\brief Map of frames with last frame data for messages received in multiple frames.
+ *\brief Assembling support class.
*/
- typedef std::map<int32_t, ProtocolFramePtr> MultiFrameMap;
- MultiFrameMap incomplete_multi_frame_messages_;
+ MultiFrameBuilder multiframe_builder_;
/**
* \brief Map of messages (frames) received over mobile nave session
diff --git a/src/components/protocol_handler/include/protocol_handler/protocol_packet.h b/src/components/protocol_handler/include/protocol_handler/protocol_packet.h
index db0650cfd8..1b68e6c870 100644
--- a/src/components/protocol_handler/include/protocol_handler/protocol_packet.h
+++ b/src/components/protocol_handler/include/protocol_handler/protocol_packet.h
@@ -33,6 +33,7 @@
#ifndef SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_PROTOCOL_PACKET_H_
#define SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_PROTOCOL_PACKET_H_
+#include <list>
#include "utils/macro.h"
#include "protocol/common.h"
#include "transport_manager/common.h"
@@ -245,7 +246,7 @@ class ProtocolPacket {
/**
* \brief Getter for Connection Identifier
*/
- uint8_t connection_id() const;
+ ConnectionID connection_id() const;
/**
* \brief Getter for data payload size
@@ -286,6 +287,7 @@ class ProtocolPacket {
* @brief Type definition for variable that hold shared pointer to protocolol packet
*/
typedef utils::SharedPtr<protocol_handler::ProtocolPacket> ProtocolFramePtr;
+typedef std::list<ProtocolFramePtr> ProtocolFramePtrList;
template<typename _CharT>
std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream,
@@ -304,7 +306,7 @@ template<typename _CharT>
std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream,
const protocol_handler::ProtocolPacket& packet) {
stream << packet.packet_header() <<
- ", ConnectionID: " << (packet.connection_id()) <<
+ ", ConnectionID: " << static_cast<uint32_t>(packet.connection_id()) <<
", TotalDataBytes: " << (packet.total_data_bytes()) <<
", Data: " << static_cast<void*>(packet.data());
return stream;
@@ -312,7 +314,9 @@ std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream,
template<typename _CharT>
std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream,
const ProtocolFramePtr packet_ptr) {
- stream << *packet_ptr;
- return stream;
+ if(packet_ptr) {
+ return stream << *packet_ptr;
+ }
+ return stream << "empty smart pointer";
}
#endif // SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_PROTOCOL_PACKET_H_
diff --git a/src/components/protocol_handler/src/incoming_data_handler.cc b/src/components/protocol_handler/src/incoming_data_handler.cc
index 43b0898cc2..f1ceb18425 100644
--- a/src/components/protocol_handler/src/incoming_data_handler.cc
+++ b/src/components/protocol_handler/src/incoming_data_handler.cc
@@ -40,32 +40,32 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "ProtocolHandler")
IncomingDataHandler::IncomingDataHandler()
: header_(),
validator_(NULL),
- last_portion_of_data_was_malformed_(false)
- {}
+ last_portion_of_data_was_malformed_(false) {
+}
void IncomingDataHandler::set_validator(
- const ProtocolPacket::ProtocolHeaderValidator *const validator) {
+ const ProtocolPacket::ProtocolHeaderValidator* const validator) {
validator_ = validator;
}
static const size_t MIN_HEADER_SIZE = std::min(PROTOCOL_HEADER_V1_SIZE,
PROTOCOL_HEADER_V2_SIZE);
-std::list<ProtocolFramePtr> IncomingDataHandler::ProcessData(
- const RawMessage &tm_message,
- RESULT_CODE *result,
- size_t *malformed_occurrence) {
+ProtocolFramePtrList IncomingDataHandler::ProcessData(
+ const RawMessage& tm_message,
+ RESULT_CODE* result,
+ size_t* malformed_occurrence) {
LOG4CXX_AUTO_TRACE(logger_);
DCHECK(result);
DCHECK(malformed_occurrence);
const transport_manager::ConnectionUID connection_id =
tm_message.connection_key();
- const uint8_t *data = tm_message.data();
+ const uint8_t* data = tm_message.data();
const size_t tm_message_size = tm_message.data_size();
if (tm_message_size == 0 || data == NULL) {
LOG4CXX_WARN(logger_, "Wrong raw message " << tm_message_size << " bytes");
*result = RESULT_FAIL;
- return std::list<ProtocolFramePtr>();
+ return ProtocolFramePtrList();
}
LOG4CXX_DEBUG(logger_, "Processing incoming data of size "
<< tm_message_size << " for connection " << connection_id);
@@ -73,13 +73,13 @@ std::list<ProtocolFramePtr> IncomingDataHandler::ProcessData(
if (connections_data_.end() == it) {
LOG4CXX_WARN(logger_, "ProcessData requested for unknown connection");
*result = RESULT_FAIL;
- return std::list<ProtocolFramePtr>();
+ return ProtocolFramePtrList();
}
- std::vector<uint8_t> &connection_data = it->second;
+ std::vector<uint8_t>& connection_data = it->second;
connection_data.insert(connection_data.end(), data, data + tm_message_size);
LOG4CXX_DEBUG(logger_, "Total data size for connection "
<< connection_id << " is " << connection_data.size());
- std::list<ProtocolFramePtr> out_frames;
+ ProtocolFramePtrList out_frames;
*malformed_occurrence = 0;
*result = CreateFrame(connection_data, out_frames, *malformed_occurrence, connection_id);
LOG4CXX_DEBUG(logger_, "New data size for connection " << connection_id
@@ -117,7 +117,7 @@ void IncomingDataHandler::RemoveConnection(
}
uint32_t IncomingDataHandler::GetPacketSize(
- const ProtocolPacket::ProtocolHeader &header) {
+ const ProtocolPacket::ProtocolHeader& header) {
switch (header.version) {
case PROTOCOL_VERSION_1:
return header.dataSize + PROTOCOL_HEADER_V1_SIZE;
@@ -134,9 +134,9 @@ uint32_t IncomingDataHandler::GetPacketSize(
}
RESULT_CODE IncomingDataHandler::CreateFrame(
- std::vector<uint8_t> &incoming_data,
- std::list<ProtocolFramePtr> &out_frames,
- size_t &malformed_occurrence,
+ std::vector<uint8_t>& incoming_data,
+ ProtocolFramePtrList& out_frames,
+ size_t& malformed_occurrence,
const transport_manager::ConnectionUID connection_id) {
LOG4CXX_AUTO_TRACE(logger_);
std::vector<uint8_t>::iterator data_it = incoming_data.begin();
@@ -157,7 +157,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame(
++data_it;
--data_size;
LOG4CXX_DEBUG(logger_, "Moved to the next byte " << std::hex
- << static_cast<const void *>(&*data_it));
+ << static_cast<const void*>(&*data_it));
continue;
}
LOG4CXX_DEBUG(logger_, "Payload size " << header_.dataSize);
@@ -167,7 +167,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame(
++data_it;
--data_size;
LOG4CXX_DEBUG(logger_, "Moved to the next byte " << std::hex
- << static_cast<const void *>(&*data_it));
+ << static_cast<const void*>(&*data_it));
continue;
}
if (data_size < packet_size) {
@@ -179,7 +179,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame(
const RESULT_CODE deserialize_result =
frame->deserializePacket(&*data_it, packet_size);
LOG4CXX_DEBUG(
- logger_, "Deserialized frame " << frame);
+ logger_, "Deserialized frame " << frame);
if (deserialize_result != RESULT_OK) {
LOG4CXX_WARN(logger_, "Packet deserialization failed");
incoming_data.erase(incoming_data.begin(), data_it);
diff --git a/src/components/protocol_handler/src/multiframe_builder.cc b/src/components/protocol_handler/src/multiframe_builder.cc
new file mode 100644
index 0000000000..9dc2c08fa5
--- /dev/null
+++ b/src/components/protocol_handler/src/multiframe_builder.cc
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2015, Ford Motor Company
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * Neither the name of the Ford Motor Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "protocol_handler/multiframe_builder.h"
+
+#include <limits>
+
+#include "utils/logger.h"
+#include "utils/make_shared.h"
+#include "utils/lock.h"
+#include "utils/date_time.h"
+
+namespace protocol_handler {
+
+CREATE_LOGGERPTR_GLOBAL(logger_, "ProtocolHandler")
+
+MultiFrameBuilder::MultiFrameBuilder()
+ : consecutive_frame_wait_msecs_(0u) {
+}
+
+void MultiFrameBuilder::set_waiting_timeout(const uint32_t consecutive_frame_wait_msecs) {
+ consecutive_frame_wait_msecs_ = static_cast<int64_t>(consecutive_frame_wait_msecs);
+ if (consecutive_frame_wait_msecs == 0) {
+ LOG4CXX_WARN(logger_, "Waiting timout disabled");
+ } else {
+ LOG4CXX_DEBUG(logger_, "Waiting time in msec: " << consecutive_frame_wait_msecs_);
+ }
+}
+
+bool MultiFrameBuilder::AddConnection(const ConnectionID connection_id) {
+ LOG4CXX_DEBUG(logger_, "Adding connection_id: " << connection_id);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ const MultiFrameMap::const_iterator it = multiframes_map_.find(connection_id);
+ if (it != multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Exists connection_id: " << connection_id);
+ return false;
+ }
+ multiframes_map_[connection_id] = SessionToFrameMap();
+ return true;
+}
+
+bool MultiFrameBuilder::RemoveConnection(const ConnectionID connection_id) {
+ LOG4CXX_DEBUG(logger_, "Removing connection_id: " << connection_id);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ const MultiFrameMap::iterator it = multiframes_map_.find(connection_id);
+ if (it == multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Non-existent connection_id: " << connection_id);
+ return false;
+ }
+ const SessionToFrameMap& session_to_frame_map = it->second;
+ if (!session_to_frame_map.empty()) {
+ // FIXME(EZamakhov): Ask ReqManager - do we need to send GenericError
+ LOG4CXX_WARN(logger_, "For connection_id: " << connection_id
+ << " waiting: " << multiframes_map_);
+ }
+ multiframes_map_.erase(it);
+ return true;
+}
+
+ProtocolFramePtrList MultiFrameBuilder::PopMultiframes() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ ProtocolFramePtrList outpute_frame_list;
+ for (MultiFrameMap::iterator connection_it = multiframes_map_.begin();
+ connection_it != multiframes_map_.end(); ++connection_it) {
+ LOG4CXX_TRACE(logger_, "Step over connection: " << connection_it->first);
+ SessionToFrameMap& session_map = connection_it->second;
+
+ for (SessionToFrameMap::iterator session_it = session_map.begin();
+ session_it != session_map.end(); ++session_it) {
+ LOG4CXX_TRACE(logger_, "Step over session: "
+ << static_cast<int>(session_it->first));
+ MessageIDToFrameMap& messageId_map = session_it->second;
+
+ MessageIDToFrameMap::iterator messageId_it = messageId_map.begin();
+ while (messageId_it != messageId_map.end()) {
+ LOG4CXX_TRACE(logger_, "Step over messageId: " << messageId_it->first);
+ ProtocolFrameData& frame_data = messageId_it->second;
+ ProtocolFramePtr frame = frame_data.frame;
+
+ if (frame &&
+ frame->frame_data() == FRAME_DATA_LAST_CONSECUTIVE &&
+ frame->payload_size() > 0u ) {
+ LOG4CXX_DEBUG(logger_, "Ready frame: " << frame);
+ outpute_frame_list.push_back(frame);
+ messageId_map.erase(messageId_it++);
+ continue;
+ }
+ if (consecutive_frame_wait_msecs_ != 0) {
+ LOG4CXX_TRACE(logger_, "Expiration verification");
+ const int64_t time_left =
+ date_time::DateTime::calculateTimeSpan(frame_data.append_time);
+ LOG4CXX_DEBUG(logger_, "mSecs left: " << time_left);
+ if (time_left >= consecutive_frame_wait_msecs_) {
+ LOG4CXX_WARN(logger_, "Expired frame: " << frame);
+ outpute_frame_list.push_back(frame);
+ messageId_map.erase(messageId_it++);
+ continue;
+ }
+ }
+ ++messageId_it;
+ } // iteration over messageId_map
+ } // iteration over session_map
+ } // iteration over multiframes_map_
+ LOG4CXX_DEBUG(logger_, "Result frames count: " << outpute_frame_list.size());
+ return outpute_frame_list;
+}
+
+RESULT_CODE MultiFrameBuilder::AddFrame(const ProtocolFramePtr packet) {
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "Handling frame: " << packet);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ if (!packet) {
+ LOG4CXX_ERROR(logger_, "Skip empty frame");
+ return RESULT_FAIL;
+ }
+ switch (packet->frame_type()) {
+ case FRAME_TYPE_FIRST:
+ LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST");
+ return HandleFirstFrame(packet);
+ case FRAME_TYPE_CONSECUTIVE:
+ LOG4CXX_TRACE(logger_, "FRAME_TYPE_CONSECUTIVE");
+ return HandleConsecutiveFrame(packet);
+ default:
+ LOG4CXX_ERROR(logger_, "Frame is not FIRST or CONSECUTIVE :"
+ << packet);
+ break;
+ }
+ return RESULT_FAIL;
+}
+
+RESULT_CODE MultiFrameBuilder::HandleFirstFrame(const ProtocolFramePtr packet) {
+ DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_FIRST,
+ RESULT_FAIL);
+ LOG4CXX_DEBUG(logger_, "Waiting : " << multiframes_map_);
+ LOG4CXX_DEBUG(logger_, "Handling FIRST frame: " << packet);
+ if (packet->payload_size() != 0u) {
+ LOG4CXX_ERROR(logger_,
+ "First frame shall have no data:" << packet);
+ return RESULT_FAIL;
+ }
+
+ const ConnectionID connection_id = packet->connection_id();
+ MultiFrameMap::iterator connection_it = multiframes_map_.find(connection_id);
+ if (connection_it == multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Unknown connection_id: " << connection_id);
+ return RESULT_FAIL;
+ }
+ SessionToFrameMap& session_map = connection_it->second;
+
+ const SessionID session_id = packet->session_id();
+ // No need to verify session existance
+ MessageIDToFrameMap& messageId_map = session_map[session_id];
+
+ const MessageID message_id = packet->message_id();
+ MessageIDToFrameMap::iterator messageId_it = messageId_map.find(message_id);
+ if (messageId_it != messageId_map.end()) {
+ LOG4CXX_ERROR(logger_, "Already waiting message for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+ return RESULT_FAIL;
+ }
+
+ LOG4CXX_DEBUG(logger_, "Start waiting frames for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+ messageId_map[message_id] = {packet, date_time::DateTime::getCurrentTime()};
+ return RESULT_OK;
+}
+
+RESULT_CODE MultiFrameBuilder::HandleConsecutiveFrame(const ProtocolFramePtr packet) {
+ DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_CONSECUTIVE,
+ RESULT_FAIL);
+ LOG4CXX_DEBUG(logger_, "Handling CONSECUTIVE frame: " << packet);
+
+
+ const ConnectionID connection_id = packet->connection_id();
+ MultiFrameMap::iterator connection_it = multiframes_map_.find(connection_id);
+ if (connection_it == multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Unknown connection_id: " << connection_id);
+ return RESULT_FAIL;
+ }
+ SessionToFrameMap& session_map = connection_it->second;
+
+ const SessionID session_id = packet->session_id();
+ // No need to verify session existance
+ MessageIDToFrameMap& messageId_map = session_map[session_id];
+
+ const MessageID message_id = packet->message_id();
+ MessageIDToFrameMap::iterator messageId_it = messageId_map.find(message_id);
+ if (messageId_it == messageId_map.end()) {
+ LOG4CXX_ERROR(logger_, "No waiting message for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+ return RESULT_FAIL;
+ }
+
+ ProtocolFrameData& frame_data = messageId_it->second;
+ ProtocolFramePtr assembling_frame = frame_data.frame;
+ DCHECK_OR_RETURN(packet->message_id() == assembling_frame->message_id(),
+ RESULT_FAIL);
+
+ const uint8_t new_frame_data = packet->frame_data();
+ const bool is_last_consecutive = (new_frame_data ==
+ FRAME_DATA_LAST_CONSECUTIVE);
+
+ if (is_last_consecutive) {
+ // TODO(EZamakhov): implement count of frames and result size verification
+ LOG4CXX_DEBUG(logger_, "Last CONSECUTIVE frame");
+ } else {
+ uint8_t previous_frame_data = assembling_frame->frame_data();
+ if (previous_frame_data == std::numeric_limits<uint8_t>::max()) {
+ previous_frame_data = 0u;
+ }
+ // The next frame data is bigger at 1
+ if (new_frame_data != (previous_frame_data + 1)) {
+ LOG4CXX_ERROR(logger_,
+ "Unexpected CONSECUTIVE frame for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id
+ << ", frame: " << packet);
+ return RESULT_FAIL;
+ }
+ }
+
+ assembling_frame->set_frame_data(new_frame_data);
+
+ LOG4CXX_DEBUG(logger_,
+ "Appending " << packet->data_size() << " bytes "
+ << "; frame_data " << static_cast<int>(new_frame_data)
+ << "; for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+
+ if (assembling_frame->appendData(packet->data(),
+ packet->data_size()) != RESULT_OK) {
+ LOG4CXX_ERROR(logger_, "Failed to append frame for multiframe message.");
+ return RESULT_FAIL;
+ }
+ frame_data.append_time = date_time::DateTime::getCurrentTime();
+ return RESULT_OK;
+}
+
+} // namespace protocol_handler
diff --git a/src/components/protocol_handler/src/protocol_handler_impl.cc b/src/components/protocol_handler/src/protocol_handler_impl.cc
index dccfffdf04..b3fb337703 100644
--- a/src/components/protocol_handler/src/protocol_handler_impl.cc
+++ b/src/components/protocol_handler/src/protocol_handler_impl.cc
@@ -63,11 +63,12 @@ uint8_t SupportedSDLProtocolVersion();
const size_t kStackSize = 32768;
-ProtocolHandlerImpl::ProtocolHandlerImpl(
- transport_manager::TransportManager *transport_manager_param,
- size_t message_frequency_time, size_t message_frequency_count,
- bool malformed_message_filtering,
- size_t malformed_message_frequency_time, size_t malformed_message_frequency_count)
+ProtocolHandlerImpl::ProtocolHandlerImpl(transport_manager::TransportManager *transport_manager_param,
+ size_t message_frequency_time, size_t message_frequency_count,
+ bool malformed_message_filtering,
+ size_t malformed_message_frequency_time,
+ size_t malformed_message_frequency_count,
+ uint32_t multiframe_waiting_timeout)
: protocol_observers_(),
session_observer_(0),
transport_manager_(transport_manager_param),
@@ -117,6 +118,7 @@ ProtocolHandlerImpl::ProtocolHandlerImpl(
LOG4CXX_WARN(logger_, "Malformed message filtering is disabled."
<< "Connection will be close on first malformed message detection");
}
+ multiframe_builder_.set_waiting_timeout(multiframe_waiting_timeout);
}
ProtocolHandlerImpl::~ProtocolHandlerImpl() {
@@ -502,6 +504,11 @@ void ProtocolHandlerImpl::OnTMMessageReceiveFailed(
void ProtocolHandlerImpl::NotifySubscribers(const RawMessagePtr message) {
LOG4CXX_AUTO_TRACE(logger_);
sync_primitives::AutoLock lock(protocol_observers_lock_);
+ if (protocol_observers_.empty()) {
+ LOG4CXX_ERROR(
+ logger_,
+ "Cannot handle multiframe message: no IProtocolObserver is set.");
+ }
for (ProtocolObservers::iterator it = protocol_observers_.begin();
protocol_observers_.end() != it; ++it) {
(*it)->OnMessageReceived(message);
@@ -570,6 +577,7 @@ void ProtocolHandlerImpl::OnConnectionEstablished(
const transport_manager::DeviceInfo &device_info,
const transport_manager::ConnectionUID &connection_id) {
incoming_data_handler_.AddConnection(connection_id);
+ multiframe_builder_.AddConnection(connection_id);
}
void ProtocolHandlerImpl::OnConnectionClosed(
@@ -577,6 +585,7 @@ void ProtocolHandlerImpl::OnConnectionClosed(
incoming_data_handler_.RemoveConnection(connection_id);
message_meter_.ClearIdentifiers();
malformed_message_meter_.ClearIdentifiers();
+ multiframe_builder_.RemoveConnection(connection_id);
}
RESULT_CODE ProtocolHandlerImpl::SendFrame(const ProtocolFramePtr packet) {
@@ -705,21 +714,20 @@ RESULT_CODE ProtocolHandlerImpl::SendMultiFrameMessage(
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleMessage(ConnectionID connection_id,
- const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleMessage(const ProtocolFramePtr packet) {
DCHECK_OR_RETURN(packet, RESULT_UNKNOWN);
LOG4CXX_DEBUG(logger_, "Handling message " << packet);
switch (packet->frame_type()) {
case FRAME_TYPE_CONTROL:
LOG4CXX_TRACE(logger_, "FRAME_TYPE_CONTROL");
- return HandleControlMessage(connection_id, packet);
+ return HandleControlMessage(packet);
case FRAME_TYPE_SINGLE:
LOG4CXX_TRACE(logger_, "FRAME_TYPE_SINGLE");
- return HandleSingleFrameMessage(connection_id, packet);
+ return HandleSingleFrameMessage(packet);
case FRAME_TYPE_FIRST:
case FRAME_TYPE_CONSECUTIVE:
LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST or FRAME_TYPE_CONSECUTIVE");
- return HandleMultiFrameMessage(connection_id, packet);
+ return HandleMultiFrameMessage(packet);
default: {
LOG4CXX_WARN(logger_, "Unknown frame type"
<< packet->frame_type());
@@ -729,8 +737,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleMessage(ConnectionID connection_id,
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(
- ConnectionID connection_id, const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(const ProtocolFramePtr packet) {
LOG4CXX_AUTO_TRACE(logger_);
LOG4CXX_DEBUG(logger_,
@@ -745,7 +752,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(
}
const uint32_t connection_key =
- session_observer_->KeyFromPair(connection_id, packet->session_id());
+ session_observer_->KeyFromPair(packet->connection_id(), packet->session_id());
const RawMessagePtr rawMessage(
new RawMessage(connection_key,
@@ -773,8 +780,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage(
- ConnectionID connection_id, const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage(const ProtocolFramePtr packet) {
LOG4CXX_AUTO_TRACE(logger_);
if (!session_observer_) {
@@ -782,100 +788,14 @@ RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage(
return RESULT_FAIL;
}
- const uint32_t key = session_observer_->KeyFromPair(connection_id,
- packet->session_id());
- LOG4CXX_DEBUG(
- logger_,
- "Packet " << packet << "; session id " << static_cast<int32_t>(key));
-
- if (packet->frame_type() == FRAME_TYPE_FIRST) {
- LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST");
- // First frame has no data
- DCHECK_OR_RETURN(packet->frame_data() == 0u, RESULT_FAIL);
- // We can not handle more than one multiframe with the same key
- DCHECK_OR_RETURN(incomplete_multi_frame_messages_.count(key) == 0,
- RESULT_FAIL);
- incomplete_multi_frame_messages_[key] = packet;
- return RESULT_OK;
+ if (multiframe_builder_.AddFrame(packet) != RESULT_OK) {
+ LOG4CXX_WARN(logger_, "Frame assembling issue");
}
- DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_CONSECUTIVE, RESULT_FAIL)
-
- MultiFrameMap::iterator it = incomplete_multi_frame_messages_.find(key);
- if (it == incomplete_multi_frame_messages_.end()) {
- LOG4CXX_ERROR(logger_,
- "Frame of multiframe message for non-existing session id " << key);
- return RESULT_FAIL;
- }
-
- ProtocolFramePtr& assembling_frame = it->second;
- const uint8_t previous_frame_data = assembling_frame->frame_data();
- const uint8_t new_frame_data = packet->frame_data();
-
- const bool is_last_consecutive = (new_frame_data == FRAME_DATA_LAST_CONSECUTIVE);
- // The next frame data is bigger at 1
- DCHECK_OR_RETURN((new_frame_data == (previous_frame_data + 1)) ||
- // except the last consecutive frame
- is_last_consecutive, RESULT_FAIL);
-
- assembling_frame->set_frame_data(new_frame_data);
-
- LOG4CXX_DEBUG(logger_,
- "Appending " << packet->data_size() << " bytes "
- << "; frame_data " << static_cast<int>(new_frame_data)
- << "; connection key " << key);
-
- DCHECK_OR_RETURN(packet->message_id() == assembling_frame->message_id(), RESULT_FAIL);
- if (assembling_frame->appendData(packet->data(), packet->data_size()) != RESULT_OK) {
- LOG4CXX_ERROR(logger_, "Failed to append frame for multiframe message.");
- return RESULT_FAIL;
- }
-
- if (is_last_consecutive) {
- LOG4CXX_DEBUG(
- logger_,
- "Last frame of multiframe message size " << packet->data_size()
- << "; connection key " << key);
- {
- sync_primitives::AutoLock lock(protocol_observers_lock_);
- if (protocol_observers_.empty()) {
- LOG4CXX_ERROR(
- logger_,
- "Cannot handle multiframe message: no IProtocolObserver is set.");
- return RESULT_FAIL;
- }
- }
- const uint32_t connection_key =
- session_observer_->KeyFromPair(connection_id,
- assembling_frame->session_id());
- LOG4CXX_DEBUG(logger_, "Result frame" << assembling_frame <<
- "for connection "<< connection_key);
- const RawMessagePtr rawMessage(
- new RawMessage(connection_key,
- assembling_frame->protocol_version(),
- assembling_frame->data(),
- assembling_frame->total_data_bytes(),
- assembling_frame->service_type(),
- assembling_frame->payload_size()));
- DCHECK_OR_RETURN(rawMessage, RESULT_FAIL);
-
-#ifdef TIME_TESTER
- if (metric_observer_) {
- PHMetricObserver::MessageMetric *metric =
- new PHMetricObserver::MessageMetric();
- metric->raw_msg = rawMessage;
- metric_observer_->EndMessageProcess(metric);
- }
-#endif // TIME_TESTER
- // TODO(EZamakhov): check service in session
- NotifySubscribers(rawMessage);
- incomplete_multi_frame_messages_.erase(it);
- }
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessage(
- ConnectionID connection_id, const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessage(const ProtocolFramePtr packet) {
LOG4CXX_AUTO_TRACE(logger_);
if (!session_observer_) {
@@ -885,17 +805,16 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessage(
switch (packet->frame_data()) {
case FRAME_DATA_START_SERVICE:
- return HandleControlMessageStartSession(connection_id, *(packet.get()));
+ return HandleControlMessageStartSession(*packet);
case FRAME_DATA_END_SERVICE:
- return HandleControlMessageEndSession(connection_id, *(packet.get()));
+ return HandleControlMessageEndSession(*packet);
case FRAME_DATA_HEART_BEAT: {
- LOG4CXX_DEBUG(logger_,
- "Received heart beat for connection " << connection_id);
- return HandleControlMessageHeartBeat(connection_id, *(packet.get()));
+ LOG4CXX_TRACE(logger_, "FRAME_DATA_HEART_BEAT");
+ return HandleControlMessageHeartBeat(*packet);
}
case FRAME_DATA_HEART_BEAT_ACK: {
LOG4CXX_DEBUG(logger_, "Received heart beat ack from mobile app"
- " for connection " << connection_id);
+ " for connection " << packet->connection_id());
return RESULT_OK;
}
default:
@@ -922,14 +841,14 @@ uint32_t get_hash_id(const ProtocolPacket &packet) {
return hash_le == HASH_ID_NOT_SUPPORTED ? HASH_ID_WRONG : hash_le;
}
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessageEndSession(
- ConnectionID connection_id, const ProtocolPacket &packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessageEndSession(const ProtocolPacket &packet) {
LOG4CXX_AUTO_TRACE(logger_);
const uint8_t current_session_id = packet.session_id();
const uint32_t hash_id = get_hash_id(packet);
const ServiceType service_type = ServiceTypeFromByte(packet.service_type());
+ const ConnectionID connection_id = packet.connection_id();
const uint32_t session_key = session_observer_->OnSessionEndedCallback(
connection_id, current_session_id, hash_id, service_type);
@@ -1025,8 +944,7 @@ class StartSessionHandler : public security_manager::SecurityManagerListener {
} // namespace
#endif // ENABLE_SECURITY
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(
- ConnectionID connection_id, const ProtocolPacket &packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(const ProtocolPacket &packet) {
LOG4CXX_DEBUG(logger_,
"Protocol version:" <<
static_cast<int>(packet.protocol_version()));
@@ -1043,6 +961,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(
DCHECK(session_observer_);
uint32_t hash_id;
+ const ConnectionID connection_id = packet.connection_id();
const uint32_t session_id = session_observer_->OnSessionStartedCallback(
connection_id, packet.session_id(), service_type, protection, &hash_id);
@@ -1100,8 +1019,8 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat(
- ConnectionID connection_id, const ProtocolPacket &packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat(const ProtocolPacket &packet) {
+ const ConnectionID connection_id = packet.connection_id();
LOG4CXX_DEBUG(
logger_,
"Sending heart beat acknowledgment for connection " << connection_id);
@@ -1124,6 +1043,41 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat(
}
}
+void ProtocolHandlerImpl::PopValideAndExpirateMultiframes() {
+ const ProtocolFramePtrList& frame_list = multiframe_builder_.PopMultiframes();
+ for (ProtocolFramePtrList::const_iterator it = frame_list.begin();
+ it != frame_list.end(); ++it) {
+ const ProtocolFramePtr frame = *it;
+ DCHECK(frame);
+ if(!frame) {
+ continue;
+ }
+
+ const uint32_t connection_key =
+ session_observer_->KeyFromPair(frame->connection_id(), frame->session_id());
+ LOG4CXX_DEBUG(logger_, "Result frame" << frame <<
+ "for connection "<< connection_key);
+ const RawMessagePtr rawMessage(
+ new RawMessage(connection_key,
+ frame->protocol_version(),
+ frame->data(),
+ frame->total_data_bytes(),
+ frame->service_type(),
+ frame->payload_size()));
+ DCHECK(rawMessage);
+
+#ifdef TIME_TESTER
+ if (metric_observer_) {
+ PHMetricObserver::MessageMetric *metric =
+ new PHMetricObserver::MessageMetric();
+ metric->raw_msg = rawMessage;
+ metric_observer_->EndMessageProcess(metric);
+ }
+#endif // TIME_TESTER
+ NotifySubscribers(rawMessage);
+ }
+}
+
bool ProtocolHandlerImpl::TrackMessage(const uint32_t& connection_key) {
LOG4CXX_AUTO_TRACE(logger_);
if (message_frequency_time_ > 0u &&
@@ -1203,7 +1157,8 @@ void ProtocolHandlerImpl::Handle(
FRAME_TYPE_CONTROL == message->frame_type() ||
FRAME_TYPE_FIRST == message->frame_type()) {
LOG4CXX_DEBUG(logger_, "Packet: dataSize " << message->data_size());
- HandleMessage(message->connection_id(), message);
+ HandleMessage(message);
+ PopValideAndExpirateMultiframes();
} else {
LOG4CXX_WARN(logger_,
"handleMessagesFromMobileApp() - incorrect or NULL data");
diff --git a/src/components/protocol_handler/src/protocol_packet.cc b/src/components/protocol_handler/src/protocol_packet.cc
index 1ed6e05699..8675d7543e 100644
--- a/src/components/protocol_handler/src/protocol_packet.cc
+++ b/src/components/protocol_handler/src/protocol_packet.cc
@@ -33,6 +33,7 @@
#include <stdint.h>
#include <memory.h>
#include <new>
+#include <memory>
#include <cstring>
#include <limits>
@@ -247,7 +248,7 @@ RESULT_CODE ProtocolPacket::ProtocolHeaderValidator::validate(
}
ProtocolPacket::ProtocolPacket()
- : payload_size_(0), connection_id_(0) {
+ : payload_size_(0u), connection_id_(0u) {
}
ProtocolPacket::ProtocolPacket(ConnectionID connection_id,
@@ -382,7 +383,7 @@ RESULT_CODE ProtocolPacket::deserializePacket(
dataPayloadSize = messageSize - offset;
}
- uint8_t *data = NULL;
+ uint8_t* data = NULL;
if (dataPayloadSize) {
data = new (std::nothrow) uint8_t[dataPayloadSize];
if (!data) {
@@ -401,6 +402,7 @@ RESULT_CODE ProtocolPacket::deserializePacket(
total_data_bytes |= data[3];
set_total_data_bytes(total_data_bytes);
if (0 == packet_data_.data) {
+ delete[] data;
return RESULT_FAIL;
}
} else {
@@ -482,7 +484,7 @@ uint32_t ProtocolPacket::total_data_bytes() const {
return packet_data_.totalDataBytes;
}
-uint8_t ProtocolPacket::connection_id() const {
+ConnectionID ProtocolPacket::connection_id() const {
return connection_id_;
}
diff --git a/src/components/protocol_handler/test/CMakeLists.txt b/src/components/protocol_handler/test/CMakeLists.txt
index 6e12eeb581..3cb0dfc106 100644
--- a/src/components/protocol_handler/test/CMakeLists.txt
+++ b/src/components/protocol_handler/test/CMakeLists.txt
@@ -52,6 +52,7 @@ set(SOURCES
protocol_handler_tm_test.cc
protocol_packet_test.cc
protocol_payload_test.cc
+ multiframe_builder_test.cc
)
create_test("protocol_handler_test" "${SOURCES}" "${LIBRARIES}")
diff --git a/src/components/protocol_handler/test/include/protocol_handler_mock.h b/src/components/protocol_handler/test/include/protocol_handler_mock.h
index 41b7c491a4..9ad3545ceb 100644
--- a/src/components/protocol_handler/test/include/protocol_handler_mock.h
+++ b/src/components/protocol_handler/test/include/protocol_handler_mock.h
@@ -163,8 +163,10 @@ class SessionObserverMock : public protocol_handler::SessionObserver {
MOCK_METHOD3(ProtocolVersionUsed,
bool( uint32_t connection_id,
uint8_t session_id, uint8_t& protocol_version));
+#ifdef ENABLE_SECURITY
MOCK_CONST_METHOD1(GetHandshakeContext,
security_manager::SSLContext::HandshakeContext (const uint32_t key) );
+#endif // ENABLE_SECURITY
};
#ifdef ENABLE_SECURITY
diff --git a/src/components/protocol_handler/test/multiframe_builder_test.cc b/src/components/protocol_handler/test/multiframe_builder_test.cc
new file mode 100644
index 0000000000..d1910c18e1
--- /dev/null
+++ b/src/components/protocol_handler/test/multiframe_builder_test.cc
@@ -0,0 +1,522 @@
+/*
+ * Copyright (c) 2015, Ford Motor Company
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * Neither the name of the Ford Motor Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#include <gtest/gtest.h>
+#include <vector>
+#include <utility>
+#include <limits>
+#include "utils/make_shared.h"
+#include "protocol_handler/multiframe_builder.h"
+
+namespace test {
+namespace components {
+namespace protocol_handler_test {
+
+using namespace protocol_handler;
+
+typedef std::vector<ConnectionID> ConnectionList;
+typedef std::vector<uint8_t> UCharDataVector;
+
+struct MutiframeData {
+ UCharDataVector binary_data;
+ ProtocolFramePtrList multiframes;
+};
+
+/**
+ *\brief Map of MutiframeData by MessageID key
+ */
+typedef std::map<MessageID, MutiframeData> MessageIDToMutiframeDataTestMap;
+/**
+ *\brief Map of MessageIDToMutiframeDataMap by SessionID key
+ */
+typedef std::map<SessionID, MessageIDToMutiframeDataTestMap> SessionToMutiframeDataTestMap;
+/**
+ *\brief Map of SessionToMutiframeDataMap by ConnectionID key
+ */
+typedef std::map<ConnectionID, SessionToMutiframeDataTestMap> MultiFrameTestMap;
+
+template<typename IntegerType>
+std::vector<IntegerType> getTestVector() {
+ // Prepare array with a few minimals, middle and a few maximum values
+ const IntegerType array[] = {
+ std::numeric_limits<IntegerType>::min(),
+ std::numeric_limits<IntegerType>::min() + 1,
+ std::numeric_limits<IntegerType>::max() / 2,
+ std::numeric_limits<IntegerType>::max() - 1 ,
+ std::numeric_limits<IntegerType>::max()
+ };
+ return std::vector<IntegerType>(array,
+ array + sizeof(array) / sizeof(array[0]) );
+}
+template<typename IntegerType>
+struct Incrementor {
+ IntegerType value;
+ Incrementor(const IntegerType value = 0u)
+ : value(value) {
+ }
+ IntegerType operator() () {
+ return ++value;
+ }
+};
+
+
+class MultiFrameBuilderTest : public ::testing::Test {
+ protected:
+ void SetUp() OVERRIDE {
+
+ const std::vector<ConnectionID> connections = getTestVector<ConnectionID>();
+ const std::vector<SessionID> sessions = getTestVector<SessionID>();
+ const std::vector<MessageID> messages = getTestVector<MessageID>();
+
+ MutiframeData some_data;
+
+ const uint8_t protocol_version = PROTOCOL_VERSION_2;
+ const uint8_t service_type = SERVICE_TYPE_RPC;
+
+ // We need 255+ messages for rolling over max uint8_t value
+ int multi_frames_count = std::numeric_limits<uint8_t>::max() * 2;
+
+ // Prepare C connections with S sessions with M messages data
+ for (size_t c = 0; c < connections.size(); ++c) {
+ const ConnectionID connection_id = connections[c];
+
+ SessionToMutiframeDataTestMap sessions_map;
+ for (size_t s = 0; s < sessions.size(); ++s) {
+ const SessionID session_id = sessions[s];
+
+ MessageIDToMutiframeDataTestMap messages_map;
+ for (size_t m = 0; m < messages.size(); ++m) {
+ const MessageID message_id = messages[m];
+
+ UCharDataVector& data_vector = some_data.binary_data;
+ // Sahll not be 1 consecutive frame
+ ASSERT_GT(multi_frames_count, 1);
+ data_vector.resize(++multi_frames_count * mtu_);
+
+ std::generate(data_vector.begin(), data_vector.end(), Incrementor<uint8_t>(0u));
+
+ PrepareMultiFrames(connection_id,
+ protocol_version,
+ service_type,
+ session_id,
+ message_id,
+ mtu_,
+ data_vector,
+ some_data.multiframes);
+ messages_map.insert(std::make_pair(message_id, some_data));
+ }
+ sessions_map.insert(std::make_pair(session_id, messages_map));
+ }
+ test_data_map_.insert(std::make_pair(connection_id, sessions_map));
+ }
+ }
+
+ // Support method for first and consecutive frame disassembling
+ static void PrepareMultiFrames(
+ const ConnectionID connection_id,
+ const uint8_t protocol_version,
+ const uint8_t service_type,
+ const uint8_t session_id,
+ const uint32_t message_id,
+ const size_t max_frame_size,
+ const UCharDataVector& data,
+ ProtocolFramePtrList& out_frames);
+
+ void AddConnection(const ConnectionID connection_id);
+
+ void AddConnections();
+
+ void VerifyConsecutiveAdd(const MutiframeData& multiframe_data);
+
+ MultiFrameBuilder multiframe_builder_;
+ MultiFrameTestMap test_data_map_;
+ static size_t mtu_;
+};
+
+size_t MultiFrameBuilderTest::mtu_ = 10;
+
+TEST_F(MultiFrameBuilderTest, Pop_Frames_From_Empty_builder) {
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes());
+}
+
+TEST_F(MultiFrameBuilderTest, Pop_Frames_with_existing_connections) {
+ AddConnections();
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes());
+}
+
+TEST_F(MultiFrameBuilderTest, Add_EmptyFrame) {
+ EXPECT_EQ(RESULT_FAIL,
+ multiframe_builder_.AddFrame(ProtocolFramePtr()));
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes());
+}
+
+TEST_F(MultiFrameBuilderTest, Add_NonSingleOrConsecutive_Frames) {
+ UCharDataVector types;
+ types.reserve(std::numeric_limits<uint8_t>::max());
+ for (uint8_t type = std::numeric_limits<uint8_t>::min();
+ type < std::numeric_limits<uint8_t>::max(); ++type) {
+ if (type != FRAME_TYPE_FIRST &&
+ type != FRAME_TYPE_CONSECUTIVE) {
+ types.push_back(type);
+ }
+ }
+
+ for (UCharDataVector::iterator it = types.begin(); it != types.end(); ++it) {
+ const uint8_t frame_type = *it;
+ const ProtocolFramePtr unexpected_frame(
+ new ProtocolPacket( 0u, PROTOCOL_VERSION_3, PROTECTION_OFF, frame_type,
+ SERVICE_TYPE_RPC, FRAME_DATA_FIRST, 0u, 0u, 0u));
+ EXPECT_EQ(RESULT_FAIL,
+ multiframe_builder_.AddFrame(unexpected_frame))
+ << "Unexpected frame: " << unexpected_frame;
+
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes())
+ << "Unexpected frame: " << unexpected_frame;
+ }
+}
+
+TEST_F(MultiFrameBuilderTest, Add_FirstFrames_NoConnections) {
+ for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin();
+ connection_it != test_data_map_.end(); ++connection_it) {
+ SessionToMutiframeDataTestMap& session_map = connection_it->second;
+ const ConnectionID connection_id = connection_it->first;
+
+ for (SessionToMutiframeDataTestMap::iterator session_it = session_map.begin();
+ session_it != session_map.end(); ++session_it) {
+ MessageIDToMutiframeDataTestMap& messageId_map = session_it->second;
+
+ for (MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin();
+ messageId_it != messageId_map.end(); ++messageId_it) {
+ const MutiframeData& multiframe_data = messageId_it->second;
+
+ const ProtocolFramePtrList& multiframes = multiframe_data.multiframes;
+ ASSERT_FALSE(multiframes.empty());
+ const ProtocolFramePtr first_frame = multiframes.front();
+ ASSERT_TRUE(first_frame);
+ EXPECT_EQ(RESULT_FAIL,
+ multiframe_builder_.AddFrame(first_frame))
+ << "Unexisting connection " << connection_id
+ << "- to be skipped first frame: " << first_frame;
+
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes())
+ << "First frame: " << first_frame;
+ }
+ }
+ }
+}
+
+TEST_F(MultiFrameBuilderTest, Add_FirstFrames_only) {
+ AddConnections();
+ for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin();
+ connection_it != test_data_map_.end(); ++connection_it) {
+ SessionToMutiframeDataTestMap& session_map = connection_it->second;
+
+ for (SessionToMutiframeDataTestMap::iterator session_it = session_map.begin();
+ session_it != session_map.end(); ++session_it) {
+ MessageIDToMutiframeDataTestMap& messageId_map = session_it->second;
+
+ for (MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin();
+ messageId_it != messageId_map.end(); ++messageId_it) {
+ const MutiframeData& multiframe_data = messageId_it->second;
+
+ const ProtocolFramePtrList& multiframes = multiframe_data.multiframes;
+ ASSERT_FALSE(multiframes.empty());
+ const ProtocolFramePtr first_frame = multiframes.front();
+ ASSERT_TRUE(first_frame);
+ EXPECT_EQ(RESULT_OK,
+ multiframe_builder_.AddFrame(first_frame))
+ << "First frame: " << first_frame;
+
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes())
+ << "First frame: " << first_frame;
+ }
+ }
+ }
+}
+
+TEST_F(MultiFrameBuilderTest, Add_ConsecutiveFrame) {
+ ASSERT_FALSE(test_data_map_.empty());
+ const ConnectionID& connection_id = test_data_map_.begin()->first;
+ SessionToMutiframeDataTestMap& session_map = test_data_map_.begin()->second;
+
+ AddConnection(connection_id);
+
+ ASSERT_FALSE(session_map.empty());
+ MessageIDToMutiframeDataTestMap& messageId_map = session_map.begin()->second;
+
+ ASSERT_FALSE(messageId_map.empty());
+ const MutiframeData& multiframe_data = messageId_map.begin()->second;
+
+ VerifyConsecutiveAdd(multiframe_data);
+}
+
+TEST_F(MultiFrameBuilderTest, Add_ConsecutiveFrames_OneByOne) {
+ AddConnections();
+ for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin();
+ connection_it != test_data_map_.end(); ++connection_it) {
+ SessionToMutiframeDataTestMap& session_map = connection_it->second;
+
+ for (SessionToMutiframeDataTestMap::iterator session_it = session_map.begin();
+ session_it != session_map.end(); ++session_it) {
+ MessageIDToMutiframeDataTestMap& messageId_map = session_it->second;
+
+ for (MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin();
+ messageId_it != messageId_map.end(); ++messageId_it) {
+ const MutiframeData& multiframe_data = messageId_it->second;
+
+ VerifyConsecutiveAdd(multiframe_data);
+ }
+ }
+ }
+}
+
+TEST_F(MultiFrameBuilderTest, Add_ConsecutiveFrames_per1) {
+ AddConnections();
+ ASSERT_FALSE(test_data_map_.empty());
+ // After processing each frame we remove it from messageId_it
+ // After processing all session data - it removes from session_map
+ // and so on
+ // TODO(Ezamakhov): optimize speed of test by skipping erasing data
+ while (!test_data_map_.empty()) {
+ MultiFrameTestMap::iterator connection_it = test_data_map_.begin();
+ while (connection_it != test_data_map_.end()) {
+ SessionToMutiframeDataTestMap& session_map = connection_it->second;
+
+ SessionToMutiframeDataTestMap::iterator session_it = session_map.begin();
+ while (session_it != session_map.end()) {
+ MessageIDToMutiframeDataTestMap& messageId_map = session_it->second;
+
+ MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin();
+ while (messageId_it != messageId_map.end()) {
+
+ MutiframeData& multiframe_data = messageId_it->second;
+ ProtocolFramePtrList& multiframes = multiframe_data.multiframes;
+ ASSERT_FALSE(multiframes.empty());
+
+ const ProtocolFramePtr frame = multiframes.front();
+ ASSERT_TRUE(frame);
+
+ EXPECT_EQ(RESULT_OK,
+ multiframe_builder_.AddFrame(frame)) << "Frame: " << frame;
+
+ multiframes.pop_front();
+
+ // If all frames are assembled
+ if (multiframes.empty()) {
+ const ProtocolFramePtrList& multiframe_list
+ = multiframe_builder_.PopMultiframes();
+ ASSERT_EQ(multiframe_list.size(), 1u);
+
+ const ProtocolFramePtr result_multiframe = multiframe_list.front();
+ const UCharDataVector& binary_data = multiframe_data.binary_data;
+ EXPECT_EQ(binary_data,
+ UCharDataVector(result_multiframe->data(),
+ result_multiframe->data() +
+ result_multiframe->payload_size()));
+ messageId_map.erase(messageId_it++);
+ } else {
+ // Multiframe is not completed
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes()) << "Frame: " << frame;
+ ++messageId_it;
+ }
+ }
+ if (messageId_map.empty()) {
+ session_map.erase(session_it++);
+ } else {
+ ++session_it;
+ }
+ }
+ if (session_map.empty()) {
+ test_data_map_.erase(connection_it++);
+ } else {
+ ++connection_it;
+ }
+ }
+ }
+}
+
+TEST_F(MultiFrameBuilderTest, FrameExpired_OneMSec) {
+ multiframe_builder_.set_waiting_timeout(1);
+
+ ASSERT_FALSE(test_data_map_.empty());
+ const ConnectionID& connection_id = test_data_map_.begin()->first;
+ SessionToMutiframeDataTestMap& session_map = test_data_map_.begin()->second;
+
+ AddConnection(connection_id);
+
+ ASSERT_FALSE(session_map.empty());
+ MessageIDToMutiframeDataTestMap& messageId_map = session_map.begin()->second;
+
+ ASSERT_FALSE(messageId_map.empty());
+ const MutiframeData& multiframe_data = messageId_map.begin()->second;
+
+ const ProtocolFramePtrList& multiframes = multiframe_data.multiframes;
+ ASSERT_FALSE(multiframes.empty());
+ const ProtocolFramePtr first_frame = multiframes.front();
+ ASSERT_TRUE(first_frame);
+ EXPECT_EQ(RESULT_OK,
+ multiframe_builder_.AddFrame(first_frame))
+ << "First frame: " << first_frame;
+
+ // Wait frame expire
+ usleep(1000);
+ const ProtocolFramePtrList& list = multiframe_builder_.PopMultiframes();
+ ASSERT_FALSE(list.empty());
+ EXPECT_EQ(first_frame,
+ list.front());
+}
+
+/*
+ * Testing support methods
+ */
+
+void MultiFrameBuilderTest::VerifyConsecutiveAdd(const MutiframeData& multiframe_data) {
+ const ProtocolFramePtrList& multiframes = multiframe_data.multiframes;
+ const UCharDataVector& binary_data = multiframe_data.binary_data;
+ ASSERT_FALSE(multiframes.empty());
+
+ // Frame of multiframe loop
+ ProtocolFramePtrList::const_iterator it = multiframes.begin();
+ // Skip last final frame
+ const ProtocolFramePtrList::const_iterator it_last = --(multiframes.end());
+ while (it != it_last) {
+ const ProtocolFramePtr frame = *it;
+ ASSERT_TRUE(frame);
+ EXPECT_EQ(RESULT_OK,
+ multiframe_builder_.AddFrame(frame))
+ << "Non final CONSECUTIVE frame: " << frame;
+ EXPECT_EQ(ProtocolFramePtrList(),
+ multiframe_builder_.PopMultiframes())
+ << "Non final CONSECUTIVE frame: " << frame;
+ ++it;
+ // Skip last final frame
+ }
+
+ const ProtocolFramePtr final_frame = multiframes.back();
+
+ EXPECT_EQ(RESULT_OK,
+ multiframe_builder_.AddFrame(final_frame))
+ << "Final CONSECUTIVE frame: " << final_frame;
+
+ const ProtocolFramePtrList& multiframe_list
+ = multiframe_builder_.PopMultiframes();
+ ASSERT_EQ(multiframe_list.size(), 1u);
+
+ const ProtocolFramePtr result_multiframe = multiframe_list.front();
+ EXPECT_EQ(binary_data,
+ UCharDataVector(result_multiframe->data(),
+ result_multiframe->data() + result_multiframe->payload_size()));
+}
+
+void MultiFrameBuilderTest::PrepareMultiFrames(const ConnectionID connection_id,
+ const uint8_t protocol_version,
+ const uint8_t service_type,
+ const uint8_t session_id,
+ const uint32_t message_id,
+ const size_t max_payload_size,
+ const UCharDataVector& data,
+ ProtocolFramePtrList& out_frames) {
+ ASSERT_GT(max_payload_size, FIRST_FRAME_DATA_SIZE);
+ ASSERT_EQ(FIRST_FRAME_DATA_SIZE, 0x08);
+
+ // TODO(EZamakhov): move to the separate class
+ const size_t data_size = data.size();
+ // remainder of last frame
+ const size_t lastframe_remainder = data_size % max_payload_size;
+ // size of last frame (full fill or not)
+ const size_t lastframe_size =
+ lastframe_remainder > 0 ? lastframe_remainder : max_payload_size;
+
+ const size_t frames_count = data_size / max_payload_size +
+ // add last frame if not empty
+ (lastframe_remainder > 0 ? 1 : 0);
+
+ uint8_t out_data[FIRST_FRAME_DATA_SIZE];
+ out_data[0] = data_size >> 24;
+ out_data[1] = data_size >> 16;
+ out_data[2] = data_size >> 8;
+ out_data[3] = data_size;
+
+ out_data[4] = frames_count >> 24;
+ out_data[5] = frames_count >> 16;
+ out_data[6] = frames_count >> 8;
+ out_data[7] = frames_count;
+
+ ProtocolFramePtr first_frame(
+ new ProtocolPacket(
+ connection_id, protocol_version, PROTECTION_OFF, FRAME_TYPE_FIRST,
+ service_type, FRAME_DATA_FIRST, session_id, FIRST_FRAME_DATA_SIZE,
+ message_id, out_data));
+ // Note: PHIMpl already prepare First frames the total_data_bytes on desirialization
+ first_frame->set_total_data_bytes(data_size);
+
+ out_frames.clear();
+ out_frames.push_back(first_frame);
+
+ for (size_t i = 0; i < frames_count; ++i) {
+ const bool is_last_frame = (i == (frames_count - 1));
+ const size_t frame_size = is_last_frame ? lastframe_size : max_payload_size;
+ const uint8_t data_type =
+ is_last_frame
+ ? FRAME_DATA_LAST_CONSECUTIVE
+ : (i % FRAME_DATA_MAX_CONSECUTIVE + 1);
+
+ const ProtocolFramePtr consecutive_frame(
+ new ProtocolPacket(
+ connection_id, protocol_version, PROTECTION_OFF, FRAME_TYPE_CONSECUTIVE,
+ service_type, data_type, session_id, frame_size, message_id,
+ &data[max_payload_size * i]));
+ out_frames.push_back(consecutive_frame);
+ }
+}
+
+void MultiFrameBuilderTest::AddConnection(const ConnectionID connection_id) {
+ ASSERT_TRUE(multiframe_builder_.AddConnection(connection_id));
+}
+
+void MultiFrameBuilderTest::AddConnections() {
+ for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin();
+ connection_it != test_data_map_.end(); ++connection_it) {
+ const ConnectionID connection_id = connection_it->first;
+ ASSERT_TRUE(multiframe_builder_.AddConnection(connection_id));
+ }
+}
+
+} // namespace protocol_handler_test
+} // namespace components
+} // namespace test
diff --git a/src/components/protocol_handler/test/protocol_handler_tm_test.cc b/src/components/protocol_handler/test/protocol_handler_tm_test.cc
index 4ef070c52f..ea8f7b122e 100644
--- a/src/components/protocol_handler/test/protocol_handler_tm_test.cc
+++ b/src/components/protocol_handler/test/protocol_handler_tm_test.cc
@@ -64,12 +64,15 @@ class ProtocolHandlerImplTest : public ::testing::Test {
const size_t period_msec, const size_t max_messages,
bool malformed_message_filtering = false,
const size_t malformd_period_msec = 0u,
- const size_t malformd_max_messages = 0u) {
+ const size_t malformd_max_messages = 0u,
+ const int32_t multiframe_waiting_timeout = 0) {
protocol_handler_impl.reset(
new ProtocolHandlerImpl(&transport_manager_mock,
period_msec, max_messages,
malformed_message_filtering,
- malformd_period_msec, malformd_max_messages));
+ malformd_period_msec,
+ malformd_max_messages,
+ multiframe_waiting_timeout));
protocol_handler_impl->set_session_observer(&session_observer_mock);
tm_listener = protocol_handler_impl.get();
}
@@ -249,7 +252,7 @@ TEST_F(ProtocolHandlerImplTest, StartSession_Protected_SessionObserverReject) {
// For enabled protection callback shall use protection ON
const bool callback_protection_flag = PROTECTION_ON;
#else
- // For disabled protection callback shall ignore protection income flad and use protection OFF
+ // For disabled protection callback shall ignore protection income flag and use protection OFF
const bool callback_protection_flag = PROTECTION_OFF;
#endif // ENABLE_SECURITY
// expect ConnectionHandler check
diff --git a/src/components/time_tester/test/time_manager_test.cc b/src/components/time_tester/test/time_manager_test.cc
index 60f3bd6814..c5ad607a97 100644
--- a/src/components/time_tester/test/time_manager_test.cc
+++ b/src/components/time_tester/test/time_manager_test.cc
@@ -55,7 +55,7 @@ class StreamerMock : public Streamer {
TEST(TimeManagerTest, DISABLED_MessageProcess) {
//TODO(AK) APPLINK-13351 Disable due to refactor TimeTester
protocol_handler_test::TransportManagerMock transport_manager_mock;
- protocol_handler::ProtocolHandlerImpl protocol_handler_mock(&transport_manager_mock, 0, 0, 0, 0, 0);
+ protocol_handler::ProtocolHandlerImpl protocol_handler_mock(&transport_manager_mock, 0, 0, 0, 0, 0, 0);
TimeManager * time_manager = new TimeManager();
// Streamer will be deleted by Thread
StreamerMock* streamer_mock = new StreamerMock(time_manager);