diff options
Diffstat (limited to 'implementation/endpoints/src/endpoint_manager_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/endpoint_manager_impl.cpp | 125 |
1 files changed, 103 insertions, 22 deletions
diff --git a/implementation/endpoints/src/endpoint_manager_impl.cpp b/implementation/endpoints/src/endpoint_manager_impl.cpp index dc02aa5..32ba31d 100644 --- a/implementation/endpoints/src/endpoint_manager_impl.cpp +++ b/implementation/endpoints/src/endpoint_manager_impl.cpp @@ -146,18 +146,60 @@ void endpoint_manager_impl::is_remote_service_known( void endpoint_manager_impl::add_remote_service_info( service_t _service, instance_t _instance, const std::shared_ptr<endpoint_definition>& _ep_definition) { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - remote_service_info_[_service][_instance][_ep_definition->is_reliable()] = + + std::shared_ptr<serviceinfo> its_info; + std::shared_ptr<endpoint> its_endpoint; + bool must_report(false); + { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + remote_service_info_[_service][_instance][_ep_definition->is_reliable()] = _ep_definition; + + if (_ep_definition->is_reliable()) { + its_endpoint = find_remote_client(_service, _instance, true); + must_report = (its_endpoint && its_endpoint->is_established_or_connected()); + if (must_report) + its_info = rm_->find_service(_service, _instance); + } + } + + if (must_report) + static_cast<routing_manager_impl*>(rm_)->service_endpoint_connected( + _service, _instance, its_info->get_major(), its_info->get_minor(), + its_endpoint, false); } void endpoint_manager_impl::add_remote_service_info( service_t _service, instance_t _instance, const std::shared_ptr<endpoint_definition>& _ep_definition_reliable, const std::shared_ptr<endpoint_definition>& _ep_definition_unreliable) { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - remote_service_info_[_service][_instance][true] = _ep_definition_reliable; - remote_service_info_[_service][_instance][false] = _ep_definition_unreliable; + + std::shared_ptr<serviceinfo> its_info; + std::shared_ptr<endpoint> its_reliable, its_unreliable; + bool must_report(false); + { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + remote_service_info_[_service][_instance][false] = _ep_definition_unreliable; + remote_service_info_[_service][_instance][true] = _ep_definition_reliable; + + its_unreliable = find_remote_client(_service, _instance, false); + its_reliable = find_remote_client(_service, _instance, true); + + must_report = (its_unreliable && its_unreliable->is_established_or_connected() + && its_reliable && its_reliable->is_established_or_connected()); + + if (must_report) + its_info = rm_->find_service(_service, _instance); + } + + if (must_report) { + static_cast<routing_manager_impl*>(rm_)->service_endpoint_connected( + _service, _instance, its_info->get_major(), its_info->get_minor(), + its_unreliable, false); + static_cast<routing_manager_impl*>(rm_)->service_endpoint_connected( + _service, _instance, its_info->get_major(), its_info->get_minor(), + its_reliable, false); + } } void endpoint_manager_impl::clear_remote_service_info(service_t _service, @@ -230,7 +272,6 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_server_endpoint( return (its_endpoint); } - std::shared_ptr<endpoint> endpoint_manager_impl::find_server_endpoint( uint16_t _port, bool _reliable) const { std::shared_ptr<endpoint> its_endpoint; @@ -247,15 +288,20 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_server_endpoint( std::shared_ptr<endpoint> endpoint_manager_impl::find_or_create_server_endpoint( uint16_t _port, bool _reliable, bool _start, service_t _service, - instance_t _instance) { + instance_t _instance, bool &_is_found, bool _is_multicast) { std::shared_ptr<endpoint> its_endpoint = find_server_endpoint(_port, _reliable); + _is_found = false; if (!its_endpoint) { its_endpoint = create_server_endpoint(_port, _reliable, _start); + } else { + _is_found = true; } if (its_endpoint) { std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - service_instances_[_service][its_endpoint.get()] = _instance; + if (!_is_multicast) { + service_instances_[_service][its_endpoint.get()] = _instance; + } its_endpoint->increment_use_count(); } return (its_endpoint); @@ -369,6 +415,7 @@ void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_ void endpoint_manager_impl::find_or_create_multicast_endpoint( service_t _service, instance_t _instance, + const boost::asio::ip::address &_sender, const boost::asio::ip::address &_address, uint16_t _port) { bool multicast_known(false); { @@ -386,25 +433,27 @@ void endpoint_manager_impl::find_or_create_multicast_endpoint( } } } - if (!multicast_known) { - // Save multicast info to be able to delete the endpoint - // as soon as the instance stops offering its service - std::shared_ptr<endpoint_definition> endpoint_def = - endpoint_definition::get(_address, _port, false, _service, _instance); - multicast_info[_service][_instance] = endpoint_def; - } } const bool is_someip = configuration_->is_someip(_service, _instance); - + bool _is_found(false); // Create multicast endpoint & join multicase group std::shared_ptr<endpoint> its_endpoint = find_or_create_server_endpoint( - _port, false, is_someip, _service, _instance); + _port, false, is_someip, _service, _instance, _is_found, true); + if (!_is_found) { + // Only save multicast info if we created a new endpoint + // to be able to delete the new endpoint + // as soon as the instance stops offering its service + std::shared_ptr<endpoint_definition> endpoint_def = + endpoint_definition::get(_address, _port, false, _service, _instance); + multicast_info[_service][_instance] = endpoint_def; + } + if (its_endpoint) { if (!multicast_known) { std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - service_instances_[_service][its_endpoint.get()] = _instance; + service_instances_multicast_[_service][_sender] = _instance; } - dynamic_cast<udp_server_endpoint_impl*>(its_endpoint.get())->join( + dynamic_cast<udp_server_endpoint_impl*>(its_endpoint.get())->join_unlocked( _address.to_string()); } else { VSOMEIP_ERROR <<"Could not find/create multicast endpoint!"; @@ -412,7 +461,6 @@ void endpoint_manager_impl::find_or_create_multicast_endpoint( } void endpoint_manager_impl::clear_multicast_endpoints(service_t _service, instance_t _instance) { - std::shared_ptr<endpoint> multicast_endpoint; std::string address; @@ -438,14 +486,14 @@ void endpoint_manager_impl::clear_multicast_endpoints(service_t _service, instan if (0 >= multicast_info[_service].size()) { multicast_info.erase(_service); } - // Clear service_instances_ for multicast endpoint - (void)remove_instance(_service, multicast_endpoint.get()); + (void)remove_instance_multicast(_service, _instance); } } } if (multicast_endpoint) { dynamic_cast<udp_server_endpoint_impl*>( multicast_endpoint.get())->leave(address); + multicast_endpoint->stop(); } } @@ -607,6 +655,20 @@ instance_t endpoint_manager_impl::find_instance( return its_instance; } +instance_t endpoint_manager_impl::find_instance_multicast( + service_t _service, const boost::asio::ip::address &_sender) const { + instance_t its_instance(0xFFFF); + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + auto found_service = service_instances_multicast_.find(_service); + if (found_service != service_instances_multicast_.end()) { + auto found_sender = found_service->second.find(_sender); + if (found_sender != found_service->second.end()) { + its_instance = found_sender->second; + } + } + return its_instance; +} + bool endpoint_manager_impl::remove_instance(service_t _service, endpoint* const _endpoint) { std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); @@ -622,6 +684,25 @@ bool endpoint_manager_impl::remove_instance(service_t _service, return (_endpoint->get_use_count() == 0); } +bool endpoint_manager_impl::remove_instance_multicast(service_t _service, + instance_t _instance) { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + auto found_service = service_instances_multicast_.find(_service); + if (found_service != service_instances_multicast_.end()) { + for (auto &its_sender : found_service->second) { + if (its_sender.second == _instance) { + if (found_service->second.erase(its_sender.first)) { + if (!found_service->second.size()) { + service_instances_multicast_.erase(_service); + } + } + return (true); + } + } + } + return (false); +} + void endpoint_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) { // Is called when endpoint->connect succeeded! struct service_info { |