summaryrefslogtreecommitdiff
path: root/src/components/protocol_handler/src
diff options
context:
space:
mode:
authorElisey Zamakhov <EZamakhov@luxoft.com>2015-11-03 16:26:16 +0300
committerElisey Zamakhov <EZamakhov@luxoft.com>2015-11-06 21:26:05 +0300
commitdc7d430a5f40c9f2b0e42cf5e29dfe1d3d98c1a4 (patch)
tree80e8f47e137199ba35daa4f88ac9d753fcff74f1 /src/components/protocol_handler/src
parent889599ab98c2ed4eac38a7832aabbf4a872e6005 (diff)
downloadsdl_core-dc7d430a5f40c9f2b0e42cf5e29dfe1d3d98c1a4.tar.gz
Implement Multiframe assembling by MessageID
Each Session resposible for assembling multiframes according to MessageID Implemented MultiframeBuilder support class Add MultiframeBuilder tests Integrated MultiframeBuilder usage in PHimpl with pop all expired frames on each handled frame on any session of any connection Added Configurable parameter in ConfigProfile for expirated time configuration Updated PASA and Genevi ini files Fixed crash on printing empty ProtocolFramePtr Added ProtocolFramePtrList, ConnectionID typedefs to protocol_packet.h Removed redundant ConnectionID usage in PHImpl Fixed memoty leak in ProtocolPacket Code style fixs in PHIMpl. IncomingDataHandler Issues:CRQ - APPLINK-17629, Defect fixed indirectly - APPLINK-17954
Diffstat (limited to 'src/components/protocol_handler/src')
-rw-r--r--src/components/protocol_handler/src/incoming_data_handler.cc38
-rw-r--r--src/components/protocol_handler/src/multiframe_builder.cc273
-rw-r--r--src/components/protocol_handler/src/protocol_handler_impl.cc189
-rw-r--r--src/components/protocol_handler/src/protocol_packet.cc8
4 files changed, 369 insertions, 139 deletions
diff --git a/src/components/protocol_handler/src/incoming_data_handler.cc b/src/components/protocol_handler/src/incoming_data_handler.cc
index 43b0898cc2..f1ceb18425 100644
--- a/src/components/protocol_handler/src/incoming_data_handler.cc
+++ b/src/components/protocol_handler/src/incoming_data_handler.cc
@@ -40,32 +40,32 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "ProtocolHandler")
IncomingDataHandler::IncomingDataHandler()
: header_(),
validator_(NULL),
- last_portion_of_data_was_malformed_(false)
- {}
+ last_portion_of_data_was_malformed_(false) {
+}
void IncomingDataHandler::set_validator(
- const ProtocolPacket::ProtocolHeaderValidator *const validator) {
+ const ProtocolPacket::ProtocolHeaderValidator* const validator) {
validator_ = validator;
}
static const size_t MIN_HEADER_SIZE = std::min(PROTOCOL_HEADER_V1_SIZE,
PROTOCOL_HEADER_V2_SIZE);
-std::list<ProtocolFramePtr> IncomingDataHandler::ProcessData(
- const RawMessage &tm_message,
- RESULT_CODE *result,
- size_t *malformed_occurrence) {
+ProtocolFramePtrList IncomingDataHandler::ProcessData(
+ const RawMessage& tm_message,
+ RESULT_CODE* result,
+ size_t* malformed_occurrence) {
LOG4CXX_AUTO_TRACE(logger_);
DCHECK(result);
DCHECK(malformed_occurrence);
const transport_manager::ConnectionUID connection_id =
tm_message.connection_key();
- const uint8_t *data = tm_message.data();
+ const uint8_t* data = tm_message.data();
const size_t tm_message_size = tm_message.data_size();
if (tm_message_size == 0 || data == NULL) {
LOG4CXX_WARN(logger_, "Wrong raw message " << tm_message_size << " bytes");
*result = RESULT_FAIL;
- return std::list<ProtocolFramePtr>();
+ return ProtocolFramePtrList();
}
LOG4CXX_DEBUG(logger_, "Processing incoming data of size "
<< tm_message_size << " for connection " << connection_id);
@@ -73,13 +73,13 @@ std::list<ProtocolFramePtr> IncomingDataHandler::ProcessData(
if (connections_data_.end() == it) {
LOG4CXX_WARN(logger_, "ProcessData requested for unknown connection");
*result = RESULT_FAIL;
- return std::list<ProtocolFramePtr>();
+ return ProtocolFramePtrList();
}
- std::vector<uint8_t> &connection_data = it->second;
+ std::vector<uint8_t>& connection_data = it->second;
connection_data.insert(connection_data.end(), data, data + tm_message_size);
LOG4CXX_DEBUG(logger_, "Total data size for connection "
<< connection_id << " is " << connection_data.size());
- std::list<ProtocolFramePtr> out_frames;
+ ProtocolFramePtrList out_frames;
*malformed_occurrence = 0;
*result = CreateFrame(connection_data, out_frames, *malformed_occurrence, connection_id);
LOG4CXX_DEBUG(logger_, "New data size for connection " << connection_id
@@ -117,7 +117,7 @@ void IncomingDataHandler::RemoveConnection(
}
uint32_t IncomingDataHandler::GetPacketSize(
- const ProtocolPacket::ProtocolHeader &header) {
+ const ProtocolPacket::ProtocolHeader& header) {
switch (header.version) {
case PROTOCOL_VERSION_1:
return header.dataSize + PROTOCOL_HEADER_V1_SIZE;
@@ -134,9 +134,9 @@ uint32_t IncomingDataHandler::GetPacketSize(
}
RESULT_CODE IncomingDataHandler::CreateFrame(
- std::vector<uint8_t> &incoming_data,
- std::list<ProtocolFramePtr> &out_frames,
- size_t &malformed_occurrence,
+ std::vector<uint8_t>& incoming_data,
+ ProtocolFramePtrList& out_frames,
+ size_t& malformed_occurrence,
const transport_manager::ConnectionUID connection_id) {
LOG4CXX_AUTO_TRACE(logger_);
std::vector<uint8_t>::iterator data_it = incoming_data.begin();
@@ -157,7 +157,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame(
++data_it;
--data_size;
LOG4CXX_DEBUG(logger_, "Moved to the next byte " << std::hex
- << static_cast<const void *>(&*data_it));
+ << static_cast<const void*>(&*data_it));
continue;
}
LOG4CXX_DEBUG(logger_, "Payload size " << header_.dataSize);
@@ -167,7 +167,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame(
++data_it;
--data_size;
LOG4CXX_DEBUG(logger_, "Moved to the next byte " << std::hex
- << static_cast<const void *>(&*data_it));
+ << static_cast<const void*>(&*data_it));
continue;
}
if (data_size < packet_size) {
@@ -179,7 +179,7 @@ RESULT_CODE IncomingDataHandler::CreateFrame(
const RESULT_CODE deserialize_result =
frame->deserializePacket(&*data_it, packet_size);
LOG4CXX_DEBUG(
- logger_, "Deserialized frame " << frame);
+ logger_, "Deserialized frame " << frame);
if (deserialize_result != RESULT_OK) {
LOG4CXX_WARN(logger_, "Packet deserialization failed");
incoming_data.erase(incoming_data.begin(), data_it);
diff --git a/src/components/protocol_handler/src/multiframe_builder.cc b/src/components/protocol_handler/src/multiframe_builder.cc
new file mode 100644
index 0000000000..9dc2c08fa5
--- /dev/null
+++ b/src/components/protocol_handler/src/multiframe_builder.cc
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2015, Ford Motor Company
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * Neither the name of the Ford Motor Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "protocol_handler/multiframe_builder.h"
+
+#include <limits>
+
+#include "utils/logger.h"
+#include "utils/make_shared.h"
+#include "utils/lock.h"
+#include "utils/date_time.h"
+
+namespace protocol_handler {
+
+CREATE_LOGGERPTR_GLOBAL(logger_, "ProtocolHandler")
+
+MultiFrameBuilder::MultiFrameBuilder()
+ : consecutive_frame_wait_msecs_(0u) {
+}
+
+void MultiFrameBuilder::set_waiting_timeout(const uint32_t consecutive_frame_wait_msecs) {
+ consecutive_frame_wait_msecs_ = static_cast<int64_t>(consecutive_frame_wait_msecs);
+ if (consecutive_frame_wait_msecs == 0) {
+ LOG4CXX_WARN(logger_, "Waiting timout disabled");
+ } else {
+ LOG4CXX_DEBUG(logger_, "Waiting time in msec: " << consecutive_frame_wait_msecs_);
+ }
+}
+
+bool MultiFrameBuilder::AddConnection(const ConnectionID connection_id) {
+ LOG4CXX_DEBUG(logger_, "Adding connection_id: " << connection_id);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ const MultiFrameMap::const_iterator it = multiframes_map_.find(connection_id);
+ if (it != multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Exists connection_id: " << connection_id);
+ return false;
+ }
+ multiframes_map_[connection_id] = SessionToFrameMap();
+ return true;
+}
+
+bool MultiFrameBuilder::RemoveConnection(const ConnectionID connection_id) {
+ LOG4CXX_DEBUG(logger_, "Removing connection_id: " << connection_id);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ const MultiFrameMap::iterator it = multiframes_map_.find(connection_id);
+ if (it == multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Non-existent connection_id: " << connection_id);
+ return false;
+ }
+ const SessionToFrameMap& session_to_frame_map = it->second;
+ if (!session_to_frame_map.empty()) {
+ // FIXME(EZamakhov): Ask ReqManager - do we need to send GenericError
+ LOG4CXX_WARN(logger_, "For connection_id: " << connection_id
+ << " waiting: " << multiframes_map_);
+ }
+ multiframes_map_.erase(it);
+ return true;
+}
+
+ProtocolFramePtrList MultiFrameBuilder::PopMultiframes() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ ProtocolFramePtrList outpute_frame_list;
+ for (MultiFrameMap::iterator connection_it = multiframes_map_.begin();
+ connection_it != multiframes_map_.end(); ++connection_it) {
+ LOG4CXX_TRACE(logger_, "Step over connection: " << connection_it->first);
+ SessionToFrameMap& session_map = connection_it->second;
+
+ for (SessionToFrameMap::iterator session_it = session_map.begin();
+ session_it != session_map.end(); ++session_it) {
+ LOG4CXX_TRACE(logger_, "Step over session: "
+ << static_cast<int>(session_it->first));
+ MessageIDToFrameMap& messageId_map = session_it->second;
+
+ MessageIDToFrameMap::iterator messageId_it = messageId_map.begin();
+ while (messageId_it != messageId_map.end()) {
+ LOG4CXX_TRACE(logger_, "Step over messageId: " << messageId_it->first);
+ ProtocolFrameData& frame_data = messageId_it->second;
+ ProtocolFramePtr frame = frame_data.frame;
+
+ if (frame &&
+ frame->frame_data() == FRAME_DATA_LAST_CONSECUTIVE &&
+ frame->payload_size() > 0u ) {
+ LOG4CXX_DEBUG(logger_, "Ready frame: " << frame);
+ outpute_frame_list.push_back(frame);
+ messageId_map.erase(messageId_it++);
+ continue;
+ }
+ if (consecutive_frame_wait_msecs_ != 0) {
+ LOG4CXX_TRACE(logger_, "Expiration verification");
+ const int64_t time_left =
+ date_time::DateTime::calculateTimeSpan(frame_data.append_time);
+ LOG4CXX_DEBUG(logger_, "mSecs left: " << time_left);
+ if (time_left >= consecutive_frame_wait_msecs_) {
+ LOG4CXX_WARN(logger_, "Expired frame: " << frame);
+ outpute_frame_list.push_back(frame);
+ messageId_map.erase(messageId_it++);
+ continue;
+ }
+ }
+ ++messageId_it;
+ } // iteration over messageId_map
+ } // iteration over session_map
+ } // iteration over multiframes_map_
+ LOG4CXX_DEBUG(logger_, "Result frames count: " << outpute_frame_list.size());
+ return outpute_frame_list;
+}
+
+RESULT_CODE MultiFrameBuilder::AddFrame(const ProtocolFramePtr packet) {
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "Handling frame: " << packet);
+ LOG4CXX_DEBUG(logger_, "Current state is: " << multiframes_map_);
+ if (!packet) {
+ LOG4CXX_ERROR(logger_, "Skip empty frame");
+ return RESULT_FAIL;
+ }
+ switch (packet->frame_type()) {
+ case FRAME_TYPE_FIRST:
+ LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST");
+ return HandleFirstFrame(packet);
+ case FRAME_TYPE_CONSECUTIVE:
+ LOG4CXX_TRACE(logger_, "FRAME_TYPE_CONSECUTIVE");
+ return HandleConsecutiveFrame(packet);
+ default:
+ LOG4CXX_ERROR(logger_, "Frame is not FIRST or CONSECUTIVE :"
+ << packet);
+ break;
+ }
+ return RESULT_FAIL;
+}
+
+RESULT_CODE MultiFrameBuilder::HandleFirstFrame(const ProtocolFramePtr packet) {
+ DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_FIRST,
+ RESULT_FAIL);
+ LOG4CXX_DEBUG(logger_, "Waiting : " << multiframes_map_);
+ LOG4CXX_DEBUG(logger_, "Handling FIRST frame: " << packet);
+ if (packet->payload_size() != 0u) {
+ LOG4CXX_ERROR(logger_,
+ "First frame shall have no data:" << packet);
+ return RESULT_FAIL;
+ }
+
+ const ConnectionID connection_id = packet->connection_id();
+ MultiFrameMap::iterator connection_it = multiframes_map_.find(connection_id);
+ if (connection_it == multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Unknown connection_id: " << connection_id);
+ return RESULT_FAIL;
+ }
+ SessionToFrameMap& session_map = connection_it->second;
+
+ const SessionID session_id = packet->session_id();
+ // No need to verify session existance
+ MessageIDToFrameMap& messageId_map = session_map[session_id];
+
+ const MessageID message_id = packet->message_id();
+ MessageIDToFrameMap::iterator messageId_it = messageId_map.find(message_id);
+ if (messageId_it != messageId_map.end()) {
+ LOG4CXX_ERROR(logger_, "Already waiting message for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+ return RESULT_FAIL;
+ }
+
+ LOG4CXX_DEBUG(logger_, "Start waiting frames for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+ messageId_map[message_id] = {packet, date_time::DateTime::getCurrentTime()};
+ return RESULT_OK;
+}
+
+RESULT_CODE MultiFrameBuilder::HandleConsecutiveFrame(const ProtocolFramePtr packet) {
+ DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_CONSECUTIVE,
+ RESULT_FAIL);
+ LOG4CXX_DEBUG(logger_, "Handling CONSECUTIVE frame: " << packet);
+
+
+ const ConnectionID connection_id = packet->connection_id();
+ MultiFrameMap::iterator connection_it = multiframes_map_.find(connection_id);
+ if (connection_it == multiframes_map_.end()) {
+ LOG4CXX_ERROR(logger_, "Unknown connection_id: " << connection_id);
+ return RESULT_FAIL;
+ }
+ SessionToFrameMap& session_map = connection_it->second;
+
+ const SessionID session_id = packet->session_id();
+ // No need to verify session existance
+ MessageIDToFrameMap& messageId_map = session_map[session_id];
+
+ const MessageID message_id = packet->message_id();
+ MessageIDToFrameMap::iterator messageId_it = messageId_map.find(message_id);
+ if (messageId_it == messageId_map.end()) {
+ LOG4CXX_ERROR(logger_, "No waiting message for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+ return RESULT_FAIL;
+ }
+
+ ProtocolFrameData& frame_data = messageId_it->second;
+ ProtocolFramePtr assembling_frame = frame_data.frame;
+ DCHECK_OR_RETURN(packet->message_id() == assembling_frame->message_id(),
+ RESULT_FAIL);
+
+ const uint8_t new_frame_data = packet->frame_data();
+ const bool is_last_consecutive = (new_frame_data ==
+ FRAME_DATA_LAST_CONSECUTIVE);
+
+ if (is_last_consecutive) {
+ // TODO(EZamakhov): implement count of frames and result size verification
+ LOG4CXX_DEBUG(logger_, "Last CONSECUTIVE frame");
+ } else {
+ uint8_t previous_frame_data = assembling_frame->frame_data();
+ if (previous_frame_data == std::numeric_limits<uint8_t>::max()) {
+ previous_frame_data = 0u;
+ }
+ // The next frame data is bigger at 1
+ if (new_frame_data != (previous_frame_data + 1)) {
+ LOG4CXX_ERROR(logger_,
+ "Unexpected CONSECUTIVE frame for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id
+ << ", frame: " << packet);
+ return RESULT_FAIL;
+ }
+ }
+
+ assembling_frame->set_frame_data(new_frame_data);
+
+ LOG4CXX_DEBUG(logger_,
+ "Appending " << packet->data_size() << " bytes "
+ << "; frame_data " << static_cast<int>(new_frame_data)
+ << "; for connection_id: " << connection_id
+ << ", session_id: " << static_cast<int>(session_id)
+ << ", message_id: " << message_id);
+
+ if (assembling_frame->appendData(packet->data(),
+ packet->data_size()) != RESULT_OK) {
+ LOG4CXX_ERROR(logger_, "Failed to append frame for multiframe message.");
+ return RESULT_FAIL;
+ }
+ frame_data.append_time = date_time::DateTime::getCurrentTime();
+ return RESULT_OK;
+}
+
+} // namespace protocol_handler
diff --git a/src/components/protocol_handler/src/protocol_handler_impl.cc b/src/components/protocol_handler/src/protocol_handler_impl.cc
index dccfffdf04..b3fb337703 100644
--- a/src/components/protocol_handler/src/protocol_handler_impl.cc
+++ b/src/components/protocol_handler/src/protocol_handler_impl.cc
@@ -63,11 +63,12 @@ uint8_t SupportedSDLProtocolVersion();
const size_t kStackSize = 32768;
-ProtocolHandlerImpl::ProtocolHandlerImpl(
- transport_manager::TransportManager *transport_manager_param,
- size_t message_frequency_time, size_t message_frequency_count,
- bool malformed_message_filtering,
- size_t malformed_message_frequency_time, size_t malformed_message_frequency_count)
+ProtocolHandlerImpl::ProtocolHandlerImpl(transport_manager::TransportManager *transport_manager_param,
+ size_t message_frequency_time, size_t message_frequency_count,
+ bool malformed_message_filtering,
+ size_t malformed_message_frequency_time,
+ size_t malformed_message_frequency_count,
+ uint32_t multiframe_waiting_timeout)
: protocol_observers_(),
session_observer_(0),
transport_manager_(transport_manager_param),
@@ -117,6 +118,7 @@ ProtocolHandlerImpl::ProtocolHandlerImpl(
LOG4CXX_WARN(logger_, "Malformed message filtering is disabled."
<< "Connection will be close on first malformed message detection");
}
+ multiframe_builder_.set_waiting_timeout(multiframe_waiting_timeout);
}
ProtocolHandlerImpl::~ProtocolHandlerImpl() {
@@ -502,6 +504,11 @@ void ProtocolHandlerImpl::OnTMMessageReceiveFailed(
void ProtocolHandlerImpl::NotifySubscribers(const RawMessagePtr message) {
LOG4CXX_AUTO_TRACE(logger_);
sync_primitives::AutoLock lock(protocol_observers_lock_);
+ if (protocol_observers_.empty()) {
+ LOG4CXX_ERROR(
+ logger_,
+ "Cannot handle multiframe message: no IProtocolObserver is set.");
+ }
for (ProtocolObservers::iterator it = protocol_observers_.begin();
protocol_observers_.end() != it; ++it) {
(*it)->OnMessageReceived(message);
@@ -570,6 +577,7 @@ void ProtocolHandlerImpl::OnConnectionEstablished(
const transport_manager::DeviceInfo &device_info,
const transport_manager::ConnectionUID &connection_id) {
incoming_data_handler_.AddConnection(connection_id);
+ multiframe_builder_.AddConnection(connection_id);
}
void ProtocolHandlerImpl::OnConnectionClosed(
@@ -577,6 +585,7 @@ void ProtocolHandlerImpl::OnConnectionClosed(
incoming_data_handler_.RemoveConnection(connection_id);
message_meter_.ClearIdentifiers();
malformed_message_meter_.ClearIdentifiers();
+ multiframe_builder_.RemoveConnection(connection_id);
}
RESULT_CODE ProtocolHandlerImpl::SendFrame(const ProtocolFramePtr packet) {
@@ -705,21 +714,20 @@ RESULT_CODE ProtocolHandlerImpl::SendMultiFrameMessage(
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleMessage(ConnectionID connection_id,
- const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleMessage(const ProtocolFramePtr packet) {
DCHECK_OR_RETURN(packet, RESULT_UNKNOWN);
LOG4CXX_DEBUG(logger_, "Handling message " << packet);
switch (packet->frame_type()) {
case FRAME_TYPE_CONTROL:
LOG4CXX_TRACE(logger_, "FRAME_TYPE_CONTROL");
- return HandleControlMessage(connection_id, packet);
+ return HandleControlMessage(packet);
case FRAME_TYPE_SINGLE:
LOG4CXX_TRACE(logger_, "FRAME_TYPE_SINGLE");
- return HandleSingleFrameMessage(connection_id, packet);
+ return HandleSingleFrameMessage(packet);
case FRAME_TYPE_FIRST:
case FRAME_TYPE_CONSECUTIVE:
LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST or FRAME_TYPE_CONSECUTIVE");
- return HandleMultiFrameMessage(connection_id, packet);
+ return HandleMultiFrameMessage(packet);
default: {
LOG4CXX_WARN(logger_, "Unknown frame type"
<< packet->frame_type());
@@ -729,8 +737,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleMessage(ConnectionID connection_id,
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(
- ConnectionID connection_id, const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(const ProtocolFramePtr packet) {
LOG4CXX_AUTO_TRACE(logger_);
LOG4CXX_DEBUG(logger_,
@@ -745,7 +752,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(
}
const uint32_t connection_key =
- session_observer_->KeyFromPair(connection_id, packet->session_id());
+ session_observer_->KeyFromPair(packet->connection_id(), packet->session_id());
const RawMessagePtr rawMessage(
new RawMessage(connection_key,
@@ -773,8 +780,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage(
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage(
- ConnectionID connection_id, const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage(const ProtocolFramePtr packet) {
LOG4CXX_AUTO_TRACE(logger_);
if (!session_observer_) {
@@ -782,100 +788,14 @@ RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage(
return RESULT_FAIL;
}
- const uint32_t key = session_observer_->KeyFromPair(connection_id,
- packet->session_id());
- LOG4CXX_DEBUG(
- logger_,
- "Packet " << packet << "; session id " << static_cast<int32_t>(key));
-
- if (packet->frame_type() == FRAME_TYPE_FIRST) {
- LOG4CXX_TRACE(logger_, "FRAME_TYPE_FIRST");
- // First frame has no data
- DCHECK_OR_RETURN(packet->frame_data() == 0u, RESULT_FAIL);
- // We can not handle more than one multiframe with the same key
- DCHECK_OR_RETURN(incomplete_multi_frame_messages_.count(key) == 0,
- RESULT_FAIL);
- incomplete_multi_frame_messages_[key] = packet;
- return RESULT_OK;
+ if (multiframe_builder_.AddFrame(packet) != RESULT_OK) {
+ LOG4CXX_WARN(logger_, "Frame assembling issue");
}
- DCHECK_OR_RETURN(packet->frame_type() == FRAME_TYPE_CONSECUTIVE, RESULT_FAIL)
-
- MultiFrameMap::iterator it = incomplete_multi_frame_messages_.find(key);
- if (it == incomplete_multi_frame_messages_.end()) {
- LOG4CXX_ERROR(logger_,
- "Frame of multiframe message for non-existing session id " << key);
- return RESULT_FAIL;
- }
-
- ProtocolFramePtr& assembling_frame = it->second;
- const uint8_t previous_frame_data = assembling_frame->frame_data();
- const uint8_t new_frame_data = packet->frame_data();
-
- const bool is_last_consecutive = (new_frame_data == FRAME_DATA_LAST_CONSECUTIVE);
- // The next frame data is bigger at 1
- DCHECK_OR_RETURN((new_frame_data == (previous_frame_data + 1)) ||
- // except the last consecutive frame
- is_last_consecutive, RESULT_FAIL);
-
- assembling_frame->set_frame_data(new_frame_data);
-
- LOG4CXX_DEBUG(logger_,
- "Appending " << packet->data_size() << " bytes "
- << "; frame_data " << static_cast<int>(new_frame_data)
- << "; connection key " << key);
-
- DCHECK_OR_RETURN(packet->message_id() == assembling_frame->message_id(), RESULT_FAIL);
- if (assembling_frame->appendData(packet->data(), packet->data_size()) != RESULT_OK) {
- LOG4CXX_ERROR(logger_, "Failed to append frame for multiframe message.");
- return RESULT_FAIL;
- }
-
- if (is_last_consecutive) {
- LOG4CXX_DEBUG(
- logger_,
- "Last frame of multiframe message size " << packet->data_size()
- << "; connection key " << key);
- {
- sync_primitives::AutoLock lock(protocol_observers_lock_);
- if (protocol_observers_.empty()) {
- LOG4CXX_ERROR(
- logger_,
- "Cannot handle multiframe message: no IProtocolObserver is set.");
- return RESULT_FAIL;
- }
- }
- const uint32_t connection_key =
- session_observer_->KeyFromPair(connection_id,
- assembling_frame->session_id());
- LOG4CXX_DEBUG(logger_, "Result frame" << assembling_frame <<
- "for connection "<< connection_key);
- const RawMessagePtr rawMessage(
- new RawMessage(connection_key,
- assembling_frame->protocol_version(),
- assembling_frame->data(),
- assembling_frame->total_data_bytes(),
- assembling_frame->service_type(),
- assembling_frame->payload_size()));
- DCHECK_OR_RETURN(rawMessage, RESULT_FAIL);
-
-#ifdef TIME_TESTER
- if (metric_observer_) {
- PHMetricObserver::MessageMetric *metric =
- new PHMetricObserver::MessageMetric();
- metric->raw_msg = rawMessage;
- metric_observer_->EndMessageProcess(metric);
- }
-#endif // TIME_TESTER
- // TODO(EZamakhov): check service in session
- NotifySubscribers(rawMessage);
- incomplete_multi_frame_messages_.erase(it);
- }
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessage(
- ConnectionID connection_id, const ProtocolFramePtr packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessage(const ProtocolFramePtr packet) {
LOG4CXX_AUTO_TRACE(logger_);
if (!session_observer_) {
@@ -885,17 +805,16 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessage(
switch (packet->frame_data()) {
case FRAME_DATA_START_SERVICE:
- return HandleControlMessageStartSession(connection_id, *(packet.get()));
+ return HandleControlMessageStartSession(*packet);
case FRAME_DATA_END_SERVICE:
- return HandleControlMessageEndSession(connection_id, *(packet.get()));
+ return HandleControlMessageEndSession(*packet);
case FRAME_DATA_HEART_BEAT: {
- LOG4CXX_DEBUG(logger_,
- "Received heart beat for connection " << connection_id);
- return HandleControlMessageHeartBeat(connection_id, *(packet.get()));
+ LOG4CXX_TRACE(logger_, "FRAME_DATA_HEART_BEAT");
+ return HandleControlMessageHeartBeat(*packet);
}
case FRAME_DATA_HEART_BEAT_ACK: {
LOG4CXX_DEBUG(logger_, "Received heart beat ack from mobile app"
- " for connection " << connection_id);
+ " for connection " << packet->connection_id());
return RESULT_OK;
}
default:
@@ -922,14 +841,14 @@ uint32_t get_hash_id(const ProtocolPacket &packet) {
return hash_le == HASH_ID_NOT_SUPPORTED ? HASH_ID_WRONG : hash_le;
}
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessageEndSession(
- ConnectionID connection_id, const ProtocolPacket &packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessageEndSession(const ProtocolPacket &packet) {
LOG4CXX_AUTO_TRACE(logger_);
const uint8_t current_session_id = packet.session_id();
const uint32_t hash_id = get_hash_id(packet);
const ServiceType service_type = ServiceTypeFromByte(packet.service_type());
+ const ConnectionID connection_id = packet.connection_id();
const uint32_t session_key = session_observer_->OnSessionEndedCallback(
connection_id, current_session_id, hash_id, service_type);
@@ -1025,8 +944,7 @@ class StartSessionHandler : public security_manager::SecurityManagerListener {
} // namespace
#endif // ENABLE_SECURITY
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(
- ConnectionID connection_id, const ProtocolPacket &packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(const ProtocolPacket &packet) {
LOG4CXX_DEBUG(logger_,
"Protocol version:" <<
static_cast<int>(packet.protocol_version()));
@@ -1043,6 +961,7 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(
DCHECK(session_observer_);
uint32_t hash_id;
+ const ConnectionID connection_id = packet.connection_id();
const uint32_t session_id = session_observer_->OnSessionStartedCallback(
connection_id, packet.session_id(), service_type, protection, &hash_id);
@@ -1100,8 +1019,8 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageStartSession(
return RESULT_OK;
}
-RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat(
- ConnectionID connection_id, const ProtocolPacket &packet) {
+RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat(const ProtocolPacket &packet) {
+ const ConnectionID connection_id = packet.connection_id();
LOG4CXX_DEBUG(
logger_,
"Sending heart beat acknowledgment for connection " << connection_id);
@@ -1124,6 +1043,41 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat(
}
}
+void ProtocolHandlerImpl::PopValideAndExpirateMultiframes() {
+ const ProtocolFramePtrList& frame_list = multiframe_builder_.PopMultiframes();
+ for (ProtocolFramePtrList::const_iterator it = frame_list.begin();
+ it != frame_list.end(); ++it) {
+ const ProtocolFramePtr frame = *it;
+ DCHECK(frame);
+ if(!frame) {
+ continue;
+ }
+
+ const uint32_t connection_key =
+ session_observer_->KeyFromPair(frame->connection_id(), frame->session_id());
+ LOG4CXX_DEBUG(logger_, "Result frame" << frame <<
+ "for connection "<< connection_key);
+ const RawMessagePtr rawMessage(
+ new RawMessage(connection_key,
+ frame->protocol_version(),
+ frame->data(),
+ frame->total_data_bytes(),
+ frame->service_type(),
+ frame->payload_size()));
+ DCHECK(rawMessage);
+
+#ifdef TIME_TESTER
+ if (metric_observer_) {
+ PHMetricObserver::MessageMetric *metric =
+ new PHMetricObserver::MessageMetric();
+ metric->raw_msg = rawMessage;
+ metric_observer_->EndMessageProcess(metric);
+ }
+#endif // TIME_TESTER
+ NotifySubscribers(rawMessage);
+ }
+}
+
bool ProtocolHandlerImpl::TrackMessage(const uint32_t& connection_key) {
LOG4CXX_AUTO_TRACE(logger_);
if (message_frequency_time_ > 0u &&
@@ -1203,7 +1157,8 @@ void ProtocolHandlerImpl::Handle(
FRAME_TYPE_CONTROL == message->frame_type() ||
FRAME_TYPE_FIRST == message->frame_type()) {
LOG4CXX_DEBUG(logger_, "Packet: dataSize " << message->data_size());
- HandleMessage(message->connection_id(), message);
+ HandleMessage(message);
+ PopValideAndExpirateMultiframes();
} else {
LOG4CXX_WARN(logger_,
"handleMessagesFromMobileApp() - incorrect or NULL data");
diff --git a/src/components/protocol_handler/src/protocol_packet.cc b/src/components/protocol_handler/src/protocol_packet.cc
index 1ed6e05699..8675d7543e 100644
--- a/src/components/protocol_handler/src/protocol_packet.cc
+++ b/src/components/protocol_handler/src/protocol_packet.cc
@@ -33,6 +33,7 @@
#include <stdint.h>
#include <memory.h>
#include <new>
+#include <memory>
#include <cstring>
#include <limits>
@@ -247,7 +248,7 @@ RESULT_CODE ProtocolPacket::ProtocolHeaderValidator::validate(
}
ProtocolPacket::ProtocolPacket()
- : payload_size_(0), connection_id_(0) {
+ : payload_size_(0u), connection_id_(0u) {
}
ProtocolPacket::ProtocolPacket(ConnectionID connection_id,
@@ -382,7 +383,7 @@ RESULT_CODE ProtocolPacket::deserializePacket(
dataPayloadSize = messageSize - offset;
}
- uint8_t *data = NULL;
+ uint8_t* data = NULL;
if (dataPayloadSize) {
data = new (std::nothrow) uint8_t[dataPayloadSize];
if (!data) {
@@ -401,6 +402,7 @@ RESULT_CODE ProtocolPacket::deserializePacket(
total_data_bytes |= data[3];
set_total_data_bytes(total_data_bytes);
if (0 == packet_data_.data) {
+ delete[] data;
return RESULT_FAIL;
}
} else {
@@ -482,7 +484,7 @@ uint32_t ProtocolPacket::total_data_bytes() const {
return packet_data_.totalDataBytes;
}
-uint8_t ProtocolPacket::connection_id() const {
+ConnectionID ProtocolPacket::connection_id() const {
return connection_id_;
}