diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_proxy.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_proxy.cpp | 241 |
1 files changed, 135 insertions, 106 deletions
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index a000211..e12e962 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -10,7 +10,7 @@ #include <future> #include <forward_list> -#ifndef WIN32 +#ifndef _WIN32 // for umask #include <sys/types.h> #include <sys/stat.h> @@ -54,7 +54,7 @@ void routing_manager_proxy::init() { routing_manager_base::init(); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); sender_ = create_local(VSOMEIP_ROUTING_CLIENT); } @@ -64,24 +64,24 @@ void routing_manager_proxy::init() { void routing_manager_proxy::start() { is_started_ = true; - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (!sender_) { - // application has been stopped and started again - sender_ = create_local(VSOMEIP_ROUTING_CLIENT); - } - } if (!receiver_) { // application has been stopped and started again init_receiver(); } - - if (sender_) { - sender_->start(); + if (receiver_) { + receiver_->start(); } - if (receiver_) - receiver_->start(); + { + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (!sender_) { + // application has been stopped and started again + sender_ = create_local(VSOMEIP_ROUTING_CLIENT); + } + if (sender_) { + sender_->start(); + } + } } void routing_manager_proxy::stop() { @@ -114,9 +114,15 @@ void routing_manager_proxy::stop() { if (receiver_) { receiver_->stop(); } + receiver_ = nullptr; - if (sender_) { - sender_->stop(); + { + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->stop(); + } + // delete the sender + sender_ = nullptr; } for (auto client: get_connected_clients()) { @@ -125,15 +131,9 @@ void routing_manager_proxy::stop() { } } - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - // delete the sender - sender_ = nullptr; - } - receiver_ = nullptr; std::stringstream its_client; its_client << VSOMEIP_BASE_PATH << std::hex << client_; -#ifdef WIN32 +#ifdef _WIN32 ::_unlink(its_client.str().c_str()); #else if (-1 == ::unlink(its_client.str().c_str())) { @@ -187,7 +187,7 @@ void routing_manager_proxy::send_offer_service(client_t _client, sizeof(_minor)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); } @@ -226,7 +226,7 @@ void routing_manager_proxy::stop_offer_service(client_t _client, sizeof(_minor)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); } @@ -287,31 +287,39 @@ void routing_manager_proxy::register_event(client_t _client, std::chrono::milliseconds _cycle, bool _change_resets_cycle, epsilon_change_func_t _epsilon_change_func, bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) { - (void)_is_shadow; (void)_is_cache_placeholder; - routing_manager_base::register_event(_client, _service, _instance, - _event,_eventgroups, _is_field, - _cycle, _change_resets_cycle, - _epsilon_change_func, - _is_provided); - + const event_data_t registration = { + _service, + _instance, + _event, + _is_field, + _is_provided, + _eventgroups + }; + bool is_first(false); { std::lock_guard<std::mutex> its_lock(state_mutex_); - if (state_ == inner_state_type_e::ST_REGISTERED) { + is_first = pending_event_registrations_.find(registration) + == pending_event_registrations_.end(); + if (is_first) { + pending_event_registrations_.insert(registration); + } + } + if (is_first) { + routing_manager_base::register_event(_client, _service, _instance, + _event,_eventgroups, _is_field, + _cycle, _change_resets_cycle, + _epsilon_change_func, + _is_provided); + } + { + std::lock_guard<std::mutex> its_lock(state_mutex_); + if (state_ == inner_state_type_e::ST_REGISTERED && is_first) { send_register_event(client_, _service, _instance, _event, _eventgroups, _is_field, _is_provided); } - event_data_t registration = { - _service, - _instance, - _event, - _is_field, - _is_provided, - _eventgroups - }; - pending_event_registrations_.insert(registration); } } @@ -344,7 +352,7 @@ void routing_manager_proxy::unregister_event(client_t _client, = static_cast<byte_t>(_is_provided); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); } @@ -418,11 +426,9 @@ void routing_manager_proxy::send_subscribe(client_t _client, service_t _service, auto its_target = find_or_create_local(target_client); its_target->send(its_command, sizeof(its_command)); } else { - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (sender_) { - sender_->send(its_command, sizeof(its_command)); - } + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->send(its_command, sizeof(its_command)); } } } @@ -445,16 +451,16 @@ void routing_manager_proxy::send_subscribe_nack(client_t _subscriber, sizeof(_instance)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, sizeof(_eventgroup)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_subscriber, + sizeof(_subscriber)); auto its_target = find_local(_subscriber); if (its_target) { its_target->send(its_command, sizeof(its_command)); } else { - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (sender_) { - sender_->send(its_command, sizeof(its_command)); - } + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->send(its_command, sizeof(its_command)); } } } @@ -477,16 +483,16 @@ void routing_manager_proxy::send_subscribe_ack(client_t _subscriber, sizeof(_instance)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, sizeof(_eventgroup)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_subscriber, + sizeof(_subscriber)); auto its_target = find_local(_subscriber); if (its_target) { its_target->send(its_command, sizeof(its_command)); } else { - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (sender_) { - sender_->send(its_command, sizeof(its_command)); - } + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->send(its_command, sizeof(its_command)); } } } @@ -497,7 +503,6 @@ void routing_manager_proxy::unsubscribe(client_t _client, service_t _service, { std::lock_guard<std::mutex> its_lock(state_mutex_); if (state_ == inner_state_type_e::ST_REGISTERED) { - bool is_remote(false); byte_t its_command[VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE]; uint32_t its_size = VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE; @@ -513,19 +518,16 @@ void routing_manager_proxy::unsubscribe(client_t _client, service_t _service, sizeof(_instance)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, sizeof(_eventgroup)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &is_remote, - sizeof(is_remote)); + its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6] = 0; // is_local auto its_target = find_local(_service, _instance); if (its_target) { its_target->send(its_command, sizeof(its_command)); } else { - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (sender_) { - sender_->send(its_command, sizeof(its_command)); - } - }; + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->send(its_command, sizeof(its_command)); + } } } remove_pending_subscription(_service, _instance); @@ -591,11 +593,18 @@ bool routing_manager_proxy::send(client_t _client, const byte_t *_data, _instance, _flush, _reliable, VSOMEIP_SEND); } } - // If no direct endpoint could be found/is connected + // If no direct endpoint could be found // or for notifications ~> route to routing_manager_stub - if (!its_target || !its_target->is_connected()) { +#ifdef USE_DLT + bool message_to_stub(false); +#endif + if (!its_target) { + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { its_target = sender_; +#ifdef USE_DLT + message_to_stub = true; +#endif } else { return false; } @@ -615,7 +624,7 @@ bool routing_manager_proxy::send(client_t _client, const byte_t *_data, } } #ifdef USE_DLT - else if (its_target != sender_) { + else if (!message_to_stub) { const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); @@ -656,8 +665,11 @@ bool routing_manager_proxy::send_to( } void routing_manager_proxy::on_connect(std::shared_ptr<endpoint> _endpoint) { - if (_endpoint != sender_) { - return; + { + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (_endpoint != sender_) { + return; + } } is_connected_ = true; if (is_connected_ && is_started_) { @@ -668,7 +680,10 @@ void routing_manager_proxy::on_connect(std::shared_ptr<endpoint> _endpoint) { } void routing_manager_proxy::on_disconnect(std::shared_ptr<endpoint> _endpoint) { - is_connected_ = !(_endpoint == sender_); + { + std::lock_guard<std::mutex> its_lock(sender_mutex_); + is_connected_ = !(_endpoint == sender_); + } if (!is_connected_) { host_->on_state(state_type_e::ST_DEREGISTERED); } @@ -717,8 +732,9 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, instance_t its_instance; eventgroup_t its_eventgroup; major_version_t its_major; - bool is_remote_subscriber; + uint8_t is_remote_subscriber; client_t routing_host_id = configuration_->get_id(configuration_->get_routing_host()); + client_t its_subscriber; if (_size > VSOMEIP_COMMAND_SIZE_POS_MAX) { its_command = _data[VSOMEIP_COMMAND_TYPE_POS]; @@ -818,6 +834,10 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_SUBSCRIBE: + if (_size != VSOMEIP_SUBSCRIBE_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a SUBSCRIBE command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], @@ -878,6 +898,10 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_UNSUBSCRIBE: + if (_size != VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received an UNSUBSCRIBE command with wrong ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], @@ -905,14 +929,20 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_SUBSCRIBE_NACK: + if (_size != VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a VSOMEIP_SUBSCRIBE_NACK command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], sizeof(its_instance)); std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4], sizeof(its_eventgroup)); + std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6], + sizeof(its_subscriber)); - on_subscribe_nack(its_client, its_service, its_instance, its_eventgroup); + on_subscribe_nack(its_subscriber, its_service, its_instance, its_eventgroup); VSOMEIP_INFO << "SUBSCRIBE NACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." @@ -921,14 +951,20 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_SUBSCRIBE_ACK: + if (_size != VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a VSOMEIP_SUBSCRIBE_ACK command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], sizeof(its_instance)); std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4], sizeof(its_eventgroup)); + std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6], + sizeof(its_subscriber)); - on_subscribe_ack(its_client, its_service, its_instance, its_eventgroup); + on_subscribe_ack(its_subscriber, its_service, its_instance, its_eventgroup); VSOMEIP_INFO << "SUBSCRIBE ACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." @@ -1160,20 +1196,16 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data, } } - if (clients_to_delete.size()) { - std::async(std::launch::async, [this, clients_to_delete, restart_sender] () { - for (const auto client : clients_to_delete) { - if (client != VSOMEIP_ROUTING_CLIENT) { - remove_local(client); - } - } - }); + for (const auto client : clients_to_delete) { + if (client != VSOMEIP_ROUTING_CLIENT) { + remove_local(client); + } } if (restart_sender && is_started_) { VSOMEIP_INFO << std::hex << "Application/Client " << get_client() <<": Reconnecting to routing manager."; - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->start(); } @@ -1191,7 +1223,7 @@ void routing_manager_proxy::register_application() { if (is_connected_) { std::lock_guard<std::mutex> its_state_lock(state_mutex_); - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { { state_ = inner_state_type_e::ST_REGISTERING; @@ -1219,7 +1251,7 @@ void routing_manager_proxy::deregister_application() { sizeof(its_size)); if (is_connected_) { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(&its_command[0], uint32_t(its_command.size())); } @@ -1234,11 +1266,9 @@ void routing_manager_proxy::send_pong() const { sizeof(client_t)); if (is_connected_) { - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (sender_) { - sender_->send(its_pong, sizeof(its_pong)); - } + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->send(its_pong, sizeof(its_pong)); } } } @@ -1268,7 +1298,7 @@ void routing_manager_proxy::send_request_service(client_t _client, service_t _se sizeof(_use_exclusive_proxy)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); } @@ -1294,7 +1324,7 @@ void routing_manager_proxy::send_release_service(client_t _client, service_t _se sizeof(_instance)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); } @@ -1336,7 +1366,7 @@ void routing_manager_proxy::send_register_event(client_t _client, } { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, uint32_t(VSOMEIP_REGISTER_EVENT_COMMAND_SIZE @@ -1361,8 +1391,7 @@ void routing_manager_proxy::on_subscribe_nack(client_t _client, void routing_manager_proxy::on_identify_response(client_t _client, service_t _service, instance_t _instance, bool _reliable) { - static const uint32_t size = uint32_t(VSOMEIP_COMMAND_HEADER_SIZE + sizeof(service_t) + sizeof(instance_t) - + sizeof(bool)); + static const uint32_t size = VSOMEIP_ID_RESPONSE_COMMAND_SIZE; byte_t its_command[size]; uint32_t its_size = size - VSOMEIP_COMMAND_HEADER_SIZE; its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_ID_RESPONSE; @@ -1377,7 +1406,7 @@ void routing_manager_proxy::on_identify_response(client_t _client, service_t _se std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_reliable, sizeof(_reliable)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, size); } @@ -1454,7 +1483,7 @@ void routing_manager_proxy::send_pending_commands() { void routing_manager_proxy::init_receiver() { std::stringstream its_client; its_client << VSOMEIP_BASE_PATH << std::hex << client_; -#ifdef WIN32 +#ifdef _WIN32 ::_unlink(its_client.str().c_str()); int port = VSOMEIP_INTERNAL_BASE_PORT + client_; #else @@ -1472,14 +1501,14 @@ void routing_manager_proxy::init_receiver() { #endif try { receiver_ = std::make_shared<local_server_endpoint_impl>(shared_from_this(), -#ifdef WIN32 +#ifdef _WIN32 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port), #else boost::asio::local::stream_protocol::endpoint(its_client.str()), #endif io_, configuration_->get_max_message_size_local(), configuration_->get_buffer_shrink_threshold()); -#ifdef WIN32 +#ifdef _WIN32 VSOMEIP_INFO << "Listening at " << port; #else VSOMEIP_INFO << "Listening at " << its_client.str(); @@ -1488,7 +1517,7 @@ void routing_manager_proxy::init_receiver() { host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED); VSOMEIP_ERROR << "Client ID: " << std::hex << client_ << ": " << e.what(); } -#ifndef WIN32 +#ifndef _WIN32 ::umask(previous_mask); #endif } @@ -1514,7 +1543,7 @@ void routing_manager_proxy::notify_remote_initally(service_t _service, instance_ std::lock_guard<std::mutex> its_lock(serialize_mutex_); if (serializer_->serialize(its_notification.get())) { { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { send_local(sender_, VSOMEIP_ROUTING_CLIENT, serializer_->get_data(), serializer_->get_size(), _instance, true, false, VSOMEIP_NOTIFY); @@ -1573,7 +1602,7 @@ void routing_manager_proxy::register_application_timeout_cbk( } } if (register_again) { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); VSOMEIP_WARNING << std::hex << "Client 0x" << get_client() << " register timeout!" << " : Restart route to stub!"; if (sender_) { @@ -1592,7 +1621,7 @@ void routing_manager_proxy::send_registered_ack() { std::memset(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], 0, sizeof(uint32_t)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, VSOMEIP_COMMAND_HEADER_SIZE); } |