diff options
Diffstat (limited to 'src/components/protocol_handler/src/protocol_handler_impl.cc')
-rw-r--r-- | src/components/protocol_handler/src/protocol_handler_impl.cc | 286 |
1 files changed, 100 insertions, 186 deletions
diff --git a/src/components/protocol_handler/src/protocol_handler_impl.cc b/src/components/protocol_handler/src/protocol_handler_impl.cc index 454287595a..66600a3f2c 100644 --- a/src/components/protocol_handler/src/protocol_handler_impl.cc +++ b/src/components/protocol_handler/src/protocol_handler_impl.cc @@ -56,106 +56,14 @@ std::string ConvertPacketDataToString(const uint8_t *data, const size_t kStackSize = 32768; -class ProtocolHandlerImpl::IncomingDataHandler { - public: - IncomingDataHandler() : connections_data_() {} - - bool ProcessData(const RawMessagePtr tm_message, - std::vector<ProtocolFramePtr> *out_frames) { - DCHECK(tm_message); - DCHECK(out_frames != NULL); - const ConnectionID connection_id = tm_message->connection_key(); - const uint8_t *data = tm_message->data(); - const size_t size = tm_message->data_size(); - DCHECK(size > 0); DCHECK(data != NULL); - LOG4CXX_TRACE(logger_, "Start of processing incoming data of size " - << size << " for connection " << connection_id); - const uint32_t kBytesForSizeDetection = 8; - ConnectionsData::iterator it = connections_data_.find(connection_id); - if (connections_data_.end() == it) { - LOG4CXX_ERROR(logger_, "ProcessData requested for unknown connection"); - return false; - } - std::vector<uint8_t> &connection_data = it->second; - connection_data.insert(connection_data.end(), data, data + size); - - LOG4CXX_TRACE(logger_, "Total data size for connection " - << connection_id << " is " - << connection_data.size()); - while (connection_data.size() >= kBytesForSizeDetection) { - const uint32_t packet_size = GetPacketSize(&connection_data[0]); - if (0 == packet_size) { - LOG4CXX_ERROR(logger_, "Failed to get packet size"); - return false; - } - LOG4CXX_TRACE(logger_, "Packet size " << packet_size); - if (connection_data.size() >= packet_size) { - ProtocolFramePtr frame(new protocol_handler::ProtocolPacket( - connection_id, &connection_data[0], packet_size)); - out_frames->push_back(frame); - connection_data.erase(connection_data.begin(), - connection_data.begin() + packet_size); - LOG4CXX_TRACE(logger_, - "Packet created and passed, new data size for connection " - << connection_id << " is " << connection_data.size()); - } else { - LOG4CXX_TRACE(logger_, "Packet data is not available yet"); - return true; - } - } - return true; - } - - void AddConnection(ConnectionID connection_id) { - // Add empty list of session to new connection - connections_data_[connection_id] = std::vector<uint8_t>(); - } - - void RemoveConnection(ConnectionID connection_id) { - connections_data_.erase(connection_id); - } - - private: - /** - * @brief Returns size of frame to be formed from raw bytes. - * expects first bytes of message which will be treated as frame header. - */ - uint32_t GetPacketSize(unsigned char *received_bytes) { - DCHECK(received_bytes != NULL); - unsigned char offset = sizeof(uint32_t); - unsigned char version = received_bytes[0] >> 4u; - uint32_t frame_body_size = received_bytes[offset++] << 24u; - frame_body_size |= received_bytes[offset++] << 16u; - frame_body_size |= received_bytes[offset++] << 8u; - frame_body_size |= received_bytes[offset++]; - - uint32_t required_size = frame_body_size; - switch (version) { - case PROTOCOL_VERSION_1: - required_size += PROTOCOL_HEADER_V1_SIZE; - break; - case PROTOCOL_VERSION_3: - case PROTOCOL_VERSION_2: - required_size += PROTOCOL_HEADER_V2_SIZE; - break; - default: - LOG4CXX_ERROR(logger_, "Unknown protocol version."); - return 0; - } - return required_size; - } - - typedef std::map<ConnectionID, std::vector<uint8_t> > ConnectionsData; - ConnectionsData connections_data_; -}; - ProtocolHandlerImpl::ProtocolHandlerImpl( - transport_manager::TransportManager *transport_manager_param) + transport_manager::TransportManager *transport_manager_param, + size_t message_frequency_time, size_t message_frequency_count) : protocol_observers_(), session_observer_(0), transport_manager_(transport_manager_param), kPeriodForNaviAck(5), - incoming_data_handler_(new IncomingDataHandler), + message_max_frequency_(0), #ifdef ENABLE_SECURITY security_manager_(NULL), #endif // ENABLE_SECURITY @@ -168,9 +76,23 @@ ProtocolHandlerImpl::ProtocolHandlerImpl( #endif // TIME_TESTER { - LOG4CXX_TRACE_ENTER(logger_); - - LOG4CXX_TRACE_EXIT(logger_); + LOG4CXX_AUTO_TRACE(logger_); + protocol_header_validator_.set_max_payload_size(profile::Profile::instance()->maximum_payload_size()); + incoming_data_handler_.set_validator(&protocol_header_validator_); + const size_t time_range_msecs = message_frequency_time; + message_meter_.set_time_range(time_range_msecs); + if (time_range_msecs > 0) { + message_max_frequency_ = message_frequency_count; + if (message_max_frequency_ > 0) { + LOG4CXX_DEBUG(logger_, "Frequency meter is enabled ( " << message_max_frequency_ + << " per " << time_range_msecs << " mSecond)"); + } else { + LOG4CXX_WARN(logger_, "Invalid massage frequency value. MessageMeter will be disabled"); + message_meter_.set_time_range(0u); + } + } else { + LOG4CXX_WARN(logger_, "Frequency meter is disabled"); + } } ProtocolHandlerImpl::~ProtocolHandlerImpl() { @@ -191,15 +113,13 @@ void ProtocolHandlerImpl::AddProtocolObserver(ProtocolObserver *observer) { } void ProtocolHandlerImpl::RemoveProtocolObserver(ProtocolObserver* observer) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); if (!observer) { LOG4CXX_ERROR(logger_, "Invalid (NULL) pointer to IProtocolObserver."); - LOG4CXX_TRACE_EXIT(logger_); return; } sync_primitives::AutoLock lock(protocol_observers_lock_); protocol_observers_.erase(observer); - LOG4CXX_TRACE_EXIT(logger_); } void ProtocolHandlerImpl::set_session_observer(SessionObserver *observer) { @@ -233,10 +153,9 @@ void ProtocolHandlerImpl::SendStartSessionAck(ConnectionID connection_id, uint32_t hash_id, uint8_t service_type, bool protection) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); uint8_t protocolVersion; - if (0 == profile::Profile::instance()->heart_beat_timeout()) { protocolVersion = PROTOCOL_VERSION_2; LOG4CXX_INFO(logger_, "Heart beat timeout == 0 => SET PROTOCOL_VERSION_2"); @@ -260,14 +179,13 @@ void ProtocolHandlerImpl::SendStartSessionAck(ConnectionID connection_id, << " for service_type " << static_cast<int32_t>(service_type) << " session_id " << static_cast<int32_t>(session_id) << " protection " << (protection ? "ON" : "OFF")); - LOG4CXX_TRACE_EXIT(logger_); } void ProtocolHandlerImpl::SendStartSessionNAck(ConnectionID connection_id, uint8_t session_id, uint8_t protocol_version, uint8_t service_type) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); ProtocolFramePtr ptr(new protocol_handler::ProtocolPacket(connection_id, protocol_version, PROTECTION_OFF, FRAME_TYPE_CONTROL, @@ -281,14 +199,13 @@ void ProtocolHandlerImpl::SendStartSessionNAck(ConnectionID connection_id, "SendStartSessionNAck() for connection " << connection_id << " for service_type " << static_cast<int32_t>(service_type) << " session_id " << static_cast<int32_t>(session_id)); - LOG4CXX_TRACE_EXIT(logger_); } void ProtocolHandlerImpl::SendEndSessionNAck(ConnectionID connection_id, uint32_t session_id, uint8_t protocol_version, uint8_t service_type) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); ProtocolFramePtr ptr(new protocol_handler::ProtocolPacket(connection_id, protocol_version, PROTECTION_OFF, FRAME_TYPE_CONTROL, @@ -301,14 +218,13 @@ void ProtocolHandlerImpl::SendEndSessionNAck(ConnectionID connection_id, LOG4CXX_INFO(logger_, "SendEndSessionNAck() for connection " << connection_id << " for service_type " << static_cast<int32_t>(service_type) << " session_id " << static_cast<int32_t>(session_id)); - LOG4CXX_TRACE_EXIT(logger_); } void ProtocolHandlerImpl::SendEndSessionAck(ConnectionID connection_id, uint8_t session_id, uint8_t protocol_version, uint8_t service_type) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); ProtocolFramePtr ptr(new protocol_handler::ProtocolPacket(connection_id, protocol_version, PROTECTION_OFF, FRAME_TYPE_CONTROL, @@ -322,12 +238,11 @@ void ProtocolHandlerImpl::SendEndSessionAck(ConnectionID connection_id, "SendEndSessionAck() for connection " << connection_id << " for service_type " << static_cast<int32_t>(service_type) << " session_id " << static_cast<int32_t>(session_id)); - LOG4CXX_TRACE_EXIT(logger_); } void ProtocolHandlerImpl::SendEndSession(int32_t connection_id, uint8_t session_id) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); ProtocolFramePtr ptr(new protocol_handler::ProtocolPacket(connection_id, PROTOCOL_VERSION_3, PROTECTION_OFF, FRAME_TYPE_CONTROL, @@ -340,13 +255,12 @@ void ProtocolHandlerImpl::SendEndSession(int32_t connection_id, LOG4CXX_INFO(logger_, "SendEndSession() for connection " << connection_id << " for service_type " << static_cast<int32_t>(SERVICE_TYPE_RPC) << " session_id " << static_cast<int32_t>(session_id)); - LOG4CXX_TRACE_EXIT(logger_); } RESULT_CODE ProtocolHandlerImpl::SendHeartBeatAck(ConnectionID connection_id, uint8_t session_id, uint32_t message_id) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); ProtocolFramePtr ptr(new protocol_handler::ProtocolPacket(connection_id, PROTOCOL_VERSION_3, PROTECTION_OFF, FRAME_TYPE_CONTROL, @@ -355,14 +269,12 @@ RESULT_CODE ProtocolHandlerImpl::SendHeartBeatAck(ConnectionID connection_id, raw_ford_messages_to_mobile_.PostMessage( impl::RawFordMessageToMobile(ptr, false)); - - LOG4CXX_TRACE_EXIT(logger_); return RESULT_OK; } void ProtocolHandlerImpl::SendHeartBeat(int32_t connection_id, uint8_t session_id) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); ProtocolFramePtr ptr(new protocol_handler::ProtocolPacket(connection_id, PROTOCOL_VERSION_3, PROTECTION_OFF, FRAME_TYPE_CONTROL, @@ -371,8 +283,6 @@ void ProtocolHandlerImpl::SendHeartBeat(int32_t connection_id, raw_ford_messages_to_mobile_.PostMessage( impl::RawFordMessageToMobile(ptr, false)); - - LOG4CXX_TRACE_EXIT(logger_); } void ProtocolHandlerImpl::SendMessageToMobileApp(const RawMessagePtr message, @@ -380,11 +290,10 @@ void ProtocolHandlerImpl::SendMessageToMobileApp(const RawMessagePtr message, #ifdef TIME_TESTER const TimevalStruct start_time = date_time::DateTime::getCurrentTime(); #endif // TIME_TESTER - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); if (!message) { LOG4CXX_ERROR(logger_, "Invalid message for sending to mobile app is received."); - LOG4CXX_TRACE_EXIT(logger_); return; } @@ -460,11 +369,10 @@ void ProtocolHandlerImpl::SendMessageToMobileApp(const RawMessagePtr message, metric_observer_->EndMessageProcess(metric); } #endif - LOG4CXX_TRACE_EXIT(logger_); } void ProtocolHandlerImpl::OnTMMessageReceived(const RawMessagePtr tm_message) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); if (tm_message) { LOG4CXX_INFO(logger_, @@ -475,20 +383,19 @@ void ProtocolHandlerImpl::OnTMMessageReceived(const RawMessagePtr tm_message) { logger_, "Invalid incoming message received in" << " ProtocolHandler from Transport Manager."); - LOG4CXX_TRACE_EXIT(logger_); return; } - std::vector<ProtocolFramePtr> protocol_frames; - const bool ok = - incoming_data_handler_->ProcessData(tm_message, &protocol_frames); - if (!ok) { + RESULT_CODE result; + const std::list<ProtocolFramePtr> protocol_frames = + incoming_data_handler_.ProcessData(*tm_message, &result); + if (result == RESULT_FAIL) { LOG4CXX_ERROR(logger_, "Incoming data processing failed. Terminating connection."); transport_manager_->DisconnectForce(tm_message->connection_key()); } - for (std::vector<ProtocolFramePtr>::const_iterator it = + for (std::list<ProtocolFramePtr>::const_iterator it = protocol_frames.begin(); it != protocol_frames.end(); ++it) { #ifdef TIME_TESTER const TimevalStruct start_time = date_time::DateTime::getCurrentTime(); @@ -510,7 +417,6 @@ void ProtocolHandlerImpl::OnTMMessageReceived(const RawMessagePtr tm_message) { raw_ford_messages_from_mobile_.PostMessage(msg); } - LOG4CXX_TRACE_EXIT(logger_); } void ProtocolHandlerImpl::OnTMMessageReceiveFailed( @@ -520,6 +426,7 @@ void ProtocolHandlerImpl::OnTMMessageReceiveFailed( } void ProtocolHandlerImpl::NotifySubscribers(const RawMessagePtr message) { + LOG4CXX_AUTO_TRACE(logger_); sync_primitives::AutoLock lock(protocol_observers_lock_); for (ProtocolObservers::iterator it = protocol_observers_.begin(); protocol_observers_.end() != it; ++it) { @@ -532,9 +439,6 @@ void ProtocolHandlerImpl::OnTMMessageSend(const RawMessagePtr message) { uint32_t connection_handle = 0; uint8_t sessionID = 0; - const ProtocolPacket sent_message(message->connection_key(), - message->data(), - message->data_size()); session_observer_->PairFromKey(message->connection_key(), &connection_handle, @@ -550,6 +454,13 @@ void ProtocolHandlerImpl::OnTMMessageSend(const RawMessagePtr message) { return; } + ProtocolPacket sent_message(message->connection_key()); + const RESULT_CODE result = sent_message.deserializePacket(message->data(), + message->data_size()); + if (result != RESULT_OK) { + LOG4CXX_ERROR(logger_, "Error while message deserialization."); + return; + } std::map<uint8_t, uint32_t>::iterator it = sessions_last_message_id_.find(sent_message.session_id()); @@ -582,19 +493,19 @@ void ProtocolHandlerImpl::OnTMMessageSendFailed( void ProtocolHandlerImpl::OnConnectionEstablished( const transport_manager::DeviceInfo &device_info, const transport_manager::ConnectionUID &connection_id) { - incoming_data_handler_->AddConnection(connection_id); + incoming_data_handler_.AddConnection(connection_id); } void ProtocolHandlerImpl::OnConnectionClosed( const transport_manager::ConnectionUID &connection_id) { - incoming_data_handler_->RemoveConnection(connection_id); + incoming_data_handler_.RemoveConnection(connection_id); + message_meter_.ClearIdentifiers(); } RESULT_CODE ProtocolHandlerImpl::SendFrame(const ProtocolFramePtr packet) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); if (!packet) { LOG4CXX_ERROR(logger_, "Failed to send empty packet."); - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } #ifdef ENABLE_SECURITY @@ -602,7 +513,6 @@ RESULT_CODE ProtocolHandlerImpl::SendFrame(const ProtocolFramePtr packet) { const RESULT_CODE result = EncryptFrame(packet); if (result != RESULT_OK) { LOG4CXX_WARN(logger_, "Error frame encryption. Frame droped."); - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } #endif // ENABLE_SECURITY @@ -621,26 +531,22 @@ RESULT_CODE ProtocolHandlerImpl::SendFrame(const ProtocolFramePtr packet) { if (!transport_manager_) { LOG4CXX_WARN(logger_, "No Transport Manager found."); - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } if (transport_manager::E_SUCCESS != transport_manager_->SendMessageToDevice(message_to_send)) { LOG4CXX_WARN(logger_, "Can't send message to device"); - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; }; - - LOG4CXX_TRACE_EXIT(logger_); return RESULT_OK; } RESULT_CODE ProtocolHandlerImpl::SendSingleFrameMessage( - ConnectionID connection_id, const uint8_t session_id, - uint32_t protocol_version, const uint8_t service_type, - size_t data_size, const uint8_t *data, + const ConnectionID connection_id, const uint8_t session_id, + const uint32_t protocol_version, const uint8_t service_type, + const size_t data_size, const uint8_t *data, const bool is_final_message) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); ProtocolFramePtr ptr(new protocol_handler::ProtocolPacket(connection_id, protocol_version, PROTECTION_OFF, FRAME_TYPE_SINGLE, service_type, FRAME_DATA_SINGLE, @@ -648,17 +554,15 @@ RESULT_CODE ProtocolHandlerImpl::SendSingleFrameMessage( raw_ford_messages_to_mobile_.PostMessage( impl::RawFordMessageToMobile(ptr, is_final_message)); - - LOG4CXX_TRACE_EXIT(logger_); return RESULT_OK; } RESULT_CODE ProtocolHandlerImpl::SendMultiFrameMessage( - ConnectionID connection_id, const uint8_t session_id, - uint32_t protocol_version, const uint8_t service_type, + const ConnectionID connection_id, const uint8_t session_id, + const uint8_t protocol_version, const uint8_t service_type, const size_t data_size, const uint8_t *data, const size_t maxdata_size, const bool is_final_message) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); LOG4CXX_INFO_EXT( logger_, " data size " << data_size << " maxdata_size " << maxdata_size); @@ -720,41 +624,34 @@ RESULT_CODE ProtocolHandlerImpl::SendMultiFrameMessage( raw_ford_messages_to_mobile_.PostMessage( impl::RawFordMessageToMobile(ptr, is_final_packet)); } - LOG4CXX_TRACE_EXIT(logger_); return RESULT_OK; } RESULT_CODE ProtocolHandlerImpl::HandleMessage(ConnectionID connection_id, const ProtocolFramePtr packet) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); switch (packet->frame_type()) { case FRAME_TYPE_CONTROL: LOG4CXX_TRACE(logger_, "handleMessage() - case FRAME_TYPE_CONTROL"); - LOG4CXX_TRACE_EXIT(logger_); return HandleControlMessage(connection_id, packet); case FRAME_TYPE_SINGLE: - LOG4CXX_TRACE_EXIT(logger_); return HandleSingleFrameMessage(connection_id, packet); case FRAME_TYPE_FIRST: case FRAME_TYPE_CONSECUTIVE: LOG4CXX_TRACE(logger_, "handleMessage() - case FRAME_TYPE_CONSECUTIVE"); - LOG4CXX_TRACE_EXIT(logger_); return HandleMultiFrameMessage(connection_id, packet); default: { LOG4CXX_WARN(logger_, "handleMessage() - case unknown frame type" << packet->frame_type()); - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } } - - LOG4CXX_TRACE_EXIT(logger_); return RESULT_OK; } RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage( ConnectionID connection_id, const ProtocolFramePtr packet) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); LOG4CXX_INFO(logger_, "FRAME_TYPE_SINGLE message of size " << packet->data_size() << "; message " @@ -764,7 +661,6 @@ RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage( LOG4CXX_ERROR(logger_, "Cannot handle message from Transport" << " Manager: ISessionObserver doesn't exist."); - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } @@ -779,7 +675,6 @@ RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage( packet->service_type(), packet->payload_size())); if (!rawMessage) { - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } #ifdef TIME_TESTER @@ -795,17 +690,15 @@ RESULT_CODE ProtocolHandlerImpl::HandleSingleFrameMessage( // TODO(EZamakhov): check service in session NotifySubscribers(rawMessage); - LOG4CXX_TRACE_EXIT(logger_); return RESULT_OK; } RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage( ConnectionID connection_id, const ProtocolFramePtr packet) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); if (!session_observer_) { LOG4CXX_ERROR(logger_, "No ISessionObserver set."); - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } @@ -828,8 +721,6 @@ RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage( if (it == incomplete_multi_frame_messages_.end()) { LOG4CXX_ERROR( logger_, "Frame of multiframe message for non-existing session id"); - - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } @@ -837,8 +728,6 @@ RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage( != RESULT_OK) { LOG4CXX_ERROR(logger_, "Failed to append frame for multiframe message."); - - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } @@ -853,8 +742,6 @@ RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage( LOG4CXX_ERROR( logger_, "Cannot handle multiframe message: no IProtocolObserver is set."); - - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } } @@ -878,7 +765,6 @@ RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage( " payload_size " << completePacket->payload_size()); if (!rawMessage) { - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } @@ -896,18 +782,15 @@ RESULT_CODE ProtocolHandlerImpl::HandleMultiFrameMessage( incomplete_multi_frame_messages_.erase(it); } } - - LOG4CXX_TRACE_EXIT(logger_); return RESULT_OK; } RESULT_CODE ProtocolHandlerImpl::HandleControlMessage( ConnectionID connection_id, const ProtocolFramePtr packet) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); if (!session_observer_) { LOG4CXX_ERROR(logger_, "ISessionObserver is not set."); - LOG4CXX_TRACE_EXIT(logger_); return RESULT_FAIL; } @@ -919,7 +802,6 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessage( case FRAME_DATA_HEART_BEAT: { LOG4CXX_DEBUG(logger_, "Received heart beat for connection " << connection_id); - LOG4CXX_TRACE_EXIT(logger_); return HandleControlMessageHeartBeat(connection_id, *(packet.get())); } case FRAME_DATA_HEART_BEAT_ACK: { @@ -931,7 +813,6 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessage( LOG4CXX_WARN(logger_, "Control message of type " << static_cast<int>(packet->frame_data()) << " ignored"); - LOG4CXX_TRACE_EXIT(logger_); return RESULT_OK; } return RESULT_OK; @@ -1034,6 +915,7 @@ class StartSessionHandler : public security_manager::SecurityManagerListener { delete this; return true; } + private: const uint32_t connection_key_; ProtocolHandlerImpl *protocol_handler_; @@ -1138,26 +1020,53 @@ RESULT_CODE ProtocolHandlerImpl::HandleControlMessageHeartBeat( return RESULT_HEARTBEAT_IS_NOT_SUPPORTED; } +bool ProtocolHandlerImpl::TrackMessage(const uint32_t& connection_key) { + LOG4CXX_AUTO_TRACE(logger_); + const size_t message_frequency = message_meter_.TrackMessage(connection_key); + LOG4CXX_DEBUG(logger_, "Frequency of " << connection_key << " is " << message_frequency); + if (message_frequency > message_max_frequency_) { + LOG4CXX_WARN(logger_, "Frequency of " << connection_key << " is marked as high."); + session_observer_->OnApplicationFloodCallBack(connection_key); + message_meter_.RemoveIdentifier(connection_key); + return true; + } + return false; +} + void ProtocolHandlerImpl::Handle( const impl::RawFordMessageFromMobile message) { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); if (NULL == session_observer_) { LOG4CXX_WARN(logger_, "Session Observer is NULL"); return; } + + switch (message->service_type()) { + case kMobileNav: + case kAudio: + break; + default: { + const uint32_t connection_key = session_observer_->KeyFromPair( + message->connection_id(), message->session_id()); + if (TrackMessage(connection_key)) { + return; + } + } + break; + } + connection_handler::ConnectionHandlerImpl *connection_handler = connection_handler::ConnectionHandlerImpl::instance(); - LOG4CXX_INFO(logger_, "Message : " << message.get()); - LOG4CXX_INFO(logger_, "session_observer_: " <<session_observer_); - uint8_t c_id = message->connection_id(); - uint32_t m_id = message->session_id(); + LOG4CXX_DEBUG(logger_, "Message : " << message.get()); + const uint8_t c_id = message->connection_id(); + const uint32_t m_id = message->session_id(); if (session_observer_->IsHeartBeatSupported(c_id, m_id)) { - connection_handler->KeepConnectionAlive(message->connection_id(), - message->session_id()); + connection_handler->KeepConnectionAlive(c_id, m_id); } + // TODO(EZamakhov): remove dublication of IncomingDataHandler logic if (((0 != message->data()) && (0 != message->data_size())) || FRAME_TYPE_CONTROL == message->frame_type() || FRAME_TYPE_FIRST == message->frame_type()) { @@ -1167,7 +1076,6 @@ void ProtocolHandlerImpl::Handle( LOG4CXX_WARN(logger_, "handleMessagesFromMobileApp() - incorrect or NULL data"); } - LOG4CXX_TRACE_EXIT(logger_); } void ProtocolHandlerImpl::Handle(const impl::RawFordMessageToMobile message) { @@ -1187,8 +1095,14 @@ void ProtocolHandlerImpl::Handle(const impl::RawFordMessageToMobile message) { SendFrame(message); } +void ProtocolHandlerImpl::Stop() { + raw_ford_messages_from_mobile_.Shutdown(); + raw_ford_messages_to_mobile_.Shutdown(); +} + #ifdef ENABLE_SECURITY -void ProtocolHandlerImpl::set_security_manager(security_manager::SecurityManager* security_manager) { +void ProtocolHandlerImpl::set_security_manager( + security_manager::SecurityManager* security_manager) { if (!security_manager) { LOG4CXX_ERROR(logger_, "Invalid (NULL) pointer to SecurityManager."); return; |