diff options
Diffstat (limited to 'src/components')
16 files changed, 1105 insertions, 192 deletions
diff --git a/src/components/config_profile/include/config_profile/profile.h b/src/components/config_profile/include/config_profile/profile.h index e3ef1a82a7..7c8f3c3958 100644 --- a/src/components/config_profile/include/config_profile/profile.h +++ b/src/components/config_profile/include/config_profile/profile.h @@ -545,6 +545,8 @@ class Profile : public utils::Singleton<Profile> { size_t malformed_frequency_time() const; + uint32_t multiframe_waiting_timeout() const; + uint16_t attempts_to_open_policy_db() const; uint16_t open_attempt_timeout_ms() const; diff --git a/src/components/config_profile/src/profile.cc b/src/components/config_profile/src/profile.cc index 92c139ce8d..153c89b0d2 100644 --- a/src/components/config_profile/src/profile.cc +++ b/src/components/config_profile/src/profile.cc @@ -196,6 +196,7 @@ const char* kFrequencyTime = "FrequencyTime"; const char* kMalformedMessageFiltering = "MalformedMessageFiltering"; const char* kMalformedFrequencyCount = "MalformedFrequencyCount"; const char* kMalformedFrequencyTime = "MalformedFrequencyTime"; +const char* kExpectedConsecutiveFramesTimeout = "ExpectedConsecutiveFramesTimeout"; const char* kHashStringSizeKey = "HashStringSize"; const char* kUseDBForResumptionKey = "UseDBForResumption"; const char* kAttemptsToOpenResumptionDBKey = "AttemptsToOpenResumptionDB"; @@ -273,6 +274,7 @@ const size_t kDefaultFrequencyTime = 1000; const bool kDefaulMalformedMessageFiltering = true; const size_t kDefaultMalformedFrequencyCount = 10; const size_t kDefaultMalformedFrequencyTime = 1000; +const uint32_t kDefaultExpectedConsecutiveFramesTimeout = 10000; const uint16_t kDefaultAttemptsToOpenPolicyDB = 5; const uint16_t kDefaultOpenAttemptTimeoutMs = 500; const uint32_t kDefaultAppIconsFolderMaxSize = 104857600; @@ -709,6 +711,12 @@ size_t Profile::malformed_frequency_time() const { kProtocolHandlerSection, kMalformedFrequencyTime); return malformed_frequency_time; } +uint32_t Profile::multiframe_waiting_timeout() const { + uint32_t multiframe_waiting_timeout = 0; + ReadUIntValue(&multiframe_waiting_timeout, kDefaultExpectedConsecutiveFramesTimeout, + kProtocolHandlerSection, kExpectedConsecutiveFramesTimeout); + return multiframe_waiting_timeout; +} uint16_t Profile::attempts_to_open_policy_db() const { return attempts_to_open_policy_db_; diff --git a/src/components/protocol_handler/CMakeLists.txt b/src/components/protocol_handler/CMakeLists.txt index 10a18c48b6..81ce371001 100644 --- a/src/components/protocol_handler/CMakeLists.txt +++ b/src/components/protocol_handler/CMakeLists.txt @@ -43,6 +43,7 @@ set(SOURCES ${COMPONENTS_DIR}/protocol_handler/src/protocol_handler_impl.cc ${COMPONENTS_DIR}/protocol_handler/src/protocol_packet.cc ${COMPONENTS_DIR}/protocol_handler/src/protocol_payload.cc + ${COMPONENTS_DIR}/protocol_handler/src/multiframe_builder.cc ) set(LIBRARIES diff --git a/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h b/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h index 7be82843c7..b02e7de3bf 100644 --- a/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h +++ b/src/components/protocol_handler/include/protocol_handler/incoming_data_handler.h @@ -32,7 +32,6 @@ #ifndef SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_INCOMING_DATA_HANDLER_H_ #define SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_INCOMING_DATA_HANDLER_H_ -#include <list> #include <map> #include <vector> #include "utils/macro.h" @@ -53,7 +52,7 @@ class IncomingDataHandler { * @brief Setting additional validator for checking malformed packets * \param validator pointer */ - void set_validator(const ProtocolPacket::ProtocolHeaderValidator *const validator); + void set_validator(const ProtocolPacket::ProtocolHeaderValidator* const validator); /** * @brief Concatenate TM messages to ford frames and validate ford header data * \param TM messages for converting to frames @@ -65,9 +64,9 @@ class IncomingDataHandler { * - RESULT_FAIL - packet serialization or validation error occurs * \return list of complete, correct packets */ - std::list<ProtocolFramePtr> ProcessData(const RawMessage &tm_message, - RESULT_CODE *result, - size_t *malformed_occurrence); + ProtocolFramePtrList ProcessData(const RawMessage& tm_message, + RESULT_CODE* result, + size_t* malformed_occurrence); /** * @brief Add connection for data handling and verification */ @@ -83,7 +82,7 @@ class IncomingDataHandler { /** * @brief Returns size of frame to be formed from raw bytes. */ - static uint32_t GetPacketSize(const ProtocolPacket::ProtocolHeader &header); + static uint32_t GetPacketSize(const ProtocolPacket::ProtocolHeader& header); /** * @brief Try to create frame from incoming data * \param incommung_data raw stream @@ -95,16 +94,16 @@ class IncomingDataHandler { * - RESULT_OK - one or more frames successfully created * - RESULT_FAIL - packet serialization or validation error occurs */ - RESULT_CODE CreateFrame(std::vector<uint8_t> &incoming_data, - std::list<ProtocolFramePtr> &out_frames, - size_t &malformed_occurrence, + RESULT_CODE CreateFrame(std::vector<uint8_t>& incoming_data, + ProtocolFramePtrList& out_frames, + size_t& malformed_occurrence, const transport_manager::ConnectionUID connection_id); typedef std::map<transport_manager::ConnectionUID, std::vector<uint8_t> > ConnectionsDataMap; ConnectionsDataMap connections_data_; ProtocolPacket::ProtocolHeader header_; - const ProtocolPacket::ProtocolHeaderValidator *validator_; + const ProtocolPacket::ProtocolHeaderValidator* validator_; bool last_portion_of_data_was_malformed_; DISALLOW_COPY_AND_ASSIGN(IncomingDataHandler); }; diff --git a/src/components/protocol_handler/include/protocol_handler/multiframe_builder.h b/src/components/protocol_handler/include/protocol_handler/multiframe_builder.h new file mode 100644 index 0000000000..6ccf891b38 --- /dev/null +++ b/src/components/protocol_handler/include/protocol_handler/multiframe_builder.h @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2015, Ford Motor Company + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the + * distribution. + * + * Neither the name of the Ford Motor Company nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_MULTIFRAME_BUILDER_H_ +#define SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_MULTIFRAME_BUILDER_H_ + +#include <map> +#include <ostream> // std::basic_ostream +#include <iterator> // std::ostream_iterator +#include <algorithm> // std::copy + +#include "utils/date_time.h" +#include "protocol_handler/protocol_packet.h" + +/** + *\namespace protocol_handlerHandler + *\brief Namespace for SmartDeviceLink ProtocolHandler related functionality. + */ +namespace protocol_handler { +/** + * \brief Session identifier - contains connection identifier and + * session_id from protocol (can be used as hash) + */ +// TODO(EZamakhov): move SessionID to protocol_handler/protocol_packet.h +typedef uint8_t SessionID; +/** + * \brief Message identifier - unique to the session messages + */ +// TODO(EZamakhov): move MessageID to protocol_handler/session_observer.h +typedef uint32_t MessageID; + +struct ProtocolFrameData { + ProtocolFramePtr frame; + TimevalStruct append_time; +}; +/** + *\brief Map of frames with last frame data for messages received in multiple frames. + */ +typedef std::map<MessageID, ProtocolFrameData> MessageIDToFrameMap; +/** + *\brief Map of frames with last frame data for messages received in multiple frames. + */ +typedef std::map<SessionID, MessageIDToFrameMap> SessionToFrameMap; +/** + *\brief Map of frames with last frame data for messages received in multiple frames. + */ +typedef std::map<ConnectionID, SessionToFrameMap> MultiFrameMap; + +/** + * \class MultiFrameBuilder + * \brief Class for assembling consecutive frames according to + * messageID to complete multiframes. + */ +class MultiFrameBuilder { + public: + /** + * @brief Constructor + */ + MultiFrameBuilder(); + + /** + *\brief Set timeout of waiting CONSECUTIVE frames + */ + void set_waiting_timeout(const uint32_t consecutive_frame_wait_msecs); + + /** + * @brief Add connection for pending data + * @return true on success + */ + bool AddConnection(const ConnectionID connection_id); + + /** + * @brief Clear all data related to connection_id + * @return true on success + */ + bool RemoveConnection(const ConnectionID connection_id); + + /** + *\brief Pop assembled and expired frames + */ + ProtocolFramePtrList PopMultiframes(); + + /** + *\brief Handle Single or Consecutive frame + * @return RESULT_OK on success, or RESULT_FAIL in case of any error + */ + RESULT_CODE AddFrame(const ProtocolFramePtr packet); + + private: + RESULT_CODE HandleFirstFrame(const ProtocolFramePtr packet); + RESULT_CODE HandleConsecutiveFrame(const ProtocolFramePtr packet); + + // Map of frames with last frame data for messages received in multiple frames. + MultiFrameMap multiframes_map_; + int64_t consecutive_frame_wait_msecs_; +}; + +template<typename _CharT> +std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream, + const protocol_handler::MultiFrameMap& map) { + if (map.empty()) { + stream << "{empty}"; + return stream; + } + for (MultiFrameMap::const_iterator connection_it = map.begin(); + connection_it != map.end(); ++connection_it) { + const SessionToFrameMap& session_map = connection_it->second; + + for (SessionToFrameMap::const_iterator session_it = session_map.begin(); + session_it != session_map.end(); ++session_it) { + const MessageIDToFrameMap& messageId_map = session_it->second; + + for (MessageIDToFrameMap::const_iterator messageId_it = messageId_map.begin(); + messageId_it != messageId_map.end(); ++messageId_it) { + const ProtocolFrameData& frame_data = messageId_it->second; + + stream << "ConnectionID: " << connection_it->first + << ", SessionID: " << static_cast<uint32_t>(session_it->first) + << ", MessageID: " << static_cast<uint32_t>(messageId_it->first) + << " msec, frame: " << frame_data.frame << std::endl; + } + } + } + return stream; +} + +} // namespace protocol_handler +#endif // SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_MULTIFRAME_BUILDER_H_ diff --git a/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h b/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h index 260ad16ec7..fb10811f14 100644 --- a/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h +++ b/src/components/protocol_handler/include/protocol_handler/protocol_handler_impl.h @@ -50,6 +50,7 @@ #include "protocol_handler/session_observer.h" #include "protocol_handler/protocol_observer.h" #include "protocol_handler/incoming_data_handler.h" +#include "protocol_handler/multiframe_builder.h" #include "transport_manager/common.h" #include "transport_manager/transport_manager.h" #include "transport_manager/transport_manager_listener_empty.h" @@ -142,14 +143,17 @@ class ProtocolHandlerImpl * \param malformed_message_frequency_time used as time for malformed flood filtering * \param malformed_message_frequency_count used as maximum value of malformed * messages per message_frequency_time period + * \param multiframe_waiting_timeout used as maximum time of consecutive + * frames handling * message exchange. */ explicit ProtocolHandlerImpl( - transport_manager::TransportManager *transport_manager_param, - size_t message_frequency_time, size_t message_frequency_count, - bool malformed_message_filtering, - size_t malformed_message_frequency_time, - size_t malformed_message_frequency_count); + transport_manager::TransportManager *transport_manager_param, + size_t message_frequency_time, size_t message_frequency_count, + bool malformed_message_filtering, + size_t malformed_message_frequency_time, + size_t malformed_message_frequency_count, + uint32_t multiframe_waiting_timeout); /** * \brief Destructor @@ -402,59 +406,40 @@ class ProtocolHandlerImpl /** * \brief Handles received message. - * \param connection_handle Identifier of connection through which message - * is received. * \param packet Received message with protocol header. * \return \saRESULT_CODE Status of operation */ - RESULT_CODE HandleMessage( - ConnectionID connection_id, - const ProtocolFramePtr packet); + RESULT_CODE HandleMessage(const ProtocolFramePtr packet); /** * \brief Handles message received in single frame. - * \param connection_handle Identifier of connection through which message - * is received. * \param packet Frame of message with protocol header. * \return \saRESULT_CODE Status of operation */ - RESULT_CODE HandleSingleFrameMessage( - ConnectionID connection_id, - const ProtocolFramePtr packet); + RESULT_CODE HandleSingleFrameMessage(const ProtocolFramePtr packet); /** * \brief Handles message received in multiple frames. Collects all frames * of message. - * \param connection_handle Identifier of connection through which message - * is received. * \param packet Current frame of message with protocol header. * \return \saRESULT_CODE Status of operation */ - RESULT_CODE HandleMultiFrameMessage( - ConnectionID connection_id, - const ProtocolFramePtr packet); + RESULT_CODE HandleMultiFrameMessage(const ProtocolFramePtr packet); /** * \brief Handles message received in single frame. - * \param connection_handle Identifier of connection through which message - * is received. * \param packet Received message with protocol header. * \return \saRESULT_CODE Status of operation */ RESULT_CODE HandleControlMessage( - ConnectionID connection_id, const ProtocolFramePtr packet); - RESULT_CODE HandleControlMessageEndSession( - ConnectionID connection_id, - const ProtocolPacket &packet); + RESULT_CODE HandleControlMessageEndSession(const ProtocolPacket &packet); + + RESULT_CODE HandleControlMessageStartSession(const ProtocolPacket &packet); - RESULT_CODE HandleControlMessageStartSession( - ConnectionID connection_id, - const ProtocolPacket &packet); + RESULT_CODE HandleControlMessageHeartBeat(const ProtocolPacket &packet); - RESULT_CODE HandleControlMessageHeartBeat( - ConnectionID connection_id, - const ProtocolPacket &packet); + void PopValideAndExpirateMultiframes(); // threads::MessageLoopThread<*>::Handler implementations // CALLED ON raw_ford_messages_from_mobile_ thread! @@ -495,10 +480,9 @@ class ProtocolHandlerImpl transport_manager::TransportManager *transport_manager_; /** - *\brief Map of frames with last frame data for messages received in multiple frames. + *\brief Assembling support class. */ - typedef std::map<int32_t, ProtocolFramePtr> MultiFrameMap; - MultiFrameMap incomplete_multi_frame_messages_; + MultiFrameBuilder multiframe_builder_; /** * \brief Map of messages (frames) received over mobile nave session diff --git a/src/components/protocol_handler/include/protocol_handler/protocol_packet.h b/src/components/protocol_handler/include/protocol_handler/protocol_packet.h index db0650cfd8..1b68e6c870 100644 --- a/src/components/protocol_handler/include/protocol_handler/protocol_packet.h +++ b/src/components/protocol_handler/include/protocol_handler/protocol_packet.h @@ -33,6 +33,7 @@ #ifndef SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_PROTOCOL_PACKET_H_ #define SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_PROTOCOL_PACKET_H_ +#include <list> #include "utils/macro.h" #include "protocol/common.h" #include "transport_manager/common.h" @@ -245,7 +246,7 @@ class ProtocolPacket { /** * \brief Getter for Connection Identifier */ - uint8_t connection_id() const; + ConnectionID connection_id() const; /** * \brief Getter for data payload size @@ -286,6 +287,7 @@ class ProtocolPacket { * @brief Type definition for variable that hold shared pointer to protocolol packet */ typedef utils::SharedPtr<protocol_handler::ProtocolPacket> ProtocolFramePtr; +typedef std::list<ProtocolFramePtr> ProtocolFramePtrList; template<typename _CharT> std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream, @@ -304,7 +306,7 @@ template<typename _CharT> std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream, const protocol_handler::ProtocolPacket& packet) { stream << packet.packet_header() << - ", ConnectionID: " << (packet.connection_id()) << + ", ConnectionID: " << static_cast<uint32_t>(packet.connection_id()) << ", TotalDataBytes: " << (packet.total_data_bytes()) << ", Data: " << static_cast<void*>(packet.data()); return stream; @@ -312,7 +314,9 @@ std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream, template<typename _CharT> std::basic_ostream<_CharT>& operator<<(std::basic_ostream<_CharT>& stream, const ProtocolFramePtr packet_ptr) { - stream << *packet_ptr; - return stream; + if(packet_ptr) { + return stream << *packet_ptr; + } + return stream << "empty smart pointer"; } #endif // SRC_COMPONENTS_PROTOCOL_HANDLER_INCLUDE_PROTOCOL_HANDLER_PROTOCOL_PACKET_H_ diff --git a/src/components/protocol_handler/src/incoming_data_handler.cc b/src/components/protocol_handler/src/incoming_data_handler.cc index 43b0898cc2..f1ceb18425 100644 --- a/src/components/protocol_handler/src/incoming_data_handler.cc +++ b/src/components/protocol_handler/src/incoming_data_handler.cc @@ -40,32 +40,32 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "ProtocolHandler") IncomingDataHandler::IncomingDataHandler() : header_(), validator_(NULL), - last_portion_of_data_was_malformed_(false) - {} + last_portion_of_data_was_malformed_(false) { +} void IncomingDataHandler::set_validator( - const ProtocolPacket::ProtocolHeaderValidator *const validator) { + const ProtocolPacket::ProtocolHeaderValidator* const validator) { validator_ = validator; } static const size_t MIN_HEADER_SIZE = std::min(PROTOCOL_HEADER_V1_SIZE, PROTOCOL_HEADER_V2_SIZE); -std::list<ProtocolFramePtr> IncomingDataHandler::ProcessData( - const RawMessage &tm_message, - RESULT_CODE *result, - size_t *malformed_occurrence) { +ProtocolFramePtrList IncomingDataHandler::ProcessData( + const RawMessage& tm_message, + RESULT_CODE* result, + size_t* malformed_occurrence) { LOG4CXX_AUTO_TRACE(logger_); DCHECK(result); DCHECK(malformed_occurrence); const transport_manager::ConnectionUID connection_id = tm_message.connection_key(); - const uint8_t *data = tm_message.data(); + const uint8_t* data = tm_message.data(); const size_t tm_message_size = tm_message.data_size(); if (tm_message_size == 0 || data == NULL) { LOG4CXX_WARN(logger_, "Wrong raw message " << tm_message_size << " bytes"); *result = RESULT_FAIL; - return std::list<ProtocolFramePtr>(); + return ProtocolFramePtrList(); } LOG4CXX_DEBUG(logger_, "Processing incoming data of size " << tm_message_size << " for connection " << connection_id); @@ -73,13 +73,13 @@ std::list<ProtocolFramePtr> IncomingDataHandler::ProcessData( if (connections_data_.end() == it) { LOG4CXX_WARN(logger_, "ProcessData requested for unknown connection"); *result = RESULT_FAIL; - return std::list<ProtocolFramePtr>(); + return ProtocolFramePtrList(); } - std::vector<uint8_t> &connection_data = it->second; + std::vector<uint8_t>& connection_data = it->second; connection_data.insert(connection_data.end(), data, data + tm_message_size); LOG4CXX_DEBUG(logger_, "Total data size for connection " << connection_id << " is " << connection_data.size()); - std::list<ProtocolFramePtr> out_frames; + ProtocolFramePtrList out_frames; *malformed_occurrence = 0; *result = CreateFrame(connection_data, out_frames, *malformed_occurrence, connection_id); LOG4CXX_DEBUG(logger_, "New data size for connection " << connection_id @@ -117,7 +117,7 @@ void IncomingDataHandler::RemoveConnection( } uint32_t IncomingDataHandler::GetPacketSize( - const ProtocolPacket::ProtocolHeader &header) { + const ProtocolPacket::ProtocolHeader& header) { switch (header.version) { case PROTOCOL_VERSION_1: return header.dataSize + PROTOCOL_HEADER_V1_SIZE; @@ -134,9 +134,9 @@ uint32_t IncomingDataHandler::GetPacketSize( } RESULT_CODE IncomingDataHandler::CreateFrame( - std::vector<uint8_t> &incoming_data, - std::list<ProtocolFramePtr> &out_frames, - size_t &malformed_occurrence, + std::vector<uint8_t>& incoming_data, + ProtocolFramePtrList& out_frames, + size_t& malformed_occurrence, const transport_manager::ConnectionUID connection_id) { LOG4CXX_AUTO_TRACE(logger_); std::vector<uint8_t>::iterator data_it = incoming_data.begin(); @@ -157,7 +157,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame( ++data_it; --data_size; LOG4CXX_DEBUG(logger_, "Moved to the next byte " << std::hex - << static_cast<const void *>(&*data_it)); + << static_cast<const void*>(&*data_it)); continue; } LOG4CXX_DEBUG(logger_, "Payload size " << header_.dataSize); @@ -167,7 +167,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame( ++data_it; --data_size; LOG4CXX_DEBUG(logger_, "Moved to the next byte " << std::hex - << static_cast<const void *>(&*data_it)); + << static_cast<const void*>(&*data_it)); continue; } if (data_size < packet_size) { @@ -179,7 +179,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame( const RESULT_CODE deserialize_result = frame->deserializePacket(&*data_it, packet_size); LOG4CXX_DEBUG( - logger_, "Deserialized frame " << frame); + logger_, "Deserialized frame " << frame); if (deserialize_result != RESULT_OK) { LOG4CXX_WARN(logger_, "Packet deserialization failed"); incoming_data.erase(incoming_data.begin(), data_it); diff --git a/src/components/protocol_handler/src/multiframe_builder.cc b/src/components/protocol_handler/src/multiframe_builder.cc new file mode 100644 index 0000000000..9dc2c08fa5 --- /dev/null +++ b/src/components/protocol_handler/src/multiframe_builder.cc @@ -0,0 +1,273 @@ +/* + * Copyright (c) 2015, Ford Motor Company + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the + * distribution. + * + * Neither the name of the Ford Motor Company nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "protocol_handler/multiframe_builder.h" + +#include <limits> + +#include "utils/logger.h" +#include "utils/make_shared.h" +#include "utils/lock.h" +#include "utils/date_time.h" + +namespace protocol_handler { + +CREATE_LOGGERPTR_GLOBAL(logger_, "ProtocolHandler") + +MultiFrameBuilder::MultiFrameBuilder() + : consecutive_frame_wait_msecs_(0u) { +} + +void MultiFrameBuilder::set_waiting_timeout(const uint32_t consecutive_frame_wait_msecs) { + consecutive_frame_wait_msecs_ = static_cast<int64_t>(consecutive_frame_wait_msecs); + if (consecutive_frame_wait_msecs == 0) { + LOG4CXX_WARN(logger_, "Waiting timout disabled"); + } else { + LOG4CXX_DEBUG(logger_, "Waiting time in msec: " << consecutive_frame_wait_msecs_); + } +} + +bool MultiFrameBuilder::AddConnection(const ConnectionID connection_id) { + LOG4CXX_DEBUG(logger_, "Adding connection_id: " << connection_id); + LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_); + const MultiFrameMap::const_iterator it = multiframes_map_.find(connection_id); + if (it != multiframes_map_.end()) { + LOG4CXX_ERROR(logger_, "Exists connection_id: " << connection_id); + return false; + } + multiframes_map_[connection_id] = SessionToFrameMap(); + return true; +} + +bool MultiFrameBuilder::RemoveConnection(const ConnectionID connection_id) { + LOG4CXX_DEBUG(logger_, "Removing connection_id: " << connection_id); + LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_); + const MultiFrameMap::iterator it = multiframes_map_.find(connection_id); + if (it == multiframes_map_.end()) { + LOG4CXX_ERROR(logger_, "Non-existent connection_id: " << connection_id); + return false; + } + const SessionToFrameMap& session_to_frame_map = it->second; + if (!session_to_frame_map.empty()) { + // FIXME(EZamakhov): Ask ReqManager - do we need to send GenericError + LOG4CXX_WARN(logger_, "For connection_id: " << connection_id + << " waiting: " << multiframes_map_); + } + multiframes_map_.erase(it); + return true; +} + +ProtocolFramePtrList MultiFrameBuilder::PopMultiframes() { + LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_); + ProtocolFramePtrList outpute_frame_list; + for (MultiFrameMap::iterator connection_it = multiframes_map_.begin(); + connection_it != multiframes_map_.end(); ++connection_it) { + LOG4CXX_TRACE(logger_, "Step over connection: " << connection_it->first); + SessionToFrameMap& session_map = connection_it->second; + + for (SessionToFrameMap::iterator session_it = session_map.begin(); + session_it != session_map.end(); ++session_it) { + LOG4CXX_TRACE(logger_, "Step over session: " + << static_cast<int>(session_it->first)); + MessageIDToFrameMap& messageId_map = session_it->second; + + MessageIDToFrameMap::iterator messageId_it = messageId_map.begin(); + while (messageId_it != messageId_map.end()) { + LOG4CXX_TRACE(logger_, "Step over messageId: " << messageId_it->first); + ProtocolFrameData& frame_data = messageId_it->second; + ProtocolFramePtr frame = frame_data.frame; + + if (frame && + frame->frame_data() == FRAME_DATA_LAST_CONSECUTIVE && + frame->payload_size() > 0u ) { + LOG4CXX_DEBUG(logger_, "Ready frame: " << frame); + outpute_frame_list.push_back(frame); + messageId_map.erase(messageId_it++); + continue; + } + if (consecutive_frame_wait_msecs_ != 0) { + LOG4CXX_TRACE(logger_, "Expiration verification"); + const int64_t time_left = + date_time::DateTime::calculateTimeSpan(frame_data.append_time); + LOG4CXX_DEBUG(logger_, "mSecs left: " << time_left); + if (time_left >= consecutive_frame_wait_msecs_) { + LOG4CXX_WARN(logger_, "Expired frame: " << frame); + outpute_frame_list.push_back(frame); + messageId_map.erase(messageId_it++); + continue; + } + } + ++messageId_it; + } // iteration over messageId_map + } // iteration over session_map + } // iteration over multiframes_map_ + LOG4CXX_DEBUG(logger_, "Result frames count: " << outpute_frame_list.size()); + return outpute_frame_list; +} + +RESULT_CODE MultiFrameBuilder::AddFrame(const ProtocolFramePtr packet) { + LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_DEBUG(logger_, "Handling frame: " << packet); + LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_); + if (!packet) { + LOG4CXX_ERROR(logger_, "Skip empty frame"); + return RESULT_FAIL; + } + switch (packet->frame_type()) { + case FRAME_TYPE_FIRST: + LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST"); + return HandleFirstFrame(packet); + case FRAME_TYPE_CONSECUTIVE: + LOG4CXX_TRACE(logger_, "FRAME_TYPE_CONSECUTIVE"); + return HandleConsecutiveFrame(packet); + default: + LOG4CXX_ERROR(logger_, "Frame is not FIRST or CONSECUTIVE :" + << packet); + break; + } + return RESULT_FAIL; +} + +RESULT_CODE MultiFrameBuilder::HandleFirstFrame(const ProtocolFramePtr packet) { + DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_FIRST, + RESULT_FAIL); + LOG4CXX_DEBUG(logger_, "Waiting : " << multiframes_map_); + LOG4CXX_DEBUG(logger_, "Handling FIRST frame: " << packet); + if (packet->payload_size() != 0u) { + LOG4CXX_ERROR(logger_, + "First frame shall have no data:" << packet); + return RESULT_FAIL; + } + + const ConnectionID connection_id = packet->connection_id(); + MultiFrameMap::iterator connection_it = multiframes_map_.find(connection_id); + if (connection_it == multiframes_map_.end()) { + LOG4CXX_ERROR(logger_, "Unknown connection_id: " << connection_id); + return RESULT_FAIL; + } + SessionToFrameMap& session_map = connection_it->second; + + const SessionID session_id = packet->session_id(); + // No need to verify session existance + MessageIDToFrameMap& messageId_map = session_map[session_id]; + + const MessageID message_id = packet->message_id(); + MessageIDToFrameMap::iterator messageId_it = messageId_map.find(message_id); + if (messageId_it != messageId_map.end()) { + LOG4CXX_ERROR(logger_, "Already waiting message for connection_id: " << connection_id + << ", session_id: " << static_cast<int>(session_id) + << ", message_id: " << message_id); + return RESULT_FAIL; + } + + LOG4CXX_DEBUG(logger_, "Start waiting frames for connection_id: " << connection_id + << ", session_id: " << static_cast<int>(session_id) + << ", message_id: " << message_id); + messageId_map[message_id] = {packet, date_time::DateTime::getCurrentTime()}; + return RESULT_OK; +} + +RESULT_CODE MultiFrameBuilder::HandleConsecutiveFrame(const ProtocolFramePtr packet) { + DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_CONSECUTIVE, + RESULT_FAIL); + LOG4CXX_DEBUG(logger_, "Handling CONSECUTIVE frame: " << packet); + + + const ConnectionID connection_id = packet->connection_id(); + MultiFrameMap::iterator connection_it = multiframes_map_.find(connection_id); + if (connection_it == multiframes_map_.end()) { + LOG4CXX_ERROR(logger_, "Unknown connection_id: " << connection_id); + return RESULT_FAIL; + } + SessionToFrameMap& session_map = connection_it->second; + + const SessionID session_id = packet->session_id(); + // No need to verify session existance + MessageIDToFrameMap& messageId_map = session_map[session_id]; + + const MessageID message_id = packet->message_id(); + MessageIDToFrameMap::iterator messageId_it = messageId_map.find(message_id); + if (messageId_it == messageId_map.end()) { + LOG4CXX_ERROR(logger_, "No waiting message for connection_id: " << connection_id + << ", session_id: " << static_cast<int>(session_id) + << ", message_id: " << message_id); + return RESULT_FAIL; + } + + ProtocolFrameData& frame_data = messageId_it->second; + ProtocolFramePtr assembling_frame = frame_data.frame; + DCHECK_OR_RETURN(packet->message_id() == assembling_frame->message_id(), + RESULT_FAIL); + + const uint8_t new_frame_data = packet->frame_data(); + const bool is_last_consecutive = (new_frame_data == + FRAME_DATA_LAST_CONSECUTIVE); + + if (is_last_consecutive) { + // TODO(EZamakhov): implement count of frames and result size verification + LOG4CXX_DEBUG(logger_, "Last CONSECUTIVE frame"); + } else { + uint8_t previous_frame_data = assembling_frame->frame_data(); + if (previous_frame_data == std::numeric_limits<uint8_t>::max()) { + previous_frame_data = 0u; + } + // The next frame data is bigger at 1 + if (new_frame_data != (previous_frame_data + 1)) { + LOG4CXX_ERROR(logger_, + "Unexpected CONSECUTIVE frame for connection_id: " << connection_id + << ", session_id: " << static_cast<int>(session_id) + << ", message_id: " << message_id + << ", frame: " << packet); + return RESULT_FAIL; + } + } + + assembling_frame->set_frame_data(new_frame_data); + + LOG4CXX_DEBUG(logger_, + "Appending " << packet->data_size() << " bytes " + << "; frame_data " << static_cast<int>(new_frame_data) + << "; for connection_id: " << connection_id + << ", session_id: " << static_cast<int>(session_id) + << ", message_id: " << message_id); + + if (assembling_frame->appendData(packet->data(), + packet->data_size()) != RESULT_OK) { + LOG4CXX_ERROR(logger_, "Failed to append frame for multiframe message."); + return RESULT_FAIL; + } + frame_data.append_time = date_time::DateTime::getCurrentTime(); + return RESULT_OK; +} + +} // namespace protocol_handler diff --git a/src/components/protocol_handler/src/protocol_handler_impl.cc b/src/components/protocol_handler/src/protocol_handler_impl.cc index dccfffdf04..b3fb337703 100644 --- a/src/components/protocol_handler/src/protocol_handler_impl.cc +++ b/src/components/protocol_handler/src/protocol_handler_impl.cc @@ -63,11 +63,12 @@ uint8_t SupportedSDLProtocolVersion(); const size_t kStackSize = 32768; -ProtocolHandlerImpl::ProtocolHandlerImpl( - transport_manager::TransportManager *transport_manager_param, - size_t message_frequency_time, size_t message_frequency_count, - bool malformed_message_filtering, - size_t malformed_message_frequency_time, size_t malformed_message_frequency_count) +ProtocolHandlerImpl::ProtocolHandlerImpl(transport_manager::TransportManager *transport_manager_param, + size_t message_frequency_time, size_t message_frequency_count, + bool malformed_message_filtering, + size_t malformed_message_frequency_time, + size_t malformed_message_frequency_count, + uint32_t multiframe_waiting_timeout) : protocol_observers_(), session_observer_(0), transport_manager_(transport_manager_param), @@ -117,6 +118,7 @@ ProtocolHandlerImpl::ProtocolHandlerImpl( LOG4CXX_WARN(logger_, "Malformed message filtering is disabled." << "Connection will be close on first malformed message detection"); } + multiframe_builder_.set_waiting_timeout(multiframe_waiting_timeout); } ProtocolHandlerImpl::~ProtocolHandlerImpl() { @@ -502,6 +504,11 @@ void ProtocolHandlerImpl::OnTMMessageReceiveFailed( void ProtocolHandlerImpl::NotifySubscribers(const RawMessagePtr message) { LOG4CXX_AUTO_TRACE(logger_); sync_primitives::AutoLock lock(protocol_observers_lock_); + if (protocol_observers_.empty()) { + LOG4CXX_ERROR( + logger_, + "Cannot handle multiframe message: no IProtocolObserver is set."); + } for (ProtocolObservers::iterator it = protocol_observers_.begin(); protocol_observers_.end() != it; ++it) { (*it)->OnMessageReceived(message); @@ -570,6 +577,7 @@ void ProtocolHandlerImpl::OnConnectionEstablished( const transport_manager::DeviceInfo &device_info, const transport_manager::ConnectionUID &connection_id) { incoming_data_handler_.AddConnection(connection_id); + multiframe_builder_.AddConnection(connection_id); } void ProtocolHandlerImpl::OnConnectionClosed( @@ -577,6 +585,7 @@ void ProtocolHandlerImpl::OnConnectionClosed( incoming_data_handler_.RemoveConnection(connection_id); message_meter_.ClearIdentifiers(); malformed_message_meter_.ClearIdentifiers(); + multiframe_builder_.RemoveConnection(connection_id); } RESULT_CODE ProtocolHandlerImpl::SendFrame(const ProtocolFramePtr packet) { @@ -705,21 +714,20 @@ RESULT_CODE ProtocolHandlerImpl::SendMultiFrameMessage( return RESULT_OK; } -RESULT_CODE ProtocolHandlerImpl::HandleMessage(ConnectionID connection_id, - const ProtocolFramePtr packet) { +RESULT_CODE ProtocolHandlerImpl::HandleMessage(const ProtocolFramePtr packet) { DCHECK_OR_RETURN(packet, RESULT_UNKNOWN); LOG4CXX_DEBUG(logger_, "Handling message " << packet); switch (packet->frame_type()) { case FRAME_TYPE_CONTROL: LOG4CXX_TRACE(logger_, "FRAME_TYPE_CONTROL"); - return HandleControlMessage(connection_id, packet); + return HandleControlMessage(packet); case FRAME_TYPE_SINGLE: LOG4CXX_TRACE(logger_, "FRAME_TYPE_SINGLE"); - return HandleSingleFrameMessage(connection_id, packet); + return HandleSingleFrameMessage(packet); case FRAME_TYPE_FIRST: case FRAME_TYPE_CONSECUTIVE: LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST or FRAME_TYPE_CONSECUTIVE"); - return HandleMultiFrameMessage(connection_id, packet); + return HandleMultiFrameMessage(packet); default: { LOG4CXX_WARN(logger_, "Unknown frame type" << packet->frame_type()); @@ -729,8 +737,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleMessage(ConnectionID connection_id, return RESULT_OK; } -RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage( - ConnectionID connection_id, const ProtocolFramePtr packet) { +RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(const ProtocolFramePtr packet) { LOG4CXX_AUTO_TRACE(logger_); LOG4CXX_DEBUG(logger_, @@ -745,7 +752,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage( } const uint32_t connection_key = - session_observer_->KeyFromPair(connection_id, packet->session_id()); + session_observer_->KeyFromPair(packet->connection_id(), packet->session_id()); const RawMessagePtr rawMessage( new RawMessage(connection_key, @@ -773,8 +780,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage( return RESULT_OK; } -RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage( - ConnectionID connection_id, const ProtocolFramePtr packet) { +RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage(const ProtocolFramePtr packet) { LOG4CXX_AUTO_TRACE(logger_); if (!session_observer_) { @@ -782,100 +788,14 @@ RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage( return RESULT_FAIL; } - const uint32_t key = session_observer_->KeyFromPair(connection_id, - packet->session_id()); - LOG4CXX_DEBUG( - logger_, - "Packet " << packet << "; session id " << static_cast<int32_t>(key)); - - if (packet->frame_type() == FRAME_TYPE_FIRST) { - LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST"); - // First frame has no data - DCHECK_OR_RETURN(packet->frame_data() == 0u, RESULT_FAIL); - // We can not handle more than one multiframe with the same key - DCHECK_OR_RETURN(incomplete_multi_frame_messages_.count(key) == 0, - RESULT_FAIL); - incomplete_multi_frame_messages_[key] = packet; - return RESULT_OK; + if (multiframe_builder_.AddFrame(packet) != RESULT_OK) { + LOG4CXX_WARN(logger_, "Frame assembling issue"); } - DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_CONSECUTIVE, RESULT_FAIL) - - MultiFrameMap::iterator it = incomplete_multi_frame_messages_.find(key); - if (it == incomplete_multi_frame_messages_.end()) { - LOG4CXX_ERROR(logger_, - "Frame of multiframe message for non-existing session id " << key); - return RESULT_FAIL; - } - - ProtocolFramePtr& assembling_frame = it->second; - const uint8_t previous_frame_data = assembling_frame->frame_data(); - const uint8_t new_frame_data = packet->frame_data(); - - const bool is_last_consecutive = (new_frame_data == FRAME_DATA_LAST_CONSECUTIVE); - // The next frame data is bigger at 1 - DCHECK_OR_RETURN((new_frame_data == (previous_frame_data + 1)) || - // except the last consecutive frame - is_last_consecutive, RESULT_FAIL); - - assembling_frame->set_frame_data(new_frame_data); - - LOG4CXX_DEBUG(logger_, - "Appending " << packet->data_size() << " bytes " - << "; frame_data " << static_cast<int>(new_frame_data) - << "; connection key " << key); - - DCHECK_OR_RETURN(packet->message_id() == assembling_frame->message_id(), RESULT_FAIL); - if (assembling_frame->appendData(packet->data(), packet->data_size()) != RESULT_OK) { - LOG4CXX_ERROR(logger_, "Failed to append frame for multiframe message."); - return RESULT_FAIL; - } - - if (is_last_consecutive) { - LOG4CXX_DEBUG( - logger_, - "Last frame of multiframe message size " << packet->data_size() - << "; connection key " << key); - { - sync_primitives::AutoLock lock(protocol_observers_lock_); - if (protocol_observers_.empty()) { - LOG4CXX_ERROR( - logger_, - "Cannot handle multiframe message: no IProtocolObserver is set."); - return RESULT_FAIL; - } - } - const uint32_t connection_key = - session_observer_->KeyFromPair(connection_id, - assembling_frame->session_id()); - LOG4CXX_DEBUG(logger_, "Result frame" << assembling_frame << - "for connection "<< connection_key); - const RawMessagePtr rawMessage( - new RawMessage(connection_key, - assembling_frame->protocol_version(), - assembling_frame->data(), - assembling_frame->total_data_bytes(), - assembling_frame->service_type(), - assembling_frame->payload_size())); - DCHECK_OR_RETURN(rawMessage, RESULT_FAIL); - -#ifdef TIME_TESTER - if (metric_observer_) { - PHMetricObserver::MessageMetric *metric = - new PHMetricObserver::MessageMetric(); - metric->raw_msg = rawMessage; - metric_observer_->EndMessageProcess(metric); - } -#endif // TIME_TESTER - // TODO(EZamakhov): check service in session - NotifySubscribers(rawMessage); - incomplete_multi_frame_messages_.erase(it); - } return RESULT_OK; } -RESULT_CODE ProtocolHandlerImpl::HandleControlMessage( - ConnectionID connection_id, const ProtocolFramePtr packet) { +RESULT_CODE ProtocolHandlerImpl::HandleControlMessage(const ProtocolFramePtr packet) { LOG4CXX_AUTO_TRACE(logger_); if (!session_observer_) { @@ -885,17 +805,16 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessage( switch (packet->frame_data()) { case FRAME_DATA_START_SERVICE: - return HandleControlMessageStartSession(connection_id, *(packet.get())); + return HandleControlMessageStartSession(*packet); case FRAME_DATA_END_SERVICE: - return HandleControlMessageEndSession(connection_id, *(packet.get())); + return HandleControlMessageEndSession(*packet); case FRAME_DATA_HEART_BEAT: { - LOG4CXX_DEBUG(logger_, - "Received heart beat for connection " << connection_id); - return HandleControlMessageHeartBeat(connection_id, *(packet.get())); + LOG4CXX_TRACE(logger_, "FRAME_DATA_HEART_BEAT"); + return HandleControlMessageHeartBeat(*packet); } case FRAME_DATA_HEART_BEAT_ACK: { LOG4CXX_DEBUG(logger_, "Received heart beat ack from mobile app" - " for connection " << connection_id); + " for connection " << packet->connection_id()); return RESULT_OK; } default: @@ -922,14 +841,14 @@ uint32_t get_hash_id(const ProtocolPacket &packet) { return hash_le == HASH_ID_NOT_SUPPORTED ? HASH_ID_WRONG : hash_le; } -RESULT_CODE ProtocolHandlerImpl::HandleControlMessageEndSession( - ConnectionID connection_id, const ProtocolPacket &packet) { +RESULT_CODE ProtocolHandlerImpl::HandleControlMessageEndSession(const ProtocolPacket &packet) { LOG4CXX_AUTO_TRACE(logger_); const uint8_t current_session_id = packet.session_id(); const uint32_t hash_id = get_hash_id(packet); const ServiceType service_type = ServiceTypeFromByte(packet.service_type()); + const ConnectionID connection_id = packet.connection_id(); const uint32_t session_key = session_observer_->OnSessionEndedCallback( connection_id, current_session_id, hash_id, service_type); @@ -1025,8 +944,7 @@ class StartSessionHandler : public security_manager::SecurityManagerListener { } // namespace #endif // ENABLE_SECURITY -RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession( - ConnectionID connection_id, const ProtocolPacket &packet) { +RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(const ProtocolPacket &packet) { LOG4CXX_DEBUG(logger_, "Protocol version:" << static_cast<int>(packet.protocol_version())); @@ -1043,6 +961,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession( DCHECK(session_observer_); uint32_t hash_id; + const ConnectionID connection_id = packet.connection_id(); const uint32_t session_id = session_observer_->OnSessionStartedCallback( connection_id, packet.session_id(), service_type, protection, &hash_id); @@ -1100,8 +1019,8 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession( return RESULT_OK; } -RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat( - ConnectionID connection_id, const ProtocolPacket &packet) { +RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat(const ProtocolPacket &packet) { + const ConnectionID connection_id = packet.connection_id(); LOG4CXX_DEBUG( logger_, "Sending heart beat acknowledgment for connection " << connection_id); @@ -1124,6 +1043,41 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat( } } +void ProtocolHandlerImpl::PopValideAndExpirateMultiframes() { + const ProtocolFramePtrList& frame_list = multiframe_builder_.PopMultiframes(); + for (ProtocolFramePtrList::const_iterator it = frame_list.begin(); + it != frame_list.end(); ++it) { + const ProtocolFramePtr frame = *it; + DCHECK(frame); + if(!frame) { + continue; + } + + const uint32_t connection_key = + session_observer_->KeyFromPair(frame->connection_id(), frame->session_id()); + LOG4CXX_DEBUG(logger_, "Result frame" << frame << + "for connection "<< connection_key); + const RawMessagePtr rawMessage( + new RawMessage(connection_key, + frame->protocol_version(), + frame->data(), + frame->total_data_bytes(), + frame->service_type(), + frame->payload_size())); + DCHECK(rawMessage); + +#ifdef TIME_TESTER + if (metric_observer_) { + PHMetricObserver::MessageMetric *metric = + new PHMetricObserver::MessageMetric(); + metric->raw_msg = rawMessage; + metric_observer_->EndMessageProcess(metric); + } +#endif // TIME_TESTER + NotifySubscribers(rawMessage); + } +} + bool ProtocolHandlerImpl::TrackMessage(const uint32_t& connection_key) { LOG4CXX_AUTO_TRACE(logger_); if (message_frequency_time_ > 0u && @@ -1203,7 +1157,8 @@ void ProtocolHandlerImpl::Handle( FRAME_TYPE_CONTROL == message->frame_type() || FRAME_TYPE_FIRST == message->frame_type()) { LOG4CXX_DEBUG(logger_, "Packet: dataSize " << message->data_size()); - HandleMessage(message->connection_id(), message); + HandleMessage(message); + PopValideAndExpirateMultiframes(); } else { LOG4CXX_WARN(logger_, "handleMessagesFromMobileApp() - incorrect or NULL data"); diff --git a/src/components/protocol_handler/src/protocol_packet.cc b/src/components/protocol_handler/src/protocol_packet.cc index 1ed6e05699..8675d7543e 100644 --- a/src/components/protocol_handler/src/protocol_packet.cc +++ b/src/components/protocol_handler/src/protocol_packet.cc @@ -33,6 +33,7 @@ #include <stdint.h> #include <memory.h> #include <new> +#include <memory> #include <cstring> #include <limits> @@ -247,7 +248,7 @@ RESULT_CODE ProtocolPacket::ProtocolHeaderValidator::validate( } ProtocolPacket::ProtocolPacket() - : payload_size_(0), connection_id_(0) { + : payload_size_(0u), connection_id_(0u) { } ProtocolPacket::ProtocolPacket(ConnectionID connection_id, @@ -382,7 +383,7 @@ RESULT_CODE ProtocolPacket::deserializePacket( dataPayloadSize = messageSize - offset; } - uint8_t *data = NULL; + uint8_t* data = NULL; if (dataPayloadSize) { data = new (std::nothrow) uint8_t[dataPayloadSize]; if (!data) { @@ -401,6 +402,7 @@ RESULT_CODE ProtocolPacket::deserializePacket( total_data_bytes |= data[3]; set_total_data_bytes(total_data_bytes); if (0 == packet_data_.data) { + delete[] data; return RESULT_FAIL; } } else { @@ -482,7 +484,7 @@ uint32_t ProtocolPacket::total_data_bytes() const { return packet_data_.totalDataBytes; } -uint8_t ProtocolPacket::connection_id() const { +ConnectionID ProtocolPacket::connection_id() const { return connection_id_; } diff --git a/src/components/protocol_handler/test/CMakeLists.txt b/src/components/protocol_handler/test/CMakeLists.txt index 6e12eeb581..3cb0dfc106 100644 --- a/src/components/protocol_handler/test/CMakeLists.txt +++ b/src/components/protocol_handler/test/CMakeLists.txt @@ -52,6 +52,7 @@ set(SOURCES protocol_handler_tm_test.cc protocol_packet_test.cc protocol_payload_test.cc + multiframe_builder_test.cc ) create_test("protocol_handler_test" "${SOURCES}" "${LIBRARIES}") diff --git a/src/components/protocol_handler/test/include/protocol_handler_mock.h b/src/components/protocol_handler/test/include/protocol_handler_mock.h index 41b7c491a4..9ad3545ceb 100644 --- a/src/components/protocol_handler/test/include/protocol_handler_mock.h +++ b/src/components/protocol_handler/test/include/protocol_handler_mock.h @@ -163,8 +163,10 @@ class SessionObserverMock : public protocol_handler::SessionObserver { MOCK_METHOD3(ProtocolVersionUsed, bool( uint32_t connection_id, uint8_t session_id, uint8_t& protocol_version)); +#ifdef ENABLE_SECURITY MOCK_CONST_METHOD1(GetHandshakeContext, security_manager::SSLContext::HandshakeContext (const uint32_t key) ); +#endif // ENABLE_SECURITY }; #ifdef ENABLE_SECURITY diff --git a/src/components/protocol_handler/test/multiframe_builder_test.cc b/src/components/protocol_handler/test/multiframe_builder_test.cc new file mode 100644 index 0000000000..d1910c18e1 --- /dev/null +++ b/src/components/protocol_handler/test/multiframe_builder_test.cc @@ -0,0 +1,522 @@ +/* + * Copyright (c) 2015, Ford Motor Company + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the + * distribution. + * + * Neither the name of the Ford Motor Company nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +#include <gtest/gtest.h> +#include <vector> +#include <utility> +#include <limits> +#include "utils/make_shared.h" +#include "protocol_handler/multiframe_builder.h" + +namespace test { +namespace components { +namespace protocol_handler_test { + +using namespace protocol_handler; + +typedef std::vector<ConnectionID> ConnectionList; +typedef std::vector<uint8_t> UCharDataVector; + +struct MutiframeData { + UCharDataVector binary_data; + ProtocolFramePtrList multiframes; +}; + +/** + *\brief Map of MutiframeData by MessageID key + */ +typedef std::map<MessageID, MutiframeData> MessageIDToMutiframeDataTestMap; +/** + *\brief Map of MessageIDToMutiframeDataMap by SessionID key + */ +typedef std::map<SessionID, MessageIDToMutiframeDataTestMap> SessionToMutiframeDataTestMap; +/** + *\brief Map of SessionToMutiframeDataMap by ConnectionID key + */ +typedef std::map<ConnectionID, SessionToMutiframeDataTestMap> MultiFrameTestMap; + +template<typename IntegerType> +std::vector<IntegerType> getTestVector() { + // Prepare array with a few minimals, middle and a few maximum values + const IntegerType array[] = { + std::numeric_limits<IntegerType>::min(), + std::numeric_limits<IntegerType>::min() + 1, + std::numeric_limits<IntegerType>::max() / 2, + std::numeric_limits<IntegerType>::max() - 1 , + std::numeric_limits<IntegerType>::max() + }; + return std::vector<IntegerType>(array, + array + sizeof(array) / sizeof(array[0]) ); +} +template<typename IntegerType> +struct Incrementor { + IntegerType value; + Incrementor(const IntegerType value = 0u) + : value(value) { + } + IntegerType operator() () { + return ++value; + } +}; + + +class MultiFrameBuilderTest : public ::testing::Test { + protected: + void SetUp() OVERRIDE { + + const std::vector<ConnectionID> connections = getTestVector<ConnectionID>(); + const std::vector<SessionID> sessions = getTestVector<SessionID>(); + const std::vector<MessageID> messages = getTestVector<MessageID>(); + + MutiframeData some_data; + + const uint8_t protocol_version = PROTOCOL_VERSION_2; + const uint8_t service_type = SERVICE_TYPE_RPC; + + // We need 255+ messages for rolling over max uint8_t value + int multi_frames_count = std::numeric_limits<uint8_t>::max() * 2; + + // Prepare C connections with S sessions with M messages data + for (size_t c = 0; c < connections.size(); ++c) { + const ConnectionID connection_id = connections[c]; + + SessionToMutiframeDataTestMap sessions_map; + for (size_t s = 0; s < sessions.size(); ++s) { + const SessionID session_id = sessions[s]; + + MessageIDToMutiframeDataTestMap messages_map; + for (size_t m = 0; m < messages.size(); ++m) { + const MessageID message_id = messages[m]; + + UCharDataVector& data_vector = some_data.binary_data; + // Sahll not be 1 consecutive frame + ASSERT_GT(multi_frames_count, 1); + data_vector.resize(++multi_frames_count * mtu_); + + std::generate(data_vector.begin(), data_vector.end(), Incrementor<uint8_t>(0u)); + + PrepareMultiFrames(connection_id, + protocol_version, + service_type, + session_id, + message_id, + mtu_, + data_vector, + some_data.multiframes); + messages_map.insert(std::make_pair(message_id, some_data)); + } + sessions_map.insert(std::make_pair(session_id, messages_map)); + } + test_data_map_.insert(std::make_pair(connection_id, sessions_map)); + } + } + + // Support method for first and consecutive frame disassembling + static void PrepareMultiFrames( + const ConnectionID connection_id, + const uint8_t protocol_version, + const uint8_t service_type, + const uint8_t session_id, + const uint32_t message_id, + const size_t max_frame_size, + const UCharDataVector& data, + ProtocolFramePtrList& out_frames); + + void AddConnection(const ConnectionID connection_id); + + void AddConnections(); + + void VerifyConsecutiveAdd(const MutiframeData& multiframe_data); + + MultiFrameBuilder multiframe_builder_; + MultiFrameTestMap test_data_map_; + static size_t mtu_; +}; + +size_t MultiFrameBuilderTest::mtu_ = 10; + +TEST_F(MultiFrameBuilderTest, Pop_Frames_From_Empty_builder) { + EXPECT_EQ(ProtocolFramePtrList(), + multiframe_builder_.PopMultiframes()); +} + +TEST_F(MultiFrameBuilderTest, Pop_Frames_with_existing_connections) { + AddConnections(); + EXPECT_EQ(ProtocolFramePtrList(), + multiframe_builder_.PopMultiframes()); +} + +TEST_F(MultiFrameBuilderTest, Add_EmptyFrame) { + EXPECT_EQ(RESULT_FAIL, + multiframe_builder_.AddFrame(ProtocolFramePtr())); + EXPECT_EQ(ProtocolFramePtrList(), + multiframe_builder_.PopMultiframes()); +} + +TEST_F(MultiFrameBuilderTest, Add_NonSingleOrConsecutive_Frames) { + UCharDataVector types; + types.reserve(std::numeric_limits<uint8_t>::max()); + for (uint8_t type = std::numeric_limits<uint8_t>::min(); + type < std::numeric_limits<uint8_t>::max(); ++type) { + if (type != FRAME_TYPE_FIRST && + type != FRAME_TYPE_CONSECUTIVE) { + types.push_back(type); + } + } + + for (UCharDataVector::iterator it = types.begin(); it != types.end(); ++it) { + const uint8_t frame_type = *it; + const ProtocolFramePtr unexpected_frame( + new ProtocolPacket( 0u, PROTOCOL_VERSION_3, PROTECTION_OFF, frame_type, + SERVICE_TYPE_RPC, FRAME_DATA_FIRST, 0u, 0u, 0u)); + EXPECT_EQ(RESULT_FAIL, + multiframe_builder_.AddFrame(unexpected_frame)) + << "Unexpected frame: " << unexpected_frame; + + EXPECT_EQ(ProtocolFramePtrList(), + multiframe_builder_.PopMultiframes()) + << "Unexpected frame: " << unexpected_frame; + } +} + +TEST_F(MultiFrameBuilderTest, Add_FirstFrames_NoConnections) { + for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin(); + connection_it != test_data_map_.end(); ++connection_it) { + SessionToMutiframeDataTestMap& session_map = connection_it->second; + const ConnectionID connection_id = connection_it->first; + + for (SessionToMutiframeDataTestMap::iterator session_it = session_map.begin(); + session_it != session_map.end(); ++session_it) { + MessageIDToMutiframeDataTestMap& messageId_map = session_it->second; + + for (MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin(); + messageId_it != messageId_map.end(); ++messageId_it) { + const MutiframeData& multiframe_data = messageId_it->second; + + const ProtocolFramePtrList& multiframes = multiframe_data.multiframes; + ASSERT_FALSE(multiframes.empty()); + const ProtocolFramePtr first_frame = multiframes.front(); + ASSERT_TRUE(first_frame); + EXPECT_EQ(RESULT_FAIL, + multiframe_builder_.AddFrame(first_frame)) + << "Unexisting connection " << connection_id + << "- to be skipped first frame: " << first_frame; + + EXPECT_EQ(ProtocolFramePtrList(), + multiframe_builder_.PopMultiframes()) + << "First frame: " << first_frame; + } + } + } +} + +TEST_F(MultiFrameBuilderTest, Add_FirstFrames_only) { + AddConnections(); + for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin(); + connection_it != test_data_map_.end(); ++connection_it) { + SessionToMutiframeDataTestMap& session_map = connection_it->second; + + for (SessionToMutiframeDataTestMap::iterator session_it = session_map.begin(); + session_it != session_map.end(); ++session_it) { + MessageIDToMutiframeDataTestMap& messageId_map = session_it->second; + + for (MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin(); + messageId_it != messageId_map.end(); ++messageId_it) { + const MutiframeData& multiframe_data = messageId_it->second; + + const ProtocolFramePtrList& multiframes = multiframe_data.multiframes; + ASSERT_FALSE(multiframes.empty()); + const ProtocolFramePtr first_frame = multiframes.front(); + ASSERT_TRUE(first_frame); + EXPECT_EQ(RESULT_OK, + multiframe_builder_.AddFrame(first_frame)) + << "First frame: " << first_frame; + + EXPECT_EQ(ProtocolFramePtrList(), + multiframe_builder_.PopMultiframes()) + << "First frame: " << first_frame; + } + } + } +} + +TEST_F(MultiFrameBuilderTest, Add_ConsecutiveFrame) { + ASSERT_FALSE(test_data_map_.empty()); + const ConnectionID& connection_id = test_data_map_.begin()->first; + SessionToMutiframeDataTestMap& session_map = test_data_map_.begin()->second; + + AddConnection(connection_id); + + ASSERT_FALSE(session_map.empty()); + MessageIDToMutiframeDataTestMap& messageId_map = session_map.begin()->second; + + ASSERT_FALSE(messageId_map.empty()); + const MutiframeData& multiframe_data = messageId_map.begin()->second; + + VerifyConsecutiveAdd(multiframe_data); +} + +TEST_F(MultiFrameBuilderTest, Add_ConsecutiveFrames_OneByOne) { + AddConnections(); + for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin(); + connection_it != test_data_map_.end(); ++connection_it) { + SessionToMutiframeDataTestMap& session_map = connection_it->second; + + for (SessionToMutiframeDataTestMap::iterator session_it = session_map.begin(); + session_it != session_map.end(); ++session_it) { + MessageIDToMutiframeDataTestMap& messageId_map = session_it->second; + + for (MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin(); + messageId_it != messageId_map.end(); ++messageId_it) { + const MutiframeData& multiframe_data = messageId_it->second; + + VerifyConsecutiveAdd(multiframe_data); + } + } + } +} + +TEST_F(MultiFrameBuilderTest, Add_ConsecutiveFrames_per1) { + AddConnections(); + ASSERT_FALSE(test_data_map_.empty()); + // After processing each frame we remove it from messageId_it + // After processing all session data - it removes from session_map + // and so on + // TODO(Ezamakhov): optimize speed of test by skipping erasing data + while (!test_data_map_.empty()) { + MultiFrameTestMap::iterator connection_it = test_data_map_.begin(); + while (connection_it != test_data_map_.end()) { + SessionToMutiframeDataTestMap& session_map = connection_it->second; + + SessionToMutiframeDataTestMap::iterator session_it = session_map.begin(); + while (session_it != session_map.end()) { + MessageIDToMutiframeDataTestMap& messageId_map = session_it->second; + + MessageIDToMutiframeDataTestMap::iterator messageId_it = messageId_map.begin(); + while (messageId_it != messageId_map.end()) { + + MutiframeData& multiframe_data = messageId_it->second; + ProtocolFramePtrList& multiframes = multiframe_data.multiframes; + ASSERT_FALSE(multiframes.empty()); + + const ProtocolFramePtr frame = multiframes.front(); + ASSERT_TRUE(frame); + + EXPECT_EQ(RESULT_OK, + multiframe_builder_.AddFrame(frame)) << "Frame: " << frame; + + multiframes.pop_front(); + + // If all frames are assembled + if (multiframes.empty()) { + const ProtocolFramePtrList& multiframe_list + = multiframe_builder_.PopMultiframes(); + ASSERT_EQ(multiframe_list.size(), 1u); + + const ProtocolFramePtr result_multiframe = multiframe_list.front(); + const UCharDataVector& binary_data = multiframe_data.binary_data; + EXPECT_EQ(binary_data, + UCharDataVector(result_multiframe->data(), + result_multiframe->data() + + result_multiframe->payload_size())); + messageId_map.erase(messageId_it++); + } else { + // Multiframe is not completed + EXPECT_EQ(ProtocolFramePtrList(), + multiframe_builder_.PopMultiframes()) << "Frame: " << frame; + ++messageId_it; + } + } + if (messageId_map.empty()) { + session_map.erase(session_it++); + } else { + ++session_it; + } + } + if (session_map.empty()) { + test_data_map_.erase(connection_it++); + } else { + ++connection_it; + } + } + } +} + +TEST_F(MultiFrameBuilderTest, FrameExpired_OneMSec) { + multiframe_builder_.set_waiting_timeout(1); + + ASSERT_FALSE(test_data_map_.empty()); + const ConnectionID& connection_id = test_data_map_.begin()->first; + SessionToMutiframeDataTestMap& session_map = test_data_map_.begin()->second; + + AddConnection(connection_id); + + ASSERT_FALSE(session_map.empty()); + MessageIDToMutiframeDataTestMap& messageId_map = session_map.begin()->second; + + ASSERT_FALSE(messageId_map.empty()); + const MutiframeData& multiframe_data = messageId_map.begin()->second; + + const ProtocolFramePtrList& multiframes = multiframe_data.multiframes; + ASSERT_FALSE(multiframes.empty()); + const ProtocolFramePtr first_frame = multiframes.front(); + ASSERT_TRUE(first_frame); + EXPECT_EQ(RESULT_OK, + multiframe_builder_.AddFrame(first_frame)) + << "First frame: " << first_frame; + + // Wait frame expire + usleep(1000); + const ProtocolFramePtrList& list = multiframe_builder_.PopMultiframes(); + ASSERT_FALSE(list.empty()); + EXPECT_EQ(first_frame, + list.front()); +} + +/* + * Testing support methods + */ + +void MultiFrameBuilderTest::VerifyConsecutiveAdd(const MutiframeData& multiframe_data) { + const ProtocolFramePtrList& multiframes = multiframe_data.multiframes; + const UCharDataVector& binary_data = multiframe_data.binary_data; + ASSERT_FALSE(multiframes.empty()); + + // Frame of multiframe loop + ProtocolFramePtrList::const_iterator it = multiframes.begin(); + // Skip last final frame + const ProtocolFramePtrList::const_iterator it_last = --(multiframes.end()); + while (it != it_last) { + const ProtocolFramePtr frame = *it; + ASSERT_TRUE(frame); + EXPECT_EQ(RESULT_OK, + multiframe_builder_.AddFrame(frame)) + << "Non final CONSECUTIVE frame: " << frame; + EXPECT_EQ(ProtocolFramePtrList(), + multiframe_builder_.PopMultiframes()) + << "Non final CONSECUTIVE frame: " << frame; + ++it; + // Skip last final frame + } + + const ProtocolFramePtr final_frame = multiframes.back(); + + EXPECT_EQ(RESULT_OK, + multiframe_builder_.AddFrame(final_frame)) + << "Final CONSECUTIVE frame: " << final_frame; + + const ProtocolFramePtrList& multiframe_list + = multiframe_builder_.PopMultiframes(); + ASSERT_EQ(multiframe_list.size(), 1u); + + const ProtocolFramePtr result_multiframe = multiframe_list.front(); + EXPECT_EQ(binary_data, + UCharDataVector(result_multiframe->data(), + result_multiframe->data() + result_multiframe->payload_size())); +} + +void MultiFrameBuilderTest::PrepareMultiFrames(const ConnectionID connection_id, + const uint8_t protocol_version, + const uint8_t service_type, + const uint8_t session_id, + const uint32_t message_id, + const size_t max_payload_size, + const UCharDataVector& data, + ProtocolFramePtrList& out_frames) { + ASSERT_GT(max_payload_size, FIRST_FRAME_DATA_SIZE); + ASSERT_EQ(FIRST_FRAME_DATA_SIZE, 0x08); + + // TODO(EZamakhov): move to the separate class + const size_t data_size = data.size(); + // remainder of last frame + const size_t lastframe_remainder = data_size % max_payload_size; + // size of last frame (full fill or not) + const size_t lastframe_size = + lastframe_remainder > 0 ? lastframe_remainder : max_payload_size; + + const size_t frames_count = data_size / max_payload_size + + // add last frame if not empty + (lastframe_remainder > 0 ? 1 : 0); + + uint8_t out_data[FIRST_FRAME_DATA_SIZE]; + out_data[0] = data_size >> 24; + out_data[1] = data_size >> 16; + out_data[2] = data_size >> 8; + out_data[3] = data_size; + + out_data[4] = frames_count >> 24; + out_data[5] = frames_count >> 16; + out_data[6] = frames_count >> 8; + out_data[7] = frames_count; + + ProtocolFramePtr first_frame( + new ProtocolPacket( + connection_id, protocol_version, PROTECTION_OFF, FRAME_TYPE_FIRST, + service_type, FRAME_DATA_FIRST, session_id, FIRST_FRAME_DATA_SIZE, + message_id, out_data)); + // Note: PHIMpl already prepare First frames the total_data_bytes on desirialization + first_frame->set_total_data_bytes(data_size); + + out_frames.clear(); + out_frames.push_back(first_frame); + + for (size_t i = 0; i < frames_count; ++i) { + const bool is_last_frame = (i == (frames_count - 1)); + const size_t frame_size = is_last_frame ? lastframe_size : max_payload_size; + const uint8_t data_type = + is_last_frame + ? FRAME_DATA_LAST_CONSECUTIVE + : (i % FRAME_DATA_MAX_CONSECUTIVE + 1); + + const ProtocolFramePtr consecutive_frame( + new ProtocolPacket( + connection_id, protocol_version, PROTECTION_OFF, FRAME_TYPE_CONSECUTIVE, + service_type, data_type, session_id, frame_size, message_id, + &data[max_payload_size * i])); + out_frames.push_back(consecutive_frame); + } +} + +void MultiFrameBuilderTest::AddConnection(const ConnectionID connection_id) { + ASSERT_TRUE(multiframe_builder_.AddConnection(connection_id)); +} + +void MultiFrameBuilderTest::AddConnections() { + for (MultiFrameTestMap::iterator connection_it = test_data_map_.begin(); + connection_it != test_data_map_.end(); ++connection_it) { + const ConnectionID connection_id = connection_it->first; + ASSERT_TRUE(multiframe_builder_.AddConnection(connection_id)); + } +} + +} // namespace protocol_handler_test +} // namespace components +} // namespace test diff --git a/src/components/protocol_handler/test/protocol_handler_tm_test.cc b/src/components/protocol_handler/test/protocol_handler_tm_test.cc index 4ef070c52f..ea8f7b122e 100644 --- a/src/components/protocol_handler/test/protocol_handler_tm_test.cc +++ b/src/components/protocol_handler/test/protocol_handler_tm_test.cc @@ -64,12 +64,15 @@ class ProtocolHandlerImplTest : public ::testing::Test { const size_t period_msec, const size_t max_messages, bool malformed_message_filtering = false, const size_t malformd_period_msec = 0u, - const size_t malformd_max_messages = 0u) { + const size_t malformd_max_messages = 0u, + const int32_t multiframe_waiting_timeout = 0) { protocol_handler_impl.reset( new ProtocolHandlerImpl(&transport_manager_mock, period_msec, max_messages, malformed_message_filtering, - malformd_period_msec, malformd_max_messages)); + malformd_period_msec, + malformd_max_messages, + multiframe_waiting_timeout)); protocol_handler_impl->set_session_observer(&session_observer_mock); tm_listener = protocol_handler_impl.get(); } @@ -249,7 +252,7 @@ TEST_F(ProtocolHandlerImplTest, StartSession_Protected_SessionObserverReject) { // For enabled protection callback shall use protection ON const bool callback_protection_flag = PROTECTION_ON; #else - // For disabled protection callback shall ignore protection income flad and use protection OFF + // For disabled protection callback shall ignore protection income flag and use protection OFF const bool callback_protection_flag = PROTECTION_OFF; #endif // ENABLE_SECURITY // expect ConnectionHandler check diff --git a/src/components/time_tester/test/time_manager_test.cc b/src/components/time_tester/test/time_manager_test.cc index 60f3bd6814..c5ad607a97 100644 --- a/src/components/time_tester/test/time_manager_test.cc +++ b/src/components/time_tester/test/time_manager_test.cc @@ -55,7 +55,7 @@ class StreamerMock : public Streamer { TEST(TimeManagerTest, DISABLED_MessageProcess) { //TODO(AK) APPLINK-13351 Disable due to refactor TimeTester protocol_handler_test::TransportManagerMock transport_manager_mock; - protocol_handler::ProtocolHandlerImpl protocol_handler_mock(&transport_manager_mock, 0, 0, 0, 0, 0); + protocol_handler::ProtocolHandlerImpl protocol_handler_mock(&transport_manager_mock, 0, 0, 0, 0, 0, 0); TimeManager * time_manager = new TimeManager(); // Streamer will be deleted by Thread StreamerMock* streamer_mock = new StreamerMock(time_manager); |