diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_impl.cpp | 303 |
1 files changed, 169 insertions, 134 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index c1daec7..00108a2 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -107,6 +107,7 @@ void routing_manager_impl::start() { host_->on_state(state_type_e::ST_REGISTERED); if (configuration_->log_version()) { + std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_); version_log_timer_.expires_from_now( std::chrono::seconds(0)); version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk, @@ -121,7 +122,10 @@ void routing_manager_impl::start() { } void routing_manager_impl::stop() { - version_log_timer_.cancel(); + { + std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_); + version_log_timer_.cancel(); + } #ifndef WIN32 if (netlink_connector_) { netlink_connector_->stop(); @@ -425,10 +429,12 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, } } } - std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_); - eventgroup_data_t subscription = { _service, _instance, _eventgroup, _major, - _subscription_type}; - pending_subscriptions_.insert(subscription); + { + std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_); + eventgroup_data_t subscription = { _service, _instance, _eventgroup, _major, + _subscription_type}; + pending_subscriptions_.insert(subscription); + } } else { VSOMEIP_ERROR<< "SOME/IP eventgroups require SD to be enabled!"; } @@ -1621,11 +1627,14 @@ void routing_manager_impl::remove_local(client_t _client) { instance_t routing_manager_impl::find_instance(service_t _service, endpoint * _endpoint) { instance_t its_instance(0xFFFF); - auto found_service = service_instances_.find(_service); - if (found_service != service_instances_.end()) { - auto found_endpoint = found_service->second.find(_endpoint); - if (found_endpoint != found_service->second.end()) { - its_instance = found_endpoint->second; + { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + auto found_service = service_instances_.find(_service); + if (found_service != service_instances_.end()) { + auto found_endpoint = found_service->second.find(_endpoint); + if (found_endpoint != found_service->second.end()) { + its_instance = found_endpoint->second; + } } } return (its_instance); @@ -1849,48 +1858,51 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info( bool is_reliable_known(false); bool is_unreliable_known(false); - auto found_service = remote_service_info_.find(_service); - if (found_service != remote_service_info_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - std::shared_ptr<endpoint_definition> its_definition; - if (_reliable_port != ILLEGAL_PORT) { - auto found_reliable = found_instance->second.find(true); - if (found_reliable != found_instance->second.end()) { - its_definition = found_reliable->second; - if (its_definition->get_address() == _reliable_address - && its_definition->get_port() == _reliable_port) { - is_reliable_known = true; - } else { - VSOMEIP_WARNING << "Reliable service endpoint has changed: [" - << std::hex << std::setfill('0') << std::setw(4) << _service << "." - << std::hex << std::setfill('0') << std::setw(4) << _instance << "." - << std::dec << static_cast<std::uint32_t>(_major) << "." - << std::dec << _minor << "] old: " - << its_definition->get_address().to_string() << ":" - << its_definition->get_port() << " new: " - << _reliable_address.to_string() << ":" - << _reliable_port; + { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + auto found_service = remote_service_info_.find(_service); + if (found_service != remote_service_info_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + std::shared_ptr<endpoint_definition> its_definition; + if (_reliable_port != ILLEGAL_PORT) { + auto found_reliable = found_instance->second.find(true); + if (found_reliable != found_instance->second.end()) { + its_definition = found_reliable->second; + if (its_definition->get_address() == _reliable_address + && its_definition->get_port() == _reliable_port) { + is_reliable_known = true; + } else { + VSOMEIP_WARNING << "Reliable service endpoint has changed: [" + << std::hex << std::setfill('0') << std::setw(4) << _service << "." + << std::hex << std::setfill('0') << std::setw(4) << _instance << "." + << std::dec << static_cast<std::uint32_t>(_major) << "." + << std::dec << _minor << "] old: " + << its_definition->get_address().to_string() << ":" + << its_definition->get_port() << " new: " + << _reliable_address.to_string() << ":" + << _reliable_port; + } } } - } - if (_unreliable_port != ILLEGAL_PORT) { - auto found_unreliable = found_instance->second.find(false); - if (found_unreliable != found_instance->second.end()) { - its_definition = found_unreliable->second; - if (its_definition->get_address() == _unreliable_address - && its_definition->get_port() == _unreliable_port) { - is_unreliable_known = true; - } else { - VSOMEIP_WARNING << "Unreliable service endpoint has changed: [" - << std::hex << std::setfill('0') << std::setw(4) << _service << "." - << std::hex << std::setfill('0') << std::setw(4) << _instance << "." - << std::dec << static_cast<std::uint32_t>(_major) << "." - << std::dec << _minor << "] old: " - << its_definition->get_address().to_string() << ":" - << its_definition->get_port() << " new: " - << _unreliable_address.to_string() << ":" - << _unreliable_port; + if (_unreliable_port != ILLEGAL_PORT) { + auto found_unreliable = found_instance->second.find(false); + if (found_unreliable != found_instance->second.end()) { + its_definition = found_unreliable->second; + if (its_definition->get_address() == _unreliable_address + && its_definition->get_port() == _unreliable_port) { + is_unreliable_known = true; + } else { + VSOMEIP_WARNING << "Unreliable service endpoint has changed: [" + << std::hex << std::setfill('0') << std::setw(4) << _service << "." + << std::hex << std::setfill('0') << std::setw(4) << _instance << "." + << std::dec << static_cast<std::uint32_t>(_major) << "." + << std::dec << _minor << "] old: " + << its_definition->get_address().to_string() << ":" + << its_definition->get_port() << " new: " + << _unreliable_address.to_string() << ":" + << _unreliable_port; + } } } } @@ -1901,7 +1913,10 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info( if (_reliable_port != ILLEGAL_PORT && !is_reliable_known) { std::shared_ptr<endpoint_definition> endpoint_def = endpoint_definition::get(_reliable_address, _reliable_port, true); - remote_service_info_[_service][_instance][true] = endpoint_def; + { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + remote_service_info_[_service][_instance][true] = endpoint_def; + } // check if service was requested and establish TCP connection if necessary { @@ -1933,14 +1948,15 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info( } } } - - std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_); - auto found_service2 = specific_endpoint_clients_.find(_service); - if (found_service2 != specific_endpoint_clients_.end()) { - auto found_instance = found_service2->second.find(_instance); - if (found_instance != found_service2->second.end()) { - for (const client_t& c : found_instance->second) { - find_or_create_remote_client(_service, _instance, true, c); + { + std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_); + auto found_service2 = specific_endpoint_clients_.find(_service); + if (found_service2 != specific_endpoint_clients_.end()) { + auto found_instance = found_service2->second.find(_instance); + if (found_instance != found_service2->second.end()) { + for (const client_t& c : found_instance->second) { + find_or_create_remote_client(_service, _instance, true, c); + } } } } @@ -1984,7 +2000,10 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info( if (_unreliable_port != ILLEGAL_PORT && !is_unreliable_known) { std::shared_ptr<endpoint_definition> endpoint_def = endpoint_definition::get(_unreliable_address, _unreliable_port, false); - remote_service_info_[_service][_instance][false] = endpoint_def; + { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + remote_service_info_[_service][_instance][false] = endpoint_def; + } if (!is_reliable_known) { on_availability(_service, _instance, true, _major, _minor); stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, _major, _minor); @@ -2066,7 +2085,7 @@ std::chrono::milliseconds routing_manager_impl::update_routing_info(std::chrono: for (auto &s : get_services()) { for (auto &i : s.second) { - if (find_local_client(s.first, i.first) != VSOMEIP_ROUTING_CLIENT) { + if (i.second->is_local()) { continue; //don't expire local services } ttl_t its_ttl = i.second->get_ttl(); @@ -2403,7 +2422,10 @@ void routing_manager_impl::on_subscribe_ack(service_t _service, std::shared_ptr<endpoint> its_endpoint = find_or_create_server_endpoint(_port, false, is_someip); if (its_endpoint) { - service_instances_[_service][its_endpoint.get()] = _instance; + { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + service_instances_[_service][its_endpoint.get()] = _instance; + } its_endpoint->join(_address.to_string()); } else { VSOMEIP_ERROR<<"Could not find/create multicast endpoint!"; @@ -2458,101 +2480,109 @@ void routing_manager_impl::on_subscribe_nack(client_t _client, bool routing_manager_impl::deliver_specific_endpoint_message(service_t _service, instance_t _instance, const byte_t *_data, length_t _size, endpoint *_receiver) { + client_t its_client(0x0); + // Try to deliver specific endpoint message (for selective subscribers) - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - auto found_servic = remote_services_.find(_service); - if (found_servic != remote_services_.end()) { - auto found_instance = found_servic->second.find(_instance); - if (found_instance != found_servic->second.end()) { - for (auto client_entry : found_instance->second) { - client_t client = client_entry.first; - if (!client) { - continue; - } - auto found_reliability = client_entry.second.find(_receiver->is_reliable()); - if (found_reliability != client_entry.second.end()) { - auto found_enpoint = found_reliability->second; - if (found_enpoint.get() == _receiver) { - if (client != get_client()) { - auto local_endpoint = find_local(client); - if (local_endpoint) { - send_local(local_endpoint, client, _data, _size, _instance, true, - _receiver->is_reliable(), VSOMEIP_SEND); - } - } else { - deliver_message(_data, _size, _instance, _receiver->is_reliable()); + { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + auto found_servic = remote_services_.find(_service); + if (found_servic != remote_services_.end()) { + auto found_instance = found_servic->second.find(_instance); + if (found_instance != found_servic->second.end()) { + for (auto client_entry : found_instance->second) { + if (!client_entry.first) { + continue; + } + auto found_reliability = client_entry.second.find(_receiver->is_reliable()); + if (found_reliability != client_entry.second.end()) { + auto found_enpoint = found_reliability->second; + if (found_enpoint.get() == _receiver) { + its_client = client_entry.first; + break; } - return true; } } } } } + if (its_client) { + if (its_client != get_client()) { + auto local_endpoint = find_local(its_client); + if (local_endpoint) { + send_local(local_endpoint, its_client, _data, _size, _instance, true, + _receiver->is_reliable(), VSOMEIP_SEND); + } + } else { + deliver_message(_data, _size, _instance, _receiver->is_reliable()); + } + return true; + } return false; } void routing_manager_impl::clear_client_endpoints(service_t _service, instance_t _instance, bool _reliable) { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - std::shared_ptr<endpoint> deleted_endpoint; - // Clear client endpoints for remote services (generic and specific ones) - if (remote_services_.find(_service) != remote_services_.end()) { - if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) { - auto endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT][_reliable]; - if (endpoint) { - service_instances_[_service].erase(endpoint.get()); - deleted_endpoint = endpoint; - } - remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].erase(_reliable); - auto found_endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].find( - !_reliable); - if (found_endpoint == remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].end()) { - remote_services_[_service][_instance].erase(VSOMEIP_ROUTING_CLIENT); - } - } - } + std::unordered_set<client_t> its_specific_endpoint_clients; { std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_); auto found_service = specific_endpoint_clients_.find(_service); if(found_service != specific_endpoint_clients_.end()){ auto found_instance = found_service->second.find(_instance); if(found_instance != found_service->second.end()) { - for (const client_t& client : found_instance->second) { - if (remote_services_.find(_service) != remote_services_.end()) { - if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) { - auto endpoint = remote_services_[_service][_instance][client][_reliable]; - if (endpoint) { - service_instances_[_service].erase(endpoint.get()); - endpoint->stop(); - } - remote_services_[_service][_instance][client].erase(_reliable); - auto found_endpoint = remote_services_[_service][_instance][client].find(!_reliable); - if (found_endpoint == remote_services_[_service][_instance][client].end()) { - remote_services_[_service][_instance].erase(client); - } - } + its_specific_endpoint_clients = found_instance->second; + } + } + } + { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + std::shared_ptr<endpoint> deleted_endpoint; + // Clear client endpoints for remote services (generic and specific ones) + if (remote_services_.find(_service) != remote_services_.end()) { + if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) { + auto endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT][_reliable]; + if (endpoint) { + service_instances_[_service].erase(endpoint.get()); + deleted_endpoint = endpoint; + } + remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].erase(_reliable); + auto found_endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].find( + !_reliable); + if (found_endpoint == remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].end()) { + remote_services_[_service][_instance].erase(VSOMEIP_ROUTING_CLIENT); + } + // erase specific client endpoints + for (const client_t &client : its_specific_endpoint_clients) { + auto endpoint = remote_services_[_service][_instance][client][_reliable]; + if (endpoint) { + service_instances_[_service].erase(endpoint.get()); + endpoint->stop(); + } + remote_services_[_service][_instance][client].erase(_reliable); + auto found_endpoint = remote_services_[_service][_instance][client].find(!_reliable); + if (found_endpoint == remote_services_[_service][_instance][client].end()) { + remote_services_[_service][_instance].erase(client); } } } } - } - if (remote_services_.find(_service) != remote_services_.end()) { - if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) { - if (!remote_services_[_service][_instance].size()) { - remote_services_[_service].erase(_instance); - if (0 >= remote_services_[_service].size()) { - remote_services_.erase(_service); + if (remote_services_.find(_service) != remote_services_.end()) { + if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) { + if (!remote_services_[_service][_instance].size()) { + remote_services_[_service].erase(_instance); + if (0 >= remote_services_[_service].size()) { + remote_services_.erase(_service); + } } } } - } - if (!service_instances_[_service].size()) { - service_instances_.erase(_service); - } - if(deleted_endpoint) { - stop_and_delete_client_endpoint(deleted_endpoint); + if (!service_instances_[_service].size()) { + service_instances_.erase(_service); + } + if(deleted_endpoint) { + stop_and_delete_client_endpoint(deleted_endpoint); + } } } @@ -2828,6 +2858,7 @@ bool routing_manager_impl::send_identify_message(client_t _client, bool routing_manager_impl::supports_selective(service_t _service, instance_t _instance) { bool supports_selective(false); + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); auto its_service = remote_service_info_.find(_service); if (its_service != remote_service_info_.end()) { auto its_instance = its_service->second.find(_instance); @@ -2973,10 +3004,13 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const #endif VSOMEIP_INFO << "vSomeIP " << VSOMEIP_VERSION; - version_log_timer_.expires_from_now( - std::chrono::seconds(configuration_->get_log_version_interval())); - version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk, - this, std::placeholders::_1)); + { + std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_); + version_log_timer_.expires_from_now( + std::chrono::seconds(configuration_->get_log_version_interval())); + version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk, + this, std::placeholders::_1)); + } } } @@ -3008,6 +3042,7 @@ void routing_manager_impl::watchdog_cbk(boost::system::error_code const &_error) void routing_manager_impl::clear_remote_service_info(service_t _service, instance_t _instance, bool _reliable) { // Clear remote_service_info_ + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); if (remote_service_info_.find(_service) != remote_service_info_.end()) { if (remote_service_info_[_service].find(_instance) != remote_service_info_[_service].end()) { remote_service_info_[_service][_instance].erase(_reliable); |