diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_impl.cpp | 187 |
1 files changed, 125 insertions, 62 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 00108a2..e78f400 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -92,7 +92,7 @@ void routing_manager_impl::init() { } void routing_manager_impl::start() { -#ifndef WIN32 +#ifndef _WIN32 netlink_connector_ = std::make_shared<netlink_connector>(host_->get_io(), configuration_->get_unicast_address()); netlink_connector_->register_net_if_changes_handler( @@ -115,9 +115,12 @@ void routing_manager_impl::start() { } #ifndef WITHOUT_SYSTEMD - watchdog_timer_.expires_from_now(std::chrono::seconds(0)); - watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk, - this, std::placeholders::_1)); + { + std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); + watchdog_timer_.expires_from_now(std::chrono::seconds(0)); + watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk, + this, std::placeholders::_1)); + } #endif } @@ -126,14 +129,17 @@ void routing_manager_impl::stop() { std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_); version_log_timer_.cancel(); } -#ifndef WIN32 +#ifndef _WIN32 if (netlink_connector_) { netlink_connector_->stop(); } #endif #ifndef WITHOUT_SYSTEMD - watchdog_timer_.cancel(); + { + std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); + watchdog_timer_.cancel(); + } sd_notify(0, "STOPPING=1"); #endif @@ -251,10 +257,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service, auto its_info = find_service(_service, _instance); if (!its_info) { - { - std::lock_guard<std::mutex> ist_lock(requested_services_mutex_); - requested_services_[_client][_service][_instance].insert({ _major, _minor }); - } + requested_service_add(_client, _service, _instance, _major, _minor); if (discovery_) { if (!configuration_->is_local_service(_service, _instance)) { // Non local service instance ~> tell SD to find it! @@ -270,16 +273,14 @@ void routing_manager_impl::request_service(client_t _client, service_t _service, } } else { if ((_major == its_info->get_major() - || DEFAULT_MAJOR == its_info->get_major()) + || DEFAULT_MAJOR == its_info->get_major() + || ANY_MAJOR == _major) && (_minor <= its_info->get_minor() || DEFAULT_MINOR == its_info->get_minor() || _minor == ANY_MINOR)) { if(!its_info->is_local()) { - { - std::lock_guard<std::mutex> ist_lock(requested_services_mutex_); - requested_services_[_client][_service][_instance].insert({ _major, _minor }); - its_info->add_client(_client); - } + requested_service_add(_client, _service, _instance, _major, _minor); + its_info->add_client(_client); find_or_create_remote_client(_service, _instance, true, VSOMEIP_ROUTING_CLIENT); if (_use_exclusive_proxy) { std::shared_ptr<endpoint> its_endpoint = its_info->get_endpoint(true); @@ -310,20 +311,7 @@ void routing_manager_impl::release_service(client_t _client, service_t _service, << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; routing_manager_base::release_service(_client, _service, _instance); - - { - std::lock_guard<std::mutex> its_lock(requested_services_mutex_); - auto its_client = requested_services_.find(_client); - if (its_client != requested_services_.end()) { - auto its_service = its_client->second.find(_service); - if (its_service != its_client->second.end()) { - auto its_instance = its_service->second.find(_instance); - if (its_instance != its_service->second.end()) { - its_service->second.erase(_instance); - } - } - } - } + requested_service_remove(_client, _service, _instance); std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance)); if(its_info && !its_info->is_local()) { @@ -339,6 +327,7 @@ void routing_manager_impl::release_service(client_t _client, service_t _service, clear_service_info(_service, _instance, false); clear_identified_clients(_service, _instance); clear_identifying_clients( _service, _instance); + unset_all_eventpayloads(_service, _instance); } else { remove_identified_client(_service, _instance, _client); remove_identifying_client(_service, _instance, _client); @@ -462,9 +451,15 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service, _eventgroup); if (found_eventgroup != found_instance->second.end()) { found_eventgroup->second.erase(_client); - if (0 == found_eventgroup->second.size()) { + if (!found_eventgroup->second.size()) { last_subscriber_removed = true; - eventgroup_clients_.erase(_eventgroup); + found_instance->second.erase(_eventgroup); + if (!found_service->second.size()) { + found_service->second.erase(_instance); + if (!found_service->second.size()) { + eventgroup_clients_.erase(_service); + } + } } } } @@ -487,6 +482,9 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service, } } } + if (last_subscriber_removed) { + unset_all_eventpayloads(_service, _instance, _eventgroup); + } if (subscriber == VSOMEIP_ROUTING_CLIENT && last_subscriber_removed) { // for normal subscribers only unsubscribe via SD if last // subscriber was removed @@ -751,6 +749,27 @@ bool routing_manager_impl::send_to(const std::shared_ptr<endpoint_definition> &_ return false; } +void routing_manager_impl::register_event( + client_t _client, service_t _service, instance_t _instance, + event_t _event, const std::set<eventgroup_t> &_eventgroups, + bool _is_field, std::chrono::milliseconds _cycle, + bool _change_resets_cycle, epsilon_change_func_t _epsilon_change_func, + bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) { + auto its_event = find_event(_service, _instance, _event); + bool is_first(false); + if (its_event && !its_event->has_ref(_client, _is_provided)) { + is_first = true; + } else { + is_first = true; + } + if (is_first) { + routing_manager_base::register_event(_client, _service, _instance, + _event, _eventgroups, _is_field, _cycle, _change_resets_cycle, + _epsilon_change_func, _is_provided, _is_shadow, + _is_cache_placeholder); + } +} + void routing_manager_impl::register_shadow_event(client_t _client, service_t _service, instance_t _instance, event_t _event, const std::set<eventgroup_t> &_eventgroups, @@ -1549,7 +1568,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint( its_endpoint->enable_magic_cookies(); } } else { -#ifndef WIN32 +#ifndef _WIN32 if (its_unicast.is_v4()) { its_unicast = boost::asio::ip::address_v4::any(); } else if (its_unicast.is_v6()) { @@ -1929,7 +1948,8 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info( if (found_instance != found_service->second.end()) { for (const auto &major_minor_pair : found_instance->second) { if ((major_minor_pair.first == _major - || _major == DEFAULT_MAJOR) + || _major == DEFAULT_MAJOR + || major_minor_pair.first == ANY_MAJOR) && (major_minor_pair.second <= _minor || _minor == DEFAULT_MINOR || major_minor_pair.second == ANY_MINOR)) { @@ -1969,7 +1989,8 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info( if (found_instance != found_service->second.end()) { for (const auto &major_minor_pair : found_instance->second) { if ((major_minor_pair.first == _major - || _major == DEFAULT_MAJOR) + || _major == DEFAULT_MAJOR + || major_minor_pair.first == ANY_MAJOR) && (major_minor_pair.second <= _minor || _minor == DEFAULT_MINOR || major_minor_pair.second == ANY_MINOR)) { @@ -2339,14 +2360,14 @@ void routing_manager_impl::on_subscribe( get_client(_subscriber); } } - stub_->send_subscribe(find_local(_service, _instance), - client, _service, _instance, _eventgroup, its_eventgroup->get_major(), true); // send initial events if we already have a cached field (is_set) for (auto its_event : its_eventgroup->get_events()) { if (its_event->is_field() && its_event->is_set()) { its_event->notify_one(_subscriber, true); // TODO: use _flush parameter to send all event at once } } + stub_->send_subscribe(find_local(_service, _instance), + client, _service, _instance, _eventgroup, its_eventgroup->get_major(), true); } } } @@ -2397,25 +2418,27 @@ void routing_manager_impl::on_subscribe_ack(service_t _service, instance_t _instance, const boost::asio::ip::address &_address, uint16_t _port) { - if (multicast_info.find(_service) != multicast_info.end()) { - if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) { - auto endpoint_def = multicast_info[_service][_instance]; - if (endpoint_def->get_address() == _address && - endpoint_def->get_port() == _port) { - - // Multicast info and endpoint already created before - // This can happen when more than one client subscribe on the same instance! - return; + { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + if (multicast_info.find(_service) != multicast_info.end()) { + if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) { + auto endpoint_def = multicast_info[_service][_instance]; + if (endpoint_def->get_address() == _address && + endpoint_def->get_port() == _port) { + + // Multicast info and endpoint already created before + // This can happen when more than one client subscribe on the same instance! + return; + } } } - } - - // Save multicast info to be able to delete the endpoint - // as soon as the instance stops offering its service - std::shared_ptr<endpoint_definition> endpoint_def = - endpoint_definition::get(_address, _port, false); - multicast_info[_service][_instance] = endpoint_def; + // Save multicast info to be able to delete the endpoint + // as soon as the instance stops offering its service + std::shared_ptr<endpoint_definition> endpoint_def = + endpoint_definition::get(_address, _port, false); + multicast_info[_service][_instance] = endpoint_def; + } bool is_someip = configuration_->is_someip(_service, _instance); // Create multicast endpoint & join multicase group @@ -2645,12 +2668,19 @@ void routing_manager_impl::clear_multicast_endpoints(service_t _service, instanc if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) { std::string address = multicast_info[_service][_instance]->get_address().to_string(); uint16_t port = multicast_info[_service][_instance]->get_port(); - auto multicast_endpoint = server_endpoints_[port][false]; - multicast_endpoint->leave(address); - multicast_endpoint->stop(); - server_endpoints_[port].erase(false); - if (server_endpoints_[port].find(true) == server_endpoints_[port].end()) { - server_endpoints_.erase(port); + std::shared_ptr<endpoint> multicast_endpoint; + auto found_port = server_endpoints_.find(port); + if (found_port != server_endpoints_.end()) { + auto found_unreliable = found_port->second.find(false); + if (found_unreliable != found_port->second.end()) { + multicast_endpoint = found_unreliable->second; + multicast_endpoint->leave(address); + multicast_endpoint->stop(); + server_endpoints_[port].erase(false); + } + if (found_port->second.find(true) == found_port->second.end()) { + server_endpoints_.erase(port); + } } multicast_info[_service].erase(_instance); if (0 >= multicast_info[_service].size()) { @@ -2659,7 +2689,7 @@ void routing_manager_impl::clear_multicast_endpoints(service_t _service, instanc // Clear service_instances_ for multicase endpoint if (1 >= service_instances_[_service].size()) { service_instances_.erase(_service); - } else { + } else if (multicast_endpoint) { service_instances_[_service].erase(multicast_endpoint.get()); } } @@ -3032,6 +3062,7 @@ void routing_manager_impl::watchdog_cbk(boost::system::error_code const &_error) } if (has_interval) { + std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); watchdog_timer_.expires_from_now(std::chrono::microseconds(its_interval / 2)); watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk, this, std::placeholders::_1)); @@ -3501,7 +3532,7 @@ void routing_manager_impl::on_net_if_state_changed(std::string _if, bool _availa if (_available) { VSOMEIP_INFO << "Network interface \"" << _if << "\" is up and running."; start_ip_routing(); -#ifndef WIN32 +#ifndef _WIN32 if (netlink_connector_) { netlink_connector_->unregister_net_if_changes_handler(); } @@ -3526,4 +3557,36 @@ void routing_manager_impl::start_ip_routing() { pending_sd_offers_.clear(); } +void routing_manager_impl::requested_service_add(client_t _client, + service_t _service, + instance_t _instance, + major_version_t _major, + minor_version_t _minor) { + std::lock_guard<std::mutex> ist_lock(requested_services_mutex_); + requested_services_[_client][_service][_instance].insert({ _major, _minor }); +} + +void routing_manager_impl::requested_service_remove(client_t _client, + service_t _service, + instance_t _instance) { + std::lock_guard<std::mutex> ist_lock(requested_services_mutex_); + auto found_client = requested_services_.find(_client); + if (found_client != requested_services_.end()) { + auto found_service = found_client->second.find(_service); + if (found_service != found_client->second.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + // delete all requested major/minor versions + found_service->second.erase(_instance); + if (!found_service->second.size()) { + found_client->second.erase(_service); + if (!found_client->second.size()) { + requested_services_.erase(client_); + } + } + } + } + } +} + } // namespace vsomeip |