diff options
Diffstat (limited to 'implementation/endpoints/src/endpoint_manager_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/endpoint_manager_impl.cpp | 336 |
1 files changed, 248 insertions, 88 deletions
diff --git a/implementation/endpoints/src/endpoint_manager_impl.cpp b/implementation/endpoints/src/endpoint_manager_impl.cpp index dbb2107..a47234c 100644 --- a/implementation/endpoints/src/endpoint_manager_impl.cpp +++ b/implementation/endpoints/src/endpoint_manager_impl.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2021 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -7,17 +7,19 @@ #include <vsomeip/internal/logger.hpp> +#include "../include/local_tcp_server_endpoint_impl.hpp" +#if defined(__linux__) || defined(ANDROID) +#include "../include/local_uds_server_endpoint_impl.hpp" +#endif #include "../include/udp_client_endpoint_impl.hpp" #include "../include/udp_server_endpoint_impl.hpp" #include "../include/tcp_client_endpoint_impl.hpp" #include "../include/tcp_server_endpoint_impl.hpp" -#include "../include/local_server_endpoint_impl.hpp" #include "../include/virtual_server_endpoint_impl.hpp" #include "../include/endpoint_definition.hpp" #include "../../routing/include/routing_manager_base.hpp" #include "../../routing/include/routing_manager_impl.hpp" #include "../../routing/include/routing_host.hpp" -#include "../../security/include/security.hpp" #include "../../utility/include/utility.hpp" #include "../../utility/include/byteorder.hpp" @@ -33,9 +35,28 @@ namespace vsomeip_v3 { endpoint_manager_impl::endpoint_manager_impl( - routing_manager_base* const _rm, boost::asio::io_service& _io, + routing_manager_base* const _rm, boost::asio::io_context &_io, const std::shared_ptr<configuration>& _configuration) : - endpoint_manager_base(_rm, _io, _configuration) { + endpoint_manager_base(_rm, _io, _configuration), + is_processing_options_(true), + options_thread_(std::bind(&endpoint_manager_impl::process_multicast_options, this)) { + + local_port_ = port_t(_configuration->get_routing_host_port() + 1); + if (!is_local_routing_) { + VSOMEIP_INFO << __func__ << ": Connecting to other clients from " + << configuration_->get_routing_host_address().to_string() + << ":" << std::dec << local_port_; + } +} + +endpoint_manager_impl::~endpoint_manager_impl() { + + { + std::lock_guard<std::mutex> its_guard(options_mutex_); + is_processing_options_ = false; + options_condition_.notify_one(); + } + options_thread_.join(); } std::shared_ptr<endpoint> endpoint_manager_impl::find_or_create_remote_client( @@ -266,9 +287,7 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_server_endpoint( << " Server endpoint creation failed." << " Reason: "<< e.what() << " Port: " << _port - << " (reliable=" - << (_reliable ? "reliable" : "unreliable") - << ")"; + << " (" << _reliable << ")"; } return (its_endpoint); @@ -457,6 +476,7 @@ void endpoint_manager_impl::find_or_create_multicast_endpoint( // Only save multicast info if we created a new endpoint // to be able to delete the new endpoint // as soon as the instance stops offering its service + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); std::shared_ptr<endpoint_definition> endpoint_def = endpoint_definition::get(_address, _port, false, _service, _instance); multicast_info[_service][_instance] = endpoint_def; @@ -588,79 +608,139 @@ void endpoint_manager_impl::print_status() const { } } -std::shared_ptr<local_server_endpoint_impl> -endpoint_manager_impl::create_local_server( - bool* _is_socket_activated, - const std::shared_ptr<routing_host>& _routing_host) { - std::shared_ptr<local_server_endpoint_impl> its_endpoint; +bool +endpoint_manager_impl::create_routing_root( + std::shared_ptr<endpoint> &_root, + bool &_is_socket_activated, + const std::shared_ptr<routing_host> &_host) { + std::stringstream its_endpoint_path_ss; - its_endpoint_path_ss << utility::get_base_path(configuration_) << VSOMEIP_ROUTING_CLIENT; + its_endpoint_path_ss << utility::get_base_path(configuration_->get_network()) + << VSOMEIP_ROUTING_CLIENT; const std::string its_endpoint_path = its_endpoint_path_ss.str(); - client_t routing_host_id = configuration_->get_id(configuration_->get_routing_host()); - if (security::get()->is_enabled() && get_client() != routing_host_id) { - VSOMEIP_ERROR << "endpoint_manager_impl::create_local_server: " - << std::hex << "Client " << get_client() << " isn't allowed" - << " to create the routing endpoint as its not configured as the routing master!"; - return its_endpoint; + client_t its_routing_host_id = configuration_->get_id(configuration_->get_routing_host_name()); + if (configuration_->is_security_enabled() && get_client() != its_routing_host_id) { + VSOMEIP_ERROR << "endpoint_manager_impl::" << __func__ << ": " + << "Client [" + << std::hex << std::setw(4) << std::setfill('0') + << get_client() + << "] does not match the configured routing manager client identifier [" + << std::hex << std::setw(4) << std::setfill('0') + << its_routing_host_id + << "]"; + + return (false); } - uint32_t native_socket_fd = 0; - int32_t num_fd = 0; + + if (configuration_->is_local_routing()) { + uint32_t native_socket_fd = 0; + int32_t num_fd = 0; #ifndef WITHOUT_SYSTEMD - num_fd = sd_listen_fds(0); + num_fd = sd_listen_fds(0); +#endif + +#if defined(__linux__) || defined(ANDROID) + if (num_fd > 1) { + VSOMEIP_ERROR << "Too many file descriptors received by systemd socket activation! num_fd: " << num_fd; + } else if (num_fd == 1) { + native_socket_fd = SD_LISTEN_FDS_START + 0; + VSOMEIP_INFO << "Using native socket created by systemd socket activation! fd: " << native_socket_fd; + if (is_local_routing_) { + try { + _root = + std::make_shared <local_uds_server_endpoint_impl>( + shared_from_this(), _host, +#if VSOMEIP_BOOST_VERSION < 106600 + boost::asio::local::stream_protocol_ext::endpoint(its_endpoint_path), +#else + boost::asio::local::stream_protocol::endpoint(its_endpoint_path), #endif - if (num_fd > 1) { - VSOMEIP_ERROR << "Too many file descriptors received by systemd socket activation! num_fd: " << num_fd; - } else if (num_fd == 1) { - native_socket_fd = SD_LISTEN_FDS_START + 0; - VSOMEIP_INFO << "Using native socket created by systemd socket activation! fd: " << native_socket_fd; - #ifndef _WIN32 - try { - its_endpoint = - std::make_shared <local_server_endpoint_impl>( - shared_from_this(), _routing_host, + io_, + native_socket_fd, + configuration_, true); + } catch (const std::exception &e) { + VSOMEIP_ERROR << "Routing endpoint creation failed. Client ID: " + << std::hex << std::setw(4) << std::setfill('0') + << VSOMEIP_ROUTING_CLIENT << ": " << e.what(); + } + } + _is_socket_activated = true; + + } else { + if (is_local_routing_) { + if (-1 == ::unlink(its_endpoint_path.c_str()) && errno != ENOENT) { + VSOMEIP_ERROR << "endpoint_manager_impl::create_local_server unlink failed (" + << its_endpoint_path << "): "<< std::strerror(errno); + } + VSOMEIP_INFO << __func__ << ": Routing root @ " << its_endpoint_path; + + try { + _root = + std::make_shared <local_uds_server_endpoint_impl>( + shared_from_this(), _host, +#if VSOMEIP_BOOST_VERSION < 106600 boost::asio::local::stream_protocol_ext::endpoint(its_endpoint_path), - io_, - native_socket_fd, - configuration_, true); - } catch (const std::exception &e) { - VSOMEIP_ERROR << "Server endpoint creation failed. Client ID: " - << std::hex << std::setw(4) << std::setfill('0') - << VSOMEIP_ROUTING_CLIENT << ": " << e.what(); +#else + boost::asio::local::stream_protocol::endpoint(its_endpoint_path), +#endif + io_, configuration_, true); + } catch (const std::exception &e) { + VSOMEIP_ERROR << "Local routing endpoint creation failed. Client ID: " + << std::hex << std::setw(4) << std::setfill('0') + << VSOMEIP_ROUTING_CLIENT << ": " << e.what(); + + return (false); + } } - #endif - *_is_socket_activated = true; + + _is_socket_activated = false; + } +#else + ::unlink(its_endpoint_path.c_str()); + port_t port = VSOMEIP_INTERNAL_BASE_PORT; + VSOMEIP_INFO << __func__ << ": Routing root @ " << std::dec << port; + + try { + _root = + std::make_shared <local_tcp_server_endpoint_impl>( + shared_from_this(), _host, + boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port), + io_, configuration_, true); + } catch (const std::exception &e) { + VSOMEIP_ERROR << "Local routing endpoint creation failed. Client ID: " + << std::hex << std::setw(4) << std::setfill('0') + << VSOMEIP_ROUTING_CLIENT << ": " << e.what(); + + return (false); + } + + _is_socket_activated = false; +#endif // __linux__ || ANDROID } else { - #if _WIN32 - ::_unlink(its_endpoint_path.c_str()); - int port = VSOMEIP_INTERNAL_BASE_PORT; - VSOMEIP_INFO << "Routing endpoint at " << port; - #else - if (-1 == ::unlink(its_endpoint_path.c_str()) && errno != ENOENT) { - VSOMEIP_ERROR << "endpoint_manager_impl::create_local_server unlink failed (" - << its_endpoint_path << "): "<< std::strerror(errno); - } - VSOMEIP_INFO << __func__ << " Routing endpoint at " << its_endpoint_path; - #endif + auto its_address = configuration_->get_routing_host_address(); + auto its_port = configuration_->get_routing_host_port(); + + VSOMEIP_INFO << __func__ << ": Routing root @ " + << its_address.to_string() << ":" << std::dec << its_port; try { - its_endpoint = - std::make_shared <local_server_endpoint_impl>( - shared_from_this(), _routing_host, - #ifdef _WIN32 - boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port), - #else - boost::asio::local::stream_protocol_ext::endpoint(its_endpoint_path), - #endif - io_, - configuration_, true); + _root = + std::make_shared <local_tcp_server_endpoint_impl>( + shared_from_this(), _host, + boost::asio::ip::tcp::endpoint(its_address, its_port), + io_, configuration_, true); } catch (const std::exception &e) { - VSOMEIP_ERROR << "Server endpoint creation failed. Client ID: " + VSOMEIP_ERROR << "Remote routing root endpoint creation failed. Client ID: " << std::hex << std::setw(4) << std::setfill('0') << VSOMEIP_ROUTING_CLIENT << ": " << e.what(); + + return (false); } - *_is_socket_activated = false; + + _is_socket_activated = false; } - return its_endpoint; + + return (true); } instance_t endpoint_manager_impl::find_instance( @@ -807,8 +887,8 @@ void endpoint_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) { if (!is_reliable) { static_cast<routing_manager_impl*>(rm_)->on_availability( its_service.first, its_instance.first, - false, its_info->get_major(), - its_info->get_minor()); + availability_state_e::AS_UNAVAILABLE, + its_info->get_major(), its_info->get_minor()); } static_cast<routing_manager_impl*>(rm_)->service_endpoint_disconnected( its_service.first, its_instance.first, @@ -820,7 +900,9 @@ void endpoint_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) { } } -bool endpoint_manager_impl::on_bind_error(std::shared_ptr<endpoint> _endpoint, std::uint16_t _remote_port) { +bool endpoint_manager_impl::on_bind_error(std::shared_ptr<endpoint> _endpoint, + const boost::asio::ip::address &_remote_address, std::uint16_t _remote_port) { + std::lock_guard<std::recursive_mutex> its_ep_lock(endpoint_mutex_); for (auto &its_service : remote_services_) { for (auto &its_instance : its_service.second) { @@ -833,15 +915,16 @@ bool endpoint_manager_impl::on_bind_error(std::shared_ptr<endpoint> _endpoint, s uint16_t its_new_local_port(ILLEGAL_PORT); std::unique_lock<std::mutex> its_lock(used_client_ports_mutex_); - if (configuration_->get_client_port(its_service.first, - its_instance.first, - _remote_port, - is_reliable, - used_client_ports_, - its_new_local_port)) { + std::map<bool, std::set<port_t> > its_used_client_ports; + get_used_client_ports(_remote_address, _remote_port, its_used_client_ports); + if (configuration_->get_client_port( + its_service.first, its_instance.first, + _remote_port, is_reliable, + its_used_client_ports, its_new_local_port)) { _endpoint->set_local_port(its_new_local_port); its_lock.unlock(); - release_port(its_old_local_port, _endpoint->is_reliable()); + release_used_client_port(_remote_address, _remote_port, + _endpoint->is_reliable(), its_old_local_port); return true; } } @@ -866,9 +949,43 @@ void endpoint_manager_impl::on_error( _receiver->is_reliable(), _receiver, _remote_address, _remote_port); } -void endpoint_manager_impl::release_port(uint16_t _port, bool _reliable) { +void +endpoint_manager_impl::get_used_client_ports( + const boost::asio::ip::address &_remote_address, port_t _remote_port, + std::map<bool, std::set<port_t> > &_used_ports) { + auto find_address = used_client_ports_.find(_remote_address); + if (find_address != used_client_ports_.end()) { + auto find_port = find_address->second.find(_remote_port); + if (find_port != find_address->second.end()) + _used_ports = find_port->second; + } +} + +void +endpoint_manager_impl::request_used_client_port( + const boost::asio::ip::address &_remote_address, port_t _remote_port, + bool _reliable, port_t _local_port) { + std::lock_guard<std::mutex> its_lock(used_client_ports_mutex_); - used_client_ports_[_reliable].erase(_port); + used_client_ports_[_remote_address][_remote_port] + [_reliable].insert(_local_port); +} + +void +endpoint_manager_impl::release_used_client_port( + const boost::asio::ip::address &_remote_address, port_t _remote_port, + bool _reliable, port_t _local_port) { + + std::lock_guard<std::mutex> its_lock(used_client_ports_mutex_); + auto find_address = used_client_ports_.find(_remote_address); + if (find_address != used_client_ports_.end()) { + auto find_port = find_address->second.find(_remote_port); + if (find_port != find_address->second.end()) { + auto find_reliable = find_port->second.find(_reliable); + if (find_reliable != find_port->second.end()) + find_reliable->second.erase(_local_port); + } + } } std::shared_ptr<endpoint> @@ -946,6 +1063,8 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client( std::shared_ptr<endpoint> its_endpoint; std::shared_ptr<endpoint_definition> its_endpoint_def; uint16_t its_local_port; + + boost::asio::ip::address its_remote_address; uint16_t its_remote_port = ILLEGAL_PORT; auto found_service = remote_service_info_.find(_service); @@ -955,6 +1074,7 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client( auto found_reliability = found_instance->second.find(_reliable); if (found_reliability != found_instance->second.end()) { its_endpoint_def = found_reliability->second; + its_remote_address = its_endpoint_def->get_address(); its_remote_port = its_endpoint_def->get_port(); } } @@ -963,25 +1083,31 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client( if( its_remote_port != ILLEGAL_PORT) { // if client port range for remote service port range is configured // and remote port is in range, determine unused client port - std::unique_lock<std::mutex> its_lock(used_client_ports_mutex_); - if (configuration_->get_client_port(_service, _instance, its_remote_port, _reliable, - used_client_ports_, its_local_port)) { - if(its_endpoint_def) { + std::map<bool, std::set<port_t> > its_used_client_ports; + { + std::lock_guard<std::mutex> its_lock(used_client_ports_mutex_); + get_used_client_ports(its_remote_address, its_remote_port, its_used_client_ports); + } + if (configuration_->get_client_port(_service, _instance, + its_remote_port, _reliable, + its_used_client_ports, its_local_port)) { + if (its_endpoint_def) { its_endpoint = create_client_endpoint( - its_endpoint_def->get_address(), + its_remote_address, its_local_port, - its_endpoint_def->get_port(), + its_remote_port, _reliable); } if (its_endpoint) { - partition_id_t its_partition - = configuration_->get_partition_id(_service, _instance); - used_client_ports_[_reliable].insert(its_local_port); - its_lock.unlock(); + request_used_client_port(its_remote_address, its_remote_port, + _reliable, its_local_port); + service_instances_[_service][its_endpoint.get()] = _instance; remote_services_[_service][_instance][_reliable] = its_endpoint; + partition_id_t its_partition + = configuration_->get_partition_id(_service, _instance); client_endpoints_by_ip_[its_endpoint_def->get_address()] [its_endpoint_def->get_port()] [_reliable] @@ -1148,4 +1274,38 @@ endpoint_manager_impl::log_server_states() const { VSOMEIP_INFO << "ESQ: [" << its_log.str() << "]"; } +void +endpoint_manager_impl::add_multicast_option(const multicast_option_t &_option) { + + std::lock_guard<std::mutex> its_guard(options_mutex_); + options_queue_.push(_option); + options_condition_.notify_one(); +} + +void +endpoint_manager_impl::process_multicast_options() { + + std::unique_lock<std::mutex> its_lock(options_mutex_); + while (is_processing_options_) { + if (options_queue_.size() > 0) { + auto its_front = options_queue_.front(); + options_queue_.pop(); + auto its_udp_server_endpoint + = std::dynamic_pointer_cast<udp_server_endpoint_impl>(its_front.endpoint_); + if (its_udp_server_endpoint) { + // Unlock before setting the option as this might block + its_lock.unlock(); + its_udp_server_endpoint->set_multicast_option( + its_front.address_, its_front.is_join_); + // Lock again after setting the option + its_lock.lock(); + } + } else { + options_condition_.wait(its_lock, [this] { + return !options_queue_.empty() || !is_processing_options_; + }); + } + } +} + } // namespace vsomeip_v3 |