diff options
Diffstat (limited to 'implementation/endpoints/src/endpoint_manager_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/endpoint_manager_impl.cpp | 190 |
1 files changed, 133 insertions, 57 deletions
diff --git a/implementation/endpoints/src/endpoint_manager_impl.cpp b/implementation/endpoints/src/endpoint_manager_impl.cpp index a82a173..dbb2107 100644 --- a/implementation/endpoints/src/endpoint_manager_impl.cpp +++ b/implementation/endpoints/src/endpoint_manager_impl.cpp @@ -328,8 +328,10 @@ bool endpoint_manager_impl::remove_server_endpoint(uint16_t _port, bool _reliabl return ret; } -void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_t _instance, - bool _reliable) { +void +endpoint_manager_impl::clear_client_endpoints( + service_t _service, instance_t _instance, bool _reliable) { + std::shared_ptr<endpoint> endpoint_to_delete; bool other_services_reachable_through_endpoint(false); { @@ -371,8 +373,12 @@ void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_ } if (!other_services_reachable_through_endpoint) { - std::uint16_t its_port(0); + partition_id_t its_partition; boost::asio::ip::address its_address; + std::uint16_t its_port(0); + + its_partition = configuration_->get_partition_id(_service, _instance); + if (_reliable) { std::shared_ptr<tcp_client_endpoint_impl> ep = std::dynamic_pointer_cast<tcp_client_endpoint_impl>(endpoint_to_delete); @@ -392,15 +398,21 @@ void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_ if (found_ip != client_endpoints_by_ip_.end()) { const auto found_port = found_ip->second.find(its_port); if (found_port != found_ip->second.end()) { - const auto found_reliable = found_port->second.find(_reliable); + auto found_reliable = found_port->second.find(_reliable); if (found_reliable != found_port->second.end()) { - if (found_reliable->second == endpoint_to_delete) { - found_port->second.erase(_reliable); - // delete if necessary - if (!found_port->second.size()) { - found_ip->second.erase(found_port); - if (!found_ip->second.size()) { - client_endpoints_by_ip_.erase(found_ip); + const auto found_partition = found_reliable->second.find(its_partition); + if (found_partition != found_reliable->second.end()) { + if (found_partition->second == endpoint_to_delete) { + found_reliable->second.erase(its_partition); + // delete if necessary + if (0 == found_reliable->second.size()) { + found_port->second.erase(_reliable); + if (0 == found_port->second.size()) { + found_ip->second.erase(found_port); + if (0 == found_ip->second.size()) { + client_endpoints_by_ip_.erase(found_ip); + } + } } } } @@ -455,8 +467,14 @@ void endpoint_manager_impl::find_or_create_multicast_endpoint( std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); service_instances_multicast_[_service][_sender] = _instance; } - dynamic_cast<udp_server_endpoint_impl*>(its_endpoint.get())->join_unlocked( - _address.to_string()); + + auto its_udp_server_endpoint + = std::dynamic_pointer_cast<udp_server_endpoint_impl>(its_endpoint); + if (_port != configuration_->get_sd_port()) { + its_udp_server_endpoint->join(_address.to_string()); + } else { + its_udp_server_endpoint->join_unlocked(_address.to_string()); + } } else { VSOMEIP_ERROR <<"Could not find/create multicast endpoint!"; } @@ -540,11 +558,13 @@ void endpoint_manager_impl::print_status() const { VSOMEIP_INFO << "status start remote client endpoints:"; std::uint32_t num_remote_client_endpoints(0); // normal endpoints - for (const auto &a : client_endpoints_by_ip) { - for (const auto& p : a.second) { - for (const auto& ru : p.second) { - ru.second->print_status(); - num_remote_client_endpoints++; + for (const auto &its_address : client_endpoints_by_ip) { + for (const auto &its_port : its_address.second) { + for (const auto &its_reliability : its_port.second) { + for (const auto &its_partition : its_reliability.second) { + its_partition.second->print_status(); + num_remote_client_endpoints++; + } } } } @@ -800,6 +820,37 @@ void endpoint_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) { } } +bool endpoint_manager_impl::on_bind_error(std::shared_ptr<endpoint> _endpoint, std::uint16_t _remote_port) { + std::lock_guard<std::recursive_mutex> its_ep_lock(endpoint_mutex_); + for (auto &its_service : remote_services_) { + for (auto &its_instance : its_service.second) { + const bool is_reliable = _endpoint->is_reliable(); + auto found_endpoint = its_instance.second.find(is_reliable); + if (found_endpoint != its_instance.second.end()) { + if (found_endpoint->second == _endpoint) { + // get a new client port using service / instance / remote port + uint16_t its_old_local_port = _endpoint->get_local_port(); + uint16_t its_new_local_port(ILLEGAL_PORT); + + std::unique_lock<std::mutex> its_lock(used_client_ports_mutex_); + if (configuration_->get_client_port(its_service.first, + its_instance.first, + _remote_port, + is_reliable, + used_client_ports_, + its_new_local_port)) { + _endpoint->set_local_port(its_new_local_port); + its_lock.unlock(); + release_port(its_old_local_port, _endpoint->is_reliable()); + return true; + } + } + } + } + } + return false; +} + void endpoint_manager_impl::on_error( const byte_t *_data, length_t _length, endpoint* const _receiver, const boost::asio::ip::address &_remote_address, @@ -820,8 +871,10 @@ void endpoint_manager_impl::release_port(uint16_t _port, bool _reliable) { used_client_ports_[_reliable].erase(_port); } -std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client( +std::shared_ptr<endpoint> +endpoint_manager_impl::find_remote_client( service_t _service, instance_t _instance, bool _reliable) { + std::shared_ptr<endpoint> its_endpoint; auto found_service = remote_services_.find(_service); if (found_service != remote_services_.end()) { @@ -837,35 +890,46 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client( return its_endpoint; } - // If another service is hosted on the same server_endpoint - // reuse the existing client_endpoint. + // Endpoint did not yet exist. Get the partition id to check + // whether the client endpoint for the partition does exist. + partition_id_t its_partition_id + = configuration_->get_partition_id(_service, _instance); + + // If another service within the same partition is hosted on the + // same server_endpoint reuse the existing client_endpoint. auto found_service_info = remote_service_info_.find(_service); - if(found_service_info != remote_service_info_.end()) { + if (found_service_info != remote_service_info_.end()) { auto found_instance = found_service_info->second.find(_instance); - if(found_instance != found_service_info->second.end()) { + if (found_instance != found_service_info->second.end()) { auto found_reliable = found_instance->second.find(_reliable); - if(found_reliable != found_instance->second.end()) { - std::shared_ptr<endpoint_definition> its_ep_def = - found_reliable->second; + if (found_reliable != found_instance->second.end()) { + std::shared_ptr<endpoint_definition> its_ep_def + = found_reliable->second; auto found_address = client_endpoints_by_ip_.find( its_ep_def->get_address()); - if(found_address != client_endpoints_by_ip_.end()) { + if (found_address != client_endpoints_by_ip_.end()) { auto found_port = found_address->second.find( its_ep_def->get_remote_port()); - if(found_port != found_address->second.end()) { - auto found_reliable2 = found_port->second.find( - _reliable); - if(found_reliable2 != found_port->second.end()) { - its_endpoint = found_reliable2->second; - // store the endpoint under this service/instance id - // as well - needed for later cleanup - remote_services_[_service][_instance][_reliable] = - its_endpoint; - service_instances_[_service][its_endpoint.get()] = _instance; - // add endpoint to serviceinfo object - auto found_service_info = rm_->find_service(_service,_instance); - if (found_service_info) { - found_service_info->set_endpoint(its_endpoint, _reliable); + if (found_port != found_address->second.end()) { + auto found_reliable2 + = found_port->second.find(_reliable); + if (found_reliable2 != found_port->second.end()) { + auto found_partition + = found_reliable2->second.find(its_partition_id); + if (found_partition != found_reliable2->second.end()) { + its_endpoint = found_partition->second; + + // store the endpoint under this service/instance id + // as well - needed for later cleanup + remote_services_[_service][_instance][_reliable] + = its_endpoint; + service_instances_[_service][its_endpoint.get()] = _instance; + + // add endpoint to serviceinfo object + auto found_service_info = rm_->find_service(_service,_instance); + if (found_service_info) { + found_service_info->set_endpoint(its_endpoint, _reliable); + } } } } @@ -873,7 +937,8 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client( } } } - return its_endpoint; + + return (its_endpoint); } std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client( @@ -910,6 +975,8 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client( } if (its_endpoint) { + partition_id_t its_partition + = configuration_->get_partition_id(_service, _instance); used_client_ports_[_reliable].insert(its_local_port); its_lock.unlock(); service_instances_[_service][its_endpoint.get()] = _instance; @@ -917,12 +984,19 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client( client_endpoints_by_ip_[its_endpoint_def->get_address()] [its_endpoint_def->get_port()] - [_reliable] = its_endpoint; + [_reliable] + [its_partition]= its_endpoint; // Set the basic route to the service in the service info auto found_service_info = rm_->find_service(_service, _instance); if (found_service_info) { found_service_info->set_endpoint(its_endpoint, _reliable); } + boost::system::error_code ec; + VSOMEIP_INFO << "endpoint_manager_impl::create_remote_client: " + << its_endpoint_def->get_address().to_string(ec) + << ":" << std::dec << its_endpoint_def->get_port() + << " reliable: " << _reliable + << " using local port: " << std::dec << its_local_port; } } } @@ -983,18 +1057,20 @@ endpoint_manager_impl::log_client_states() const { its_client_endpoints = client_endpoints_by_ip_; } - for (const auto& its_address : its_client_endpoints) { - for (const auto& its_port : its_address.second) { - for (const auto& its_reliability : its_port.second) { - size_t its_queue_size = its_reliability.second->get_queue_size(); - if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE) - its_client_queue_sizes.push_back( - std::make_pair( - std::make_tuple( - its_address.first, - its_port.first, - its_reliability.first), - its_queue_size)); + for (const auto &its_address : its_client_endpoints) { + for (const auto &its_port : its_address.second) { + for (const auto &its_reliability : its_port.second) { + for (const auto &its_partition : its_reliability.second) { + size_t its_queue_size = its_partition.second->get_queue_size(); + if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE) + its_client_queue_sizes.push_back( + std::make_pair( + std::make_tuple( + its_address.first, + its_port.first, + its_reliability.first), + its_queue_size)); + } } } } @@ -1040,8 +1116,8 @@ endpoint_manager_impl::log_server_states() const { its_server_endpoints = server_endpoints_; } - for (const auto& its_port : its_server_endpoints) { - for (const auto& its_reliability : its_port.second) { + for (const auto &its_port : its_server_endpoints) { + for (const auto &its_reliability : its_port.second) { size_t its_queue_size = its_reliability.second->get_queue_size(); if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE) its_client_queue_sizes.push_back( |