diff options
Diffstat (limited to 'src/components/protocol_handler/src/incoming_data_handler.cc')
-rw-r--r-- | src/components/protocol_handler/src/incoming_data_handler.cc | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/src/components/protocol_handler/src/incoming_data_handler.cc b/src/components/protocol_handler/src/incoming_data_handler.cc new file mode 100644 index 0000000000..0baab6d802 --- /dev/null +++ b/src/components/protocol_handler/src/incoming_data_handler.cc @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2014, Ford Motor Company + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the + * distribution. + * + * Neither the name of the Ford Motor Company nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +#include "protocol_handler/incoming_data_handler.h" +#include "utils/logger.h" +#include "protocol/common.h" + +namespace protocol_handler { + +CREATE_LOGGERPTR_GLOBAL(logger_, "ProtocolHandler") + +IncomingDataHandler::IncomingDataHandler() + : header_(), validator_(NULL) {} + +void IncomingDataHandler::set_validator( + const ProtocolPacket::ProtocolHeaderValidator* const validator) { + validator_ = validator; +} + +static const size_t MIN_HEADER_SIZE = std::min(PROTOCOL_HEADER_V1_SIZE, PROTOCOL_HEADER_V2_SIZE); + +std::list<ProtocolFramePtr> IncomingDataHandler::ProcessData(const RawMessage& tm_message, + RESULT_CODE* result) { + LOG4CXX_AUTO_TRACE(logger_); + const transport_manager::ConnectionUID connection_id = tm_message.connection_key(); + const uint8_t* data = tm_message.data(); + const size_t tm_message_size = tm_message.data_size(); + if (tm_message_size == 0 || data == NULL) { + LOG4CXX_WARN(logger_, "Wrong raw message " << tm_message_size << " bytes"); + if (result) { + *result = RESULT_FAIL; + } + return std::list<ProtocolFramePtr>(); + } + LOG4CXX_DEBUG(logger_, "Processing incoming data of size " + << tm_message_size << " for connection " << connection_id); + ConnectionsDataMap::iterator it = connections_data_.find(connection_id); + if (connections_data_.end() == it) { + LOG4CXX_WARN(logger_, "ProcessData requested for unknown connection"); + if (result) { + *result = RESULT_FAIL; + } + return std::list<ProtocolFramePtr>(); + } + std::vector<uint8_t>& connection_data = it->second; + connection_data.insert(connection_data.end(), data, data + tm_message_size); + LOG4CXX_DEBUG(logger_, "Total data size for connection " + << connection_id << " is " << connection_data.size()); + std::list<ProtocolFramePtr> out_frames; + while (connection_data.size() >= MIN_HEADER_SIZE) { + const RESULT_CODE frame_creation_result = + CreateFrame(connection_data, out_frames, connection_id); + + if (RESULT_DEFERRED == frame_creation_result) { + LOG4CXX_DEBUG(logger_, "Wait next portion of data"); + break; + } + if (RESULT_OK != frame_creation_result) { + LOG4CXX_WARN(logger_, "Packet could not be parsed from data stream"); + // TODO(EZamakhov): add to malformed messages counter + connection_data.clear(); + if (result) { + *result = frame_creation_result; + } + return out_frames; + } + LOG4CXX_DEBUG(logger_, + "Packet created and passed, new data size for connection " + << connection_id << " is " << connection_data.size()); + } + if (result) { + *result = RESULT_OK; + } + return out_frames; +} + +void IncomingDataHandler::AddConnection( + const transport_manager::ConnectionUID connection_id) { + // Add empty list of session to new connection + connections_data_[connection_id] = ConnectionsDataMap::mapped_type(); +} + +void IncomingDataHandler::RemoveConnection( + const transport_manager::ConnectionUID connection_id) { + connections_data_.erase(connection_id); +} + +uint32_t IncomingDataHandler::GetPacketSize( + const ProtocolPacket::ProtocolHeader& header) { + switch (header.version) { + case PROTOCOL_VERSION_1: + return header.dataSize + PROTOCOL_HEADER_V1_SIZE; + case PROTOCOL_VERSION_2: + case PROTOCOL_VERSION_3: + return header.dataSize + PROTOCOL_HEADER_V2_SIZE; + default: + LOG4CXX_WARN(logger_, "Unknown version"); + break; + } + return 0u; +} + +RESULT_CODE IncomingDataHandler::CreateFrame(std::vector<uint8_t>& incoming_data, + std::list<ProtocolFramePtr>& out_frames, + const transport_manager::ConnectionUID connection_id) { + LOG4CXX_AUTO_TRACE(logger_); + if (incoming_data.size() >= MIN_HEADER_SIZE) { + header_.deserialize(incoming_data.data(), incoming_data.size()); + const RESULT_CODE validate_result = + validator_ ? validator_->validate(header_) : RESULT_OK; + if (validate_result != RESULT_OK) { + LOG4CXX_WARN(logger_, "Packet validation failed with error " << validate_result); + return validate_result; + } + LOG4CXX_DEBUG(logger_, "Packet size " << header_.dataSize); + const uint32_t packet_size = GetPacketSize(header_); + if (packet_size <= 0) { + LOG4CXX_WARN(logger_, "Null packet size"); + return RESULT_FAIL; + } + if (incoming_data.size() < packet_size) { + LOG4CXX_DEBUG(logger_, "Packet data is not available yet"); + return RESULT_DEFERRED; + } + ProtocolFramePtr frame(new protocol_handler::ProtocolPacket(connection_id)); + const RESULT_CODE deserialize_result = + frame->deserializePacket(&incoming_data[0], packet_size); + if (deserialize_result != RESULT_OK) { + LOG4CXX_WARN(logger_, "Packet deserialization failed with error " << deserialize_result); + return deserialize_result; + } + out_frames.push_back(frame); + incoming_data.erase(incoming_data.begin(), incoming_data.begin() + packet_size); + } + return RESULT_OK; +} +} // namespace protocol_handler |