diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_impl.cpp | 3714 |
1 files changed, 1392 insertions, 2322 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index f8dbd14..48e2cb5 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2018 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -15,25 +15,23 @@ #include <time.h> #endif -#ifndef WITHOUT_SYSTEMD -#include <systemd/sd-daemon.h> -#endif - #include <boost/asio/steady_timer.hpp> #include <vsomeip/constants.hpp> -#include <vsomeip/message.hpp> #include <vsomeip/payload.hpp> #include <vsomeip/runtime.hpp> +#include <vsomeip/internal/logger.hpp> #include "../include/event.hpp" #include "../include/eventgroupinfo.hpp" +#include "../include/remote_subscription.hpp" #include "../include/routing_manager_host.hpp" #include "../include/routing_manager_impl.hpp" #include "../include/routing_manager_stub.hpp" #include "../include/serviceinfo.hpp" #include "../../configuration/include/configuration.hpp" -#include "../../configuration/include/internal.hpp" +#include "../../security/include/security.hpp" + #include "../../endpoints/include/endpoint_definition.hpp" #include "../../endpoints/include/local_client_endpoint_impl.hpp" #include "../../endpoints/include/tcp_client_endpoint_impl.hpp" @@ -41,32 +39,38 @@ #include "../../endpoints/include/udp_client_endpoint_impl.hpp" #include "../../endpoints/include/udp_server_endpoint_impl.hpp" #include "../../endpoints/include/virtual_server_endpoint_impl.hpp" -#include "../../logging/include/logger.hpp" #include "../../message/include/deserializer.hpp" +#include "../../message/include/message_impl.hpp" #include "../../message/include/serializer.hpp" #include "../../service_discovery/include/constants.hpp" #include "../../service_discovery/include/defines.hpp" #include "../../service_discovery/include/runtime.hpp" -#include "../../service_discovery/include/service_discovery_impl.hpp" +#include "../../service_discovery/include/service_discovery.hpp" #include "../../utility/include/byteorder.hpp" #include "../../utility/include/utility.hpp" -#include "../../plugin/include/plugin_manager.hpp" +#include "../../plugin/include/plugin_manager_impl.hpp" #ifdef USE_DLT #include "../../tracing/include/connector_impl.hpp" #endif +#ifndef ANDROID #include "../../e2e_protection/include/buffer/buffer.hpp" #include "../../e2e_protection/include/e2exf/config.hpp" -#include "../../e2e_protection/include/e2e/profile/profile01/profile_01.hpp" -#include "../../e2e_protection/include/e2e/profile/profile01/protector.hpp" -#include "../../e2e_protection/include/e2e/profile/profile01/checker.hpp" +#include "../../e2e_protection/include/e2e/profile/e2e_provider.hpp" +#endif -#include "../../e2e_protection/include/e2e/profile/profile_custom/profile_custom.hpp" -#include "../../e2e_protection/include/e2e/profile/profile_custom/protector.hpp" -#include "../../e2e_protection/include/e2e/profile/profile_custom/checker.hpp" +#ifdef USE_DLT +#include "../../tracing/include/connector_impl.hpp" +#endif -namespace vsomeip { +namespace vsomeip_v3 { + +#ifdef ANDROID +namespace sd { +runtime::~runtime() {} +} +#endif routing_manager_impl::routing_manager_impl(routing_manager_host *_host) : routing_manager_base(_host), @@ -74,11 +78,9 @@ routing_manager_impl::routing_manager_impl(routing_manager_host *_host) : if_state_running_(false), sd_route_set_(false), routing_running_(false), -#ifndef WITHOUT_SYSTEMD - watchdog_timer_(_host->get_io()), -#endif status_log_timer_(_host->get_io()), memory_log_timer_(_host->get_io()), + ep_mgr_impl_(std::make_shared<endpoint_manager_impl>(this, io_, configuration_)), pending_remote_offer_id_(0), last_resume_(std::chrono::steady_clock::now().min()), pending_security_update_id_(0) @@ -86,6 +88,8 @@ routing_manager_impl::routing_manager_impl(routing_manager_host *_host) : } routing_manager_impl::~routing_manager_impl() { + utility::remove_lockfile(configuration_); + utility::reset_client_ids(); } boost::asio::io_service & routing_manager_impl::get_io() { @@ -100,22 +104,23 @@ std::set<client_t> routing_manager_impl::find_local_clients(service_t _service, return routing_manager_base::find_local_clients(_service, _instance); } -bool routing_manager_impl::is_subscribe_to_any_event_allowed(client_t _client, +client_t routing_manager_impl::find_local_client(service_t _service, instance_t _instance) { + return routing_manager_base::find_local_client(_service, _instance); +} + +bool routing_manager_impl::is_subscribe_to_any_event_allowed(credentials_t _credentials, client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup) { - return routing_manager_base::is_subscribe_to_any_event_allowed(_client, + return routing_manager_base::is_subscribe_to_any_event_allowed(_credentials, _client, _service, _instance, _eventgroup); } void routing_manager_impl::init() { - routing_manager_base::init(); + routing_manager_base::init(ep_mgr_impl_); // TODO: Only instantiate the stub if needed stub_ = std::make_shared<routing_manager_stub>(this, configuration_); stub_->init(); - // We need to be able to send messages to ourself (for delivering events) - (void)create_local(VSOMEIP_ROUTING_CLIENT); - if (configuration_->is_sd_enabled()) { VSOMEIP_INFO<< "Service Discovery enabled. Trying to load module."; auto its_plugin = plugin_manager::get()->get_plugin( @@ -130,34 +135,29 @@ void routing_manager_impl::init() { } } +#ifndef ANDROID if( configuration_->is_e2e_enabled()) { VSOMEIP_INFO << "E2E protection enabled."; + + const char *its_e2e_module = getenv(VSOMEIP_ENV_E2E_PROTECTION_MODULE); + std::string plugin_name = its_e2e_module != nullptr ? its_e2e_module : VSOMEIP_E2E_LIBRARY; + + auto its_plugin = plugin_manager::get()->get_plugin(plugin_type_e::APPLICATION_PLUGIN, plugin_name); + if (its_plugin) { + VSOMEIP_INFO << "E2E module loaded."; + e2e_provider_ = std::dynamic_pointer_cast<e2e::e2e_provider>(its_plugin); + } + } + + if(e2e_provider_) { std::map<e2exf::data_identifier_t, std::shared_ptr<cfg::e2e>> its_e2e_configuration = configuration_->get_e2e_configuration(); for (auto &identifier : its_e2e_configuration) { - auto its_cfg = identifier.second; - if(its_cfg->profile == "CRC8") { - e2exf::data_identifier_t its_data_identifier = {its_cfg->service_id, its_cfg->event_id}; - e2e::profile01::profile_config its_profile_config = e2e::profile01::profile_config(its_cfg->crc_offset, its_cfg->data_id, - (e2e::profile01::p01_data_id_mode) its_cfg->data_id_mode, its_cfg->data_length, its_cfg->counter_offset, its_cfg->data_id_nibble_offset); - if ((its_cfg->variant == "protector") || (its_cfg->variant == "both")) { - custom_protectors[its_data_identifier] = std::make_shared<e2e::profile01::protector>(its_profile_config); - } - if ((its_cfg->variant == "checker") || (its_cfg->variant == "both")) { - custom_checkers[its_data_identifier] = std::make_shared<e2e::profile01::profile_01_checker>(its_profile_config); - } - } else if(its_cfg->profile == "CRC32") { - e2exf::data_identifier_t its_data_identifier = {its_cfg->service_id, its_cfg->event_id}; - e2e::profile_custom::profile_config its_profile_config = e2e::profile_custom::profile_config(its_cfg->crc_offset); - - if ((its_cfg->variant == "protector") || (its_cfg->variant == "both")) { - custom_protectors[its_data_identifier] = std::make_shared<e2e::profile_custom::protector>(its_profile_config); - } - if ((its_cfg->variant == "checker") || (its_cfg->variant == "both")) { - custom_checkers[its_data_identifier] = std::make_shared<e2e::profile_custom::profile_custom_checker>(its_profile_config); - } + if(!e2e_provider_->add_configuration(identifier.second)) { + VSOMEIP_INFO << "Unknown E2E profile: " << identifier.second->profile << ", skipping ..."; } } } +#endif } void routing_manager_impl::start() { @@ -186,15 +186,6 @@ void routing_manager_impl::start() { version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk, this, std::placeholders::_1)); } - -#ifndef WITHOUT_SYSTEMD - { - std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); - watchdog_timer_.expires_from_now(std::chrono::seconds(0)); - watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk, - this, std::placeholders::_1)); - } -#endif #ifndef _WIN32 if (configuration_->log_memory()) { std::lock_guard<std::mutex> its_lock(memory_log_timer_mutex_); @@ -216,6 +207,26 @@ void routing_manager_impl::start() { } void routing_manager_impl::stop() { + // Ensure to StopOffer all services that are offered by the application hosting the rm + local_services_map_t its_services; + { + std::lock_guard<std::mutex> its_lock(local_services_mutex_); + for (const auto& s : local_services_) { + for (const auto& i : s.second) { + if (std::get<2>(i.second) == client_) { + its_services[s.first][i.first] = i.second; + } + } + } + + } + for (const auto& s : its_services) { + for (const auto& i : s.second) { + on_stop_offer_service(std::get<2>(i.second), s.first, i.first, + std::get<0>(i.second), std::get<1>(i.second)); + } + } + { std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_); version_log_timer_.cancel(); @@ -236,37 +247,119 @@ void routing_manager_impl::stop() { boost::system::error_code ec; status_log_timer_.cancel(ec); } -#ifndef WITHOUT_SYSTEMD - { - std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); - watchdog_timer_.cancel(); - } - sd_notify(0, "STOPPING=1"); - VSOMEIP_INFO << "Sent STOPPING to systemd watchdog"; -#endif - host_->on_state(state_type_e::ST_DEREGISTERED); if (discovery_) discovery_->stop(); stub_->stop(); - for (auto client: get_connected_clients()) { + for (auto client: ep_mgr_->get_connected_clients()) { if (client != VSOMEIP_ROUTING_CLIENT) { remove_local(client, true); } } } -bool routing_manager_impl::offer_service(client_t _client, service_t _service, - instance_t _instance, major_version_t _major, minor_version_t _minor) { +bool routing_manager_impl::insert_offer_command(service_t _service, instance_t _instance, uint8_t _command, + client_t _client, major_version_t _major, minor_version_t _minor) { + std::lock_guard<std::mutex> its_lock(offer_serialization_mutex_); + // flag to indicate whether caller of this function can start directly processing the command + bool must_process(false); + auto found_service_instance = offer_commands_.find(std::make_pair(_service, _instance)); + if (found_service_instance != offer_commands_.end()) { + // if nothing is queued + if (found_service_instance->second.empty()) { + must_process = true; + } + found_service_instance->second.push_back( + std::make_tuple(_command, _client, _major, _minor)); + } else { + // nothing is queued -> add command to queue and process command directly + offer_commands_[std::make_pair(_service, _instance)].push_back( + std::make_tuple(_command, _client, _major, _minor)); + must_process = true; + } + return must_process; +} + +bool routing_manager_impl::erase_offer_command(service_t _service, instance_t _instance) { + std::lock_guard<std::mutex> its_lock(offer_serialization_mutex_); + auto found_service_instance = offer_commands_.find(std::make_pair(_service, _instance)); + if (found_service_instance != offer_commands_.end()) { + // erase processed command + if (!found_service_instance->second.empty()) { + found_service_instance->second.pop_front(); + if (!found_service_instance->second.empty()) { + // check for other commands to be processed + auto its_command = found_service_instance->second.front(); + if (std::get<0>(its_command) == VSOMEIP_OFFER_SERVICE) { + io_.post([&, its_command, _service, _instance](){ + offer_service(std::get<1>(its_command), _service, _instance, + std::get<2>(its_command), std::get<3>(its_command), false); + }); + } else { + io_.post([&, its_command, _service, _instance](){ + stop_offer_service(std::get<1>(its_command), _service, _instance, + std::get<2>(its_command), std::get<3>(its_command), false); + }); + } + } + } + } + return true; +} + +bool routing_manager_impl::offer_service(client_t _client, + service_t _service, instance_t _instance, + major_version_t _major, minor_version_t _minor) { + + return offer_service(_client, _service, _instance, _major, _minor, true); +} + +bool routing_manager_impl::offer_service(client_t _client, + service_t _service, instance_t _instance, + major_version_t _major, minor_version_t _minor, + bool _must_queue) { + VSOMEIP_INFO << "OFFER(" << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance - << ":" << std::dec << int(_major) << "." << std::dec << _minor << "]"; + << ":" << std::dec << int(_major) << "." << std::dec << _minor << "]" + << " (" << _must_queue << ")"; + + // only queue commands if method was NOT called via erase_offer_command() + if (_must_queue) { + if (!insert_offer_command(_service, _instance, VSOMEIP_OFFER_SERVICE, + _client, _major, _minor)) { + return false; + } + } + + // Check if the application hosted by routing manager is allowed to offer + // offer_service requests of local proxies are checked in rms::on:message + if (_client == get_client()) { +#ifdef _WIN32 + std::uint32_t its_routing_uid = ANY_UID; + std::uint32_t its_routing_gid = ANY_GID; +#else + std::uint32_t its_routing_uid = getuid(); + std::uint32_t its_routing_gid = getgid(); +#endif + if (!security::get()->is_offer_allowed(its_routing_uid, its_routing_gid, + _client, _service, _instance)) { + VSOMEIP_WARNING << "routing_manager_impl::offer_service: " + << std::hex << "Security: Client 0x" << _client + << " isn't allowed to offer the following service/instance " + << _service << "/" << _instance + << " ~> Skip offer!"; + erase_offer_command(_service, _instance); + return false; + } + } - if(!handle_local_offer_service(_client, _service, _instance, _major, _minor)) { + if (!handle_local_offer_service(_client, _service, _instance, _major, _minor)) { + erase_offer_command(_service, _instance); return false; } @@ -282,7 +375,7 @@ bool routing_manager_impl::offer_service(client_t _client, service_t _service, if (discovery_) { std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance); if (its_info) { - discovery_->offer_service(_service, _instance, its_info); + discovery_->offer_service(its_info); } } @@ -290,16 +383,23 @@ bool routing_manager_impl::offer_service(client_t _client, service_t _service, std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_); std::set<event_t> its_already_subscribed_events; for (auto &ps : pending_subscriptions_) { - if (ps.service_ == _service && - ps.instance_ == _instance && ps.major_ == _major) { + if (ps.service_ == _service + && ps.instance_ == _instance + && ps.major_ == _major) { insert_subscription(ps.service_, ps.instance_, ps.eventgroup_, ps.event_, client_, &its_already_subscribed_events); - } + VSOMEIP_ERROR << __func__ + << ": event=" + << std::hex << ps.service_ << "." + << std::hex << ps.instance_ << "." + << std::hex << ps.event_; } } + send_pending_subscriptions(_service, _instance, _major); } stub_->on_offer_service(_client, _service, _instance, _major, _minor); on_availability(_service, _instance, true, _major, _minor); + erase_offer_command(_service, _instance); return true; } @@ -307,11 +407,28 @@ void routing_manager_impl::stop_offer_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor) { + stop_offer_service(_client, _service, _instance, _major, _minor, true); +} + +void routing_manager_impl::stop_offer_service(client_t _client, + service_t _service, instance_t _instance, + major_version_t _major, minor_version_t _minor, + bool _must_queue) { + VSOMEIP_INFO << "STOP OFFER(" << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance - << ":" << std::dec << int(_major) << "." << _minor << "]"; + << ":" << std::dec << int(_major) << "." << _minor << "]" + << " (" << _must_queue << ")"; + + if (_must_queue) { + if (!insert_offer_command(_service, _instance, VSOMEIP_STOP_OFFER_SERVICE, + _client, _major, _minor)) { + return; + } + } + bool is_local(false); { std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance); @@ -340,12 +457,12 @@ void routing_manager_impl::stop_offer_service(client_t _client, << std::hex << std::setw(4) << std::setfill('0') << _instance << ":" << std::dec << int(_major) << "." << _minor << "] " << "for remote service --> ignore"; + erase_offer_command(_service, _instance); } } void routing_manager_impl::request_service(client_t _client, service_t _service, - instance_t _instance, major_version_t _major, minor_version_t _minor, - bool _use_exclusive_proxy) { + instance_t _instance, major_version_t _major, minor_version_t _minor) { VSOMEIP_INFO << "REQUEST(" << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" @@ -353,8 +470,8 @@ void routing_manager_impl::request_service(client_t _client, service_t _service, << std::hex << std::setw(4) << std::setfill('0') << _instance << ":" << std::dec << int(_major) << "." << std::dec << _minor << "]"; - routing_manager_base::request_service(_client, _service, _instance, _major, - _minor, _use_exclusive_proxy); + routing_manager_base::request_service(_client, + _service, _instance, _major, _minor); auto its_info = find_service(_service, _instance); if (!its_info) { @@ -381,27 +498,22 @@ void routing_manager_impl::request_service(client_t _client, service_t _service, || _minor == ANY_MINOR)) { if(!its_info->is_local()) { requested_service_add(_client, _service, _instance, _major, _minor); - its_info->add_client(_client); - find_or_create_remote_client(_service, _instance, true, VSOMEIP_ROUTING_CLIENT); - if (_use_exclusive_proxy) { - std::shared_ptr<endpoint> its_endpoint = its_info->get_endpoint(true); - if(its_endpoint) { - find_or_create_remote_client(_service, _instance, true, _client); - } + if (discovery_) { + // Non local service instance ~> tell SD to find it! + discovery_->request_service(_service, _instance, _major, + _minor, DEFAULT_TTL); } + its_info->add_client(_client); + ep_mgr_impl_->find_or_create_remote_client( + _service, _instance, true, VSOMEIP_ROUTING_CLIENT); } } } - if (_use_exclusive_proxy) { - std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_); - specific_endpoint_clients_[_service][_instance].insert(_client); - } - if (_client == get_client()) { stub_->create_local_receiver(); - service_data_t request = { _service, _instance, _major, _minor, _use_exclusive_proxy }; + service_data_t request = { _service, _instance, _major, _minor }; std::set<service_data_t> requests; requests.insert(request); stub_->handle_requests(_client, requests); @@ -424,36 +536,28 @@ void routing_manager_impl::release_service(client_t _client, service_t _service, requested_service_remove(_client, _service, _instance); std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance)); - if(its_info && !its_info->is_local()) { - unsubscribe_specific_client_at_sd(_service, _instance, _client); - if(!its_info->get_requesters_size()) { - if(discovery_) { + if (its_info && !its_info->is_local()) { + if (!its_info->get_requesters_size()) { + if (discovery_) { discovery_->release_service(_service, _instance); - discovery_->unsubscribe_client(_service, _instance, VSOMEIP_ROUTING_CLIENT); + discovery_->unsubscribe_all(_service, _instance); } - clear_client_endpoints(_service, _instance, true); - clear_client_endpoints(_service, _instance, false); + ep_mgr_impl_->clear_client_endpoints(_service, _instance, true); + ep_mgr_impl_->clear_client_endpoints(_service, _instance, false); its_info->set_endpoint(nullptr, true); its_info->set_endpoint(nullptr, false); - clear_identified_clients(_service, _instance); - clear_identifying_clients( _service, _instance); unset_all_eventpayloads(_service, _instance); - } else { - remove_identified_client(_service, _instance, _client); - remove_identifying_client(_service, _instance, _client); - remove_specific_client_endpoint(_service, _instance, _client, true); - remove_specific_client_endpoint(_service, _instance, _client, false); } } else { - if(discovery_) { + if (discovery_) { discovery_->release_service(_service, _instance); } } } -void routing_manager_impl::subscribe(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, - event_t _event, subscription_type_e _subscription_type) { +void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid, + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + major_version_t _major, event_t _event) { VSOMEIP_INFO << "SUBSCRIBE(" << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" @@ -464,14 +568,16 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, << std::dec << (uint16_t)_major << "]"; const client_t its_local_client = find_local_client(_service, _instance); if (get_client() == its_local_client) { +#ifdef VSOMEIP_ENABLE_COMPAT routing_manager_base::set_incoming_subscription_state(_client, _service, _instance, _eventgroup, _event, subscription_state_e::IS_SUBSCRIBING); +#endif auto self = shared_from_this(); - host_->on_subscription(_service, _instance, _eventgroup, _client, true, - [this, self, _client, _service, _instance, _eventgroup, - _event, _major, _subscription_type] + host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, true, + [this, self, _client, _uid, _gid, _service, _instance, _eventgroup, + _event, _major] (const bool _subscription_accepted) { - (void) find_or_create_local(_client); + (void) ep_mgr_->find_or_create_local(_client); if (!_subscription_accepted) { stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup, _event); VSOMEIP_INFO << "Subscription request from client: 0x" << std::hex @@ -481,58 +587,52 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, } else { stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event); } - routing_manager_base::subscribe(_client, _service, _instance, _eventgroup, _major, _event, _subscription_type); + routing_manager_base::subscribe(_client, _uid, _gid, _service, _instance, _eventgroup, _major, _event); +#ifdef VSOMEIP_ENABLE_COMPAT send_pending_notify_ones(_service, _instance, _eventgroup, _client); routing_manager_base::erase_incoming_subscription_state(_client, _service, _instance, _eventgroup, _event); +#endif }); } else { if (discovery_) { - client_t subscriber = VSOMEIP_ROUTING_CLIENT; - if (0 == its_local_client) { - subscriber = is_specific_endpoint_client(_client, _service, _instance); - if (subscriber != VSOMEIP_ROUTING_CLIENT) { - if (supports_selective(_service, _instance)) { - identify_for_subscribe(_client, _service, _instance, - _major, _subscription_type); - } else { - VSOMEIP_INFO << "Subcribe to legacy selective service: " << std::hex - << _service << ":" << _instance << "."; - } - } - } - std::unique_lock<std::mutex> eventgroup_lock; - auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); - if (its_eventgroup) { - eventgroup_lock = its_eventgroup->get_subscription_lock(); - } std::set<event_t> its_already_subscribed_events; + + // Note: The calls to insert_subscription & handle_subscription_state must not + // run concurrently to a call to on_subscribe_ack. Therefore the lock is acquired + // before calling insert_subscription and released after the call to + // handle_subscription_state. + std::unique_lock<std::mutex> its_critical(remote_subscription_state_mutex_); bool inserted = insert_subscription(_service, _instance, _eventgroup, _event, _client, &its_already_subscribed_events); if (inserted) { if (0 == its_local_client) { handle_subscription_state(_client, _service, _instance, _eventgroup, _event); - if (its_eventgroup) { - eventgroup_lock.unlock(); - } + its_critical.unlock(); static const ttl_t configured_ttl(configuration_->get_sd_ttl()); notify_one_current_value(_client, _service, _instance, _eventgroup, _event, its_already_subscribed_events); - discovery_->subscribe(_service, _instance, _eventgroup, - _major, configured_ttl, subscriber, _subscription_type); + + auto its_info = find_eventgroup(_service, _instance, _eventgroup); + if (its_info) { + discovery_->subscribe(_service, _instance, _eventgroup, + _major, configured_ttl, + its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT); + } } else { + its_critical.unlock(); if (is_available(_service, _instance, _major)) { - stub_->send_subscribe(find_local(_service, _instance), - _client, _service, _instance, _eventgroup, _major, _event, DEFAULT_SUBSCRIPTION); + stub_->send_subscribe(ep_mgr_->find_local(_service, _instance), + _client, _service, _instance, _eventgroup, _major, _event, + PENDING_SUBSCRIPTION_ID); } } - } else if (its_eventgroup) { - eventgroup_lock.unlock(); } if (get_client() == _client) { std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_); - subscription_data_t subscription = { _service, _instance, _eventgroup, _major, - _event, _subscription_type}; + subscription_data_t subscription = { + _service, _instance, _eventgroup, _major, _event, _uid, _gid + }; pending_subscriptions_.insert(subscription); } } else { @@ -541,8 +641,8 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, } } -void routing_manager_impl::unsubscribe(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, event_t _event) { +void routing_manager_impl::unsubscribe(client_t _client, uid_t _uid, gid_t _gid, + service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event) { VSOMEIP_INFO << "UNSUBSCRIBE(" << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" @@ -552,14 +652,16 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service, << std::hex << std::setw(4) << std::setfill('0') << _event << "]"; bool last_subscriber_removed(true); + + auto its_event = find_event(_service, _instance, _event); std::shared_ptr<eventgroupinfo> its_info = find_eventgroup(_service, _instance, _eventgroup); if (its_info) { - for (auto e : its_info->get_events()) { + for (const auto& e : its_info->get_events()) { if (e->get_event() == _event || ANY_EVENT == _event) e->remove_subscriber(_eventgroup, _client); } - for (auto e : its_info->get_events()) { + for (const auto& e : its_info->get_events()) { if (e->has_subscriber(_eventgroup, ANY_CLIENT)) { last_subscriber_removed = false; break; @@ -568,40 +670,36 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service, } if (discovery_) { - host_->on_subscription(_service, _instance, _eventgroup, _client, false, [](const bool _subscription_accepted){ (void)_subscription_accepted; }); + host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, false, + [](const bool _subscription_accepted){ (void)_subscription_accepted; }); if (0 == find_local_client(_service, _instance)) { - client_t subscriber = is_specific_endpoint_client(_client, _service, _instance); if (last_subscriber_removed) { unset_all_eventpayloads(_service, _instance, _eventgroup); - } - if (subscriber == VSOMEIP_ROUTING_CLIENT && last_subscriber_removed) { { - auto tuple = std::make_tuple(_service, _instance, _eventgroup, subscriber); + auto tuple = std::make_tuple(_service, _instance, _eventgroup, _client); std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); remote_subscription_state_.erase(tuple); } - // for normal subscribers only unsubscribe via SD if last - // subscriber was removed - discovery_->unsubscribe(_service, _instance, _eventgroup, subscriber); - } else if (subscriber != VSOMEIP_ROUTING_CLIENT) { - { - auto tuple = std::make_tuple(_service, _instance, _eventgroup, subscriber); - std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); - remote_subscription_state_.erase(tuple); + } + + if (last_subscriber_removed + || (its_event && its_event->get_type() == event_type_e::ET_SELECTIVE_EVENT)) { + if (its_info) { + discovery_->unsubscribe(_service, _instance, _eventgroup, + its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT); } - // for selective subscribers always unsubscribe at the SD - discovery_->unsubscribe(_service, _instance, _eventgroup, subscriber); } } else { if (get_client() == _client) { std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_); remove_pending_subscription(_service, _instance, _eventgroup, _event); - stub_->send_unsubscribe(find_local(_service, _instance), + stub_->send_unsubscribe( + ep_mgr_->find_local(_service, _instance), _client, _service, _instance, _eventgroup, _event, - DEFAULT_SUBSCRIPTION); + PENDING_SUBSCRIPTION_ID); } } - clear_multicast_endpoints(_service, _instance); + ep_mgr_impl_->clear_multicast_endpoints(_service, _instance); } else { VSOMEIP_ERROR<< "SOME/IP eventgroups require SD to be enabled!"; @@ -609,18 +707,21 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service, } bool routing_manager_impl::send(client_t _client, - std::shared_ptr<message> _message, bool _flush) { - return routing_manager_base::send(_client, _message, _flush); + std::shared_ptr<message> _message) { + return routing_manager_base::send(_client, _message); } bool routing_manager_impl::send(client_t _client, const byte_t *_data, - length_t _size, instance_t _instance, bool _flush, bool _reliable, - client_t _bound_client, bool _is_valid_crc, bool _sent_from_remote) { + length_t _size, instance_t _instance, bool _reliable, + client_t _bound_client, + credentials_t _credentials, + uint8_t _status_check, bool _sent_from_remote) { bool is_sent(false); if (_size > VSOMEIP_MESSAGE_TYPE_POS) { std::shared_ptr<endpoint> its_target; bool is_request = utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS]); bool is_notification = utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS]); + bool is_response = utility::is_response(_data[VSOMEIP_MESSAGE_TYPE_POS]); client_t its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]); @@ -629,21 +730,14 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, method_t its_method = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); - bool is_service_discovery = (its_service == vsomeip::sd::service - && its_method == vsomeip::sd::method); - -#ifdef USE_DLT - bool is_response(false); -#endif + bool is_service_discovery + = (its_service == sd::service && its_method == sd::method); if (is_request) { - its_target = find_local(its_service, _instance); + its_target = ep_mgr_->find_local(its_service, _instance); } else if (!is_notification) { -#ifdef USE_DLT - is_response = true; -#endif its_target = find_local(its_client); - } else if (is_notification && _client) { // Selective notifications! + } else if (is_notification && _client && !is_service_discovery) { // Selective notifications! if (_client == get_client()) { #ifdef USE_DLT const uint16_t its_data_size @@ -654,7 +748,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, its_data_size); #endif - deliver_message(_data, _size, _instance, _reliable, _bound_client, _is_valid_crc, _sent_from_remote); + deliver_message(_data, _size, _instance, _reliable, _bound_client, _credentials, _status_check, _sent_from_remote); return true; } its_target = find_local(_client); @@ -673,36 +767,39 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, _data, its_data_size); } #endif - is_sent = send_local(its_target, get_client(), _data, _size, _instance, _flush, _reliable, VSOMEIP_SEND, _is_valid_crc); + is_sent = send_local(its_target, get_client(), _data, _size, _instance, _reliable, VSOMEIP_SEND, _status_check); } else { // Check whether hosting application should get the message // If not, check routes to external - if ((its_client == host_->get_client() && !is_request) + if ((its_client == host_->get_client() && is_response) || (find_local_client(its_service, _instance) == host_->get_client() && is_request)) { - // TODO: find out how to handle session id here - is_sent = deliver_message(_data, _size, _instance, _reliable, _bound_client, _is_valid_crc, _sent_from_remote); + // TODO: Find out how to handle session id here + is_sent = deliver_message(_data, _size, _instance, _reliable, VSOMEIP_ROUTING_CLIENT, _credentials, _status_check); } else { e2e_buffer outputBuffer; - if( configuration_->is_e2e_enabled()) { + if (e2e_provider_) { if ( !is_service_discovery) { service_t its_service = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); method_t its_method = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); - if( custom_protectors.count({its_service, its_method})) { +#ifndef ANDROID + if (e2e_provider_->is_protected({its_service, its_method})) { outputBuffer.assign(_data, _data + VSOMEIP_PAYLOAD_POS); e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data +_size); - custom_protectors[{its_service, its_method}]->protect( inputBuffer); + e2e_provider_->protect({its_service, its_method}, inputBuffer); outputBuffer.resize(inputBuffer.size() + VSOMEIP_PAYLOAD_POS); std::copy(inputBuffer.begin(), inputBuffer.end(), outputBuffer.begin() + VSOMEIP_PAYLOAD_POS); _data = outputBuffer.data(); } +#endif } } if (is_request) { - client_t client = is_specific_endpoint_client(its_client, its_service, _instance); - its_target = find_or_create_remote_client(its_service, _instance, _reliable, client); + client_t client = VSOMEIP_ROUTING_CLIENT; + its_target = ep_mgr_impl_->find_or_create_remote_client( + its_service, _instance, _reliable, client); if (its_target) { #ifdef USE_DLT const uint16_t its_data_size @@ -713,7 +810,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, its_data_size); #endif - is_sent = its_target->send(_data, _size, _flush); + is_sent = its_target->send(_data, _size); } else { const session_t its_session = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SESSION_POS_MIN], @@ -729,7 +826,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, std::shared_ptr<serviceinfo> its_info(find_service(its_service, _instance)); if (its_info || is_service_discovery) { if (is_notification && !is_service_discovery) { - send_local_notification(get_client(), _data, _size, _instance, _flush, _reliable, _is_valid_crc); + send_local_notification(get_client(), _data, _size, _instance, _reliable, _status_check); method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); std::shared_ptr<event> its_event = find_event(its_service, _instance, its_method); @@ -741,42 +838,49 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, // we need both endpoints as clients can subscribe to events via TCP and UDP std::shared_ptr<endpoint> its_udp_server_endpoint = its_info->get_endpoint(false); std::shared_ptr<endpoint> its_tcp_server_endpoint = its_info->get_endpoint(true); - bool is_offered_both(false); - if (its_udp_server_endpoint && its_tcp_server_endpoint) { - is_offered_both = true; - } + if (its_udp_server_endpoint || its_tcp_server_endpoint) { + const auto its_reliability = its_event->get_reliability(); for (auto its_group : its_event->get_eventgroups()) { auto its_eventgroup = find_eventgroup(its_service, _instance, its_group); if (its_eventgroup) { // Unicast targets - for (const auto &its_remote : its_eventgroup->get_targets()) { - if(its_remote.endpoint_->is_reliable() && its_tcp_server_endpoint) { - if (!is_offered_both || (is_offered_both && its_event->is_reliable())) { - its_targets.insert(its_remote.endpoint_); + for (const auto &its_remote : its_eventgroup->get_unicast_targets()) { + if (its_remote->is_reliable() && its_tcp_server_endpoint) { + if (its_reliability == reliability_type_e::RT_RELIABLE + || its_reliability == reliability_type_e::RT_BOTH) { + its_targets.insert(its_remote); } } else if (its_udp_server_endpoint && !its_eventgroup->is_sending_multicast()) { - if (!is_offered_both || (is_offered_both && !its_event->is_reliable())) { - its_targets.insert(its_remote.endpoint_); + if (its_reliability == reliability_type_e::RT_UNRELIABLE + || its_reliability == reliability_type_e::RT_BOTH) { + its_targets.insert(its_remote); } } } // Send to multicast targets if subscribers are still interested if (its_eventgroup->is_sending_multicast()) { - for (auto its_multicast_target : its_eventgroup->get_multicast_targets()) { - if (!is_offered_both || (is_offered_both && !its_event->is_reliable())) { - its_targets.insert(its_multicast_target.endpoint_); + if (its_reliability == reliability_type_e::RT_UNRELIABLE + || its_reliability == reliability_type_e::RT_BOTH) { + boost::asio::ip::address its_address; + uint16_t its_port; + if (its_eventgroup->get_multicast(its_address, its_port)) { + std::shared_ptr<endpoint_definition> its_multicast_target; + its_multicast_target = endpoint_definition::get(its_address, + its_port, false, its_service, _instance); + its_targets.insert(its_multicast_target); } } } } } } + for (auto const &target : its_targets) { if (target->is_reliable()) { - its_tcp_server_endpoint->send_to(target, _data, _size, _flush); + its_tcp_server_endpoint->send_to(target, _data, _size); } else { - its_udp_server_endpoint->send_to(target, _data, _size, _flush); + its_udp_server_endpoint->send_to(target, _data, _size); } #ifdef USE_DLT has_sent = true; @@ -798,7 +902,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, if ((utility::is_response(_data[VSOMEIP_MESSAGE_TYPE_POS]) || utility::is_error(_data[VSOMEIP_MESSAGE_TYPE_POS])) && !its_info->is_local()) { - // we received a response/error but neither the hosting application + // We received a response/error but neither the hosting application // nor another local client could be found --> drop const session_t its_session = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SESSION_POS_MIN], @@ -824,7 +928,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, its_data_size); #endif - is_sent = its_target->send(_data, _size, _flush); + is_sent = its_target->send(_data, _size); } else { const session_t its_session = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SESSION_POS_MIN], @@ -860,30 +964,43 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, } bool routing_manager_impl::send_to( + const client_t _client, const std::shared_ptr<endpoint_definition> &_target, - std::shared_ptr<message> _message, bool _flush) { + std::shared_ptr<message> _message) { bool is_sent(false); - std::lock_guard<std::mutex> its_lock(serialize_mutex_); - if (serializer_->serialize(_message.get())) { - const byte_t *_data = serializer_->get_data(); - length_t _size = serializer_->get_size(); - e2e_buffer outputBuffer; - if( configuration_->is_e2e_enabled()) { + + std::shared_ptr<serializer> its_serializer(get_serializer()); + if (its_serializer->serialize(_message.get())) { + const byte_t *its_data = its_serializer->get_data(); + length_t its_size = its_serializer->get_size(); + e2e_buffer its_output_buffer; + if (e2e_provider_) { service_t its_service = VSOMEIP_BYTES_TO_WORD( - _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); + its_data[VSOMEIP_SERVICE_POS_MIN], + its_data[VSOMEIP_SERVICE_POS_MAX]); method_t its_method = VSOMEIP_BYTES_TO_WORD( - _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); - if( custom_protectors.count({its_service, its_method})) { - outputBuffer.assign(_data, _data + VSOMEIP_PAYLOAD_POS); - e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data +_size); - custom_protectors[{its_service, its_method}]->protect( inputBuffer); - outputBuffer.resize(inputBuffer.size() + VSOMEIP_PAYLOAD_POS); - std::copy(inputBuffer.begin(), inputBuffer.end(), outputBuffer.begin() + VSOMEIP_PAYLOAD_POS); - _data = outputBuffer.data(); + its_data[VSOMEIP_METHOD_POS_MIN], + its_data[VSOMEIP_METHOD_POS_MAX]); +#ifndef ANDROID + if(e2e_provider_->is_protected({its_service, its_method})) { + its_output_buffer.assign(its_data, its_data + VSOMEIP_PAYLOAD_POS); + e2e_buffer its_input_buffer(its_data + VSOMEIP_PAYLOAD_POS, its_data + its_size); + e2e_provider_->protect({its_service, its_method}, its_input_buffer); + its_output_buffer.resize(its_input_buffer.size() + VSOMEIP_PAYLOAD_POS); + std::copy(its_input_buffer.begin(), its_input_buffer.end(), + its_output_buffer.begin() + VSOMEIP_PAYLOAD_POS); + its_data = its_output_buffer.data(); } +#endif } - is_sent = send_to(_target, _data, _size, _message->get_instance(), _flush); - serializer_->reset(); + + const_cast<byte_t*>(its_data)[VSOMEIP_CLIENT_POS_MIN] = VSOMEIP_WORD_BYTE1(_client); + const_cast<byte_t*>(its_data)[VSOMEIP_CLIENT_POS_MAX] = VSOMEIP_WORD_BYTE0(_client); + + is_sent = send_to(_target, its_data, its_size, _message->get_instance()); + + its_serializer->reset(); + put_serializer(its_serializer); } else { VSOMEIP_ERROR<< "routing_manager_impl::send_to: serialization failed."; } @@ -892,10 +1009,10 @@ bool routing_manager_impl::send_to( bool routing_manager_impl::send_to( const std::shared_ptr<endpoint_definition> &_target, - const byte_t *_data, uint32_t _size, instance_t _instance, - bool _flush) { - std::shared_ptr<endpoint> its_endpoint = find_server_endpoint( - _target->get_remote_port(), _target->is_reliable()); + const byte_t *_data, uint32_t _size, instance_t _instance) { + std::shared_ptr<endpoint> its_endpoint = + ep_mgr_impl_->find_server_endpoint( + _target->get_remote_port(), _target->is_reliable()); if (its_endpoint) { #ifdef USE_DLT @@ -909,26 +1026,30 @@ bool routing_manager_impl::send_to( #else (void) _instance; #endif - return its_endpoint->send_to(_target, _data, _size, _flush); + return its_endpoint->send_to(_target, _data, _size); } return false; } -bool routing_manager_impl::send_to( +bool routing_manager_impl::send_via_sd( const std::shared_ptr<endpoint_definition> &_target, const byte_t *_data, uint32_t _size, uint16_t _sd_port) { - std::shared_ptr<endpoint> its_endpoint = find_server_endpoint( - _sd_port, _target->is_reliable()); + std::shared_ptr<endpoint> its_endpoint = + ep_mgr_impl_->find_server_endpoint(_sd_port, + _target->is_reliable()); if (its_endpoint) { #ifdef USE_DLT - const uint16_t its_data_size - = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); + if (tc_->is_sd_enabled()) { + const uint16_t its_data_size + = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); - trace::header its_header; - if (its_header.prepare(its_endpoint, true, 0x0)) - tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, - _data, its_data_size); + trace::header its_header; + if (its_header.prepare(its_endpoint, true, 0x0)) + tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, + _data, its_data_size); + + } #endif return its_endpoint->send_to(_target, _data, _size); } @@ -936,13 +1057,16 @@ bool routing_manager_impl::send_to( return false; } -void routing_manager_impl::register_event( - client_t _client, service_t _service, instance_t _instance, - event_t _event, const std::set<eventgroup_t> &_eventgroups, - bool _is_field, std::chrono::milliseconds _cycle, - bool _change_resets_cycle, epsilon_change_func_t _epsilon_change_func, +void routing_manager_impl::register_event(client_t _client, + service_t _service, instance_t _instance, + event_t _notifier, + const std::set<eventgroup_t> &_eventgroups, const event_type_e _type, + reliability_type_e _reliability, + std::chrono::milliseconds _cycle, bool _change_resets_cycle, + bool _update_on_change, + epsilon_change_func_t _epsilon_change_func, bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) { - auto its_event = find_event(_service, _instance, _event); + auto its_event = find_event(_service, _instance, _notifier); bool is_first(false); if (its_event) { if (!its_event->has_ref(_client, _is_provided)) { @@ -952,8 +1076,11 @@ void routing_manager_impl::register_event( is_first = true; } if (is_first) { - routing_manager_base::register_event(_client, _service, _instance, - _event, _eventgroups, _is_field, _cycle, _change_resets_cycle, + routing_manager_base::register_event(_client, + _service, _instance, + _notifier, + _eventgroups, _type, _reliability, + _cycle, _change_resets_cycle, _update_on_change, _epsilon_change_func, _is_provided, _is_shadow, _is_cache_placeholder); } @@ -961,17 +1088,20 @@ void routing_manager_impl::register_event( << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _event + << std::hex << std::setw(4) << std::setfill('0') << _notifier << ":is_provider=" << _is_provided << "]"; } void routing_manager_impl::register_shadow_event(client_t _client, service_t _service, instance_t _instance, - event_t _event, const std::set<eventgroup_t> &_eventgroups, - bool _is_field, bool _is_provided) { - routing_manager_base::register_event(_client, _service, _instance, - _event, _eventgroups, _is_field, - std::chrono::milliseconds::zero(), false, + event_t _notifier, + const std::set<eventgroup_t> &_eventgroups, event_type_e _type, + reliability_type_e _reliability, bool _is_provided) { + routing_manager_base::register_event(_client, + _service, _instance, + _notifier, + _eventgroups, _type, _reliability, + std::chrono::milliseconds::zero(), false, true, nullptr, _is_provided, true); } @@ -985,56 +1115,49 @@ void routing_manager_impl::unregister_shadow_event(client_t _client, void routing_manager_impl::notify_one(service_t _service, instance_t _instance, event_t _event, std::shared_ptr<payload> _payload, client_t _client, - bool _force, bool _flush, bool _remote_subscriber) { + bool _force +#ifdef VSOMEIP_ENABLE_COMPAT + , bool _remote_subscriber +#endif + ) { if (find_local(_client)) { routing_manager_base::notify_one(_service, _instance, _event, _payload, - _client, _force, _flush, _remote_subscriber); + _client, _force +#ifdef VSOMEIP_ENABLE_COMPAT + , _remote_subscriber +#endif + ); } else { std::shared_ptr<event> its_event = find_event(_service, _instance, _event); if (its_event) { - // Event is valid for service/instance - bool found_eventgroup(false); - // Iterate over all groups of the event to ensure at least - // one valid eventgroup for service/instance exists. - for (auto its_group : its_event->get_eventgroups()) { - auto its_eventgroup = find_eventgroup(_service, _instance, its_group); + std::set<std::shared_ptr<endpoint_definition> > its_targets; + const auto its_reliability = its_event->get_reliability(); + for (const auto g : its_event->get_eventgroups()) { + const auto its_eventgroup = find_eventgroup(_service, _instance, g); if (its_eventgroup) { - // Eventgroup is valid for service/instance - found_eventgroup = true; - break; - } - } - if (found_eventgroup) { - const bool is_offered_both = - (configuration_->get_reliable_port(_service, _instance) != ILLEGAL_PORT && - configuration_->get_unreliable_port(_service, _instance) != ILLEGAL_PORT); - - std::set<std::shared_ptr<endpoint_definition>> its_targets; - // Now set event's payload! - // Either with endpoint_definition (remote) or with client (local). - { - std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); - auto its_service = remote_subscribers_.find(_service); - if (its_service != remote_subscribers_.end()) { - auto its_instance = its_service->second.find(_instance); - if (its_instance != its_service->second.end()) { - auto its_subscriber = its_instance->second.find(_client); - if (its_subscriber != its_instance->second.end()) { - its_targets = its_subscriber->second; + const auto its_subscriptions = its_eventgroup->get_remote_subscriptions(); + for (const auto &s : its_subscriptions) { + if (s->has_client(_client)) { + if (its_reliability == reliability_type_e::RT_RELIABLE + || its_reliability == reliability_type_e::RT_BOTH) { + const auto its_reliable = s->get_reliable(); + if (its_reliable) + its_targets.insert(its_reliable); + } + if (its_reliability == reliability_type_e::RT_UNRELIABLE + || its_reliability == reliability_type_e::RT_BOTH) { + const auto its_unreliable = s->get_unreliable(); + if (its_unreliable) + its_targets.insert(its_unreliable); } } } } + } + + if (its_targets.size() > 0) { for (const auto &its_target : its_targets) { - if (!is_offered_both) { - its_event->set_payload(_payload, its_target, _force, _flush); - } else { - if (its_event->is_reliable() && its_target->is_reliable()) { - its_event->set_payload(_payload, its_target, _force, _flush); - } else if (!its_event->is_reliable() && !its_target->is_reliable()) { - its_event->set_payload(_payload, its_target, _force, _flush); - } - } + its_event->set_payload(_payload, _client, its_target, _force); } } } else { @@ -1050,24 +1173,6 @@ void routing_manager_impl::on_availability(service_t _service, instance_t _insta host_->on_availability(_service, _instance, _is_available, _major, _minor); } -void routing_manager_impl::on_error( - const byte_t *_data, length_t _length, endpoint *_receiver, - const boost::asio::ip::address &_remote_address, - std::uint16_t _remote_port) { - instance_t its_instance = 0; - if (_length >= VSOMEIP_SERVICE_POS_MAX) { - service_t its_service = VSOMEIP_BYTES_TO_WORD( - _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); - its_instance = find_instance(its_service, _receiver); - } - send_error(return_code_e::E_MALFORMED_MESSAGE, _data, _length, its_instance, - _receiver->is_reliable(), _receiver, _remote_address, _remote_port); -} - -void routing_manager_impl::release_port(uint16_t _port, bool _reliable) { - std::lock_guard<std::mutex> its_lock(used_client_ports_mutex_); - used_client_ports_[_reliable].erase(_port); -} bool routing_manager_impl::offer_service_remotely(service_t _service, instance_t _instance, @@ -1156,7 +1261,7 @@ bool routing_manager_impl::stop_offer_service_remotely(service_t _service, clear_remote_subscriber(_service, _instance); if (discovery_ && its_info) { - discovery_->stop_offer_service(_service, _instance, its_info); + discovery_->stop_offer_service(its_info); its_info->set_endpoint(std::shared_ptr<endpoint>(), _reliable); } } else { @@ -1168,7 +1273,7 @@ bool routing_manager_impl::stop_offer_service_remotely(service_t _service, // ensure to not send StopOffer for endpoint on which the service is // still offered its_copied_info->set_endpoint(std::shared_ptr<endpoint>(), !_reliable); - discovery_->stop_offer_service(_service, _instance, its_copied_info); + discovery_->stop_offer_service(its_copied_info); } } @@ -1178,7 +1283,7 @@ bool routing_manager_impl::stop_offer_service_remotely(service_t _service, void routing_manager_impl::on_message(const byte_t *_data, length_t _size, endpoint *_receiver, const boost::asio::ip::address &_destination, - client_t _bound_client, + client_t _bound_client, credentials_t _credentials, const boost::asio::ip::address &_remote_address, std::uint16_t _remote_port) { #if 0 @@ -1191,7 +1296,7 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, (void)_bound_client; service_t its_service; method_t its_method; - bool its_is_crc_valid(true); + uint8_t its_check_status = e2e::profile_interface::generic_check_status::E2E_OK; instance_t its_instance(0x0); #ifdef USE_DLT bool is_forwarded(true); @@ -1215,7 +1320,7 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, } } } else { - its_instance = find_instance(its_service, _receiver); + its_instance = ep_mgr_impl_->find_instance(its_service, _receiver); if (its_instance == 0xFFFF) { its_method = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_METHOD_POS_MIN], @@ -1256,7 +1361,7 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, } // Security checks if enabled! - if (configuration_->is_security_enabled()) { + if (security::get()->is_enabled()) { if (utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])) { client_t requester = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_CLIENT_POS_MIN], @@ -1276,7 +1381,7 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, << " which is already used locally ~> Skip message!"; return; } - if (!configuration_->is_remote_client_allowed()) { + if (!security::get()->is_remote_client_allowed()) { // check if policy allows remote requests. VSOMEIP_WARNING << "routing_manager_impl::on_message: " << std::hex << "Security: Remote client with client ID 0x" << requester @@ -1287,37 +1392,28 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, } } } - if( configuration_->is_e2e_enabled()) { + if (e2e_provider_) { its_method = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); - if( custom_checkers.count({its_service, its_method})) { +#ifndef ANDROID + if( e2e_provider_->is_checked({its_service, its_method})) { e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data + _size); - e2e::profile_interface::generic_check_status check_status; - custom_checkers[{its_service, its_method}]->check( inputBuffer, check_status); + e2e_provider_->check({its_service, its_method}, inputBuffer, its_check_status); - if ( check_status != e2e::profile_interface::generic_check_status::E2E_OK ) { + if ( its_check_status != e2e::profile_interface::generic_check_status::E2E_OK ) { VSOMEIP_INFO << std::hex << "E2E protection: CRC check failed for service: " << its_service << " method: " << its_method; - its_is_crc_valid = false; } } +#endif } - if (!deliver_specific_endpoint_message( - its_service, its_instance, _data, _size, _receiver)) { - // set client ID to zero for all messages - if( utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) { - byte_t *its_data = const_cast<byte_t *>(_data); - its_data[VSOMEIP_CLIENT_POS_MIN] = 0x0; - its_data[VSOMEIP_CLIENT_POS_MAX] = 0x0; - } - // Common way of message handling + + // Common way of message handling #ifdef USE_DLT - is_forwarded = + is_forwarded = #endif - on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(), - _bound_client, its_is_crc_valid, true); - - } + on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(), + _bound_client, _credentials, its_check_status, true); } } #ifdef USE_DLT @@ -1345,7 +1441,8 @@ bool routing_manager_impl::on_message( service_t _service, instance_t _instance, const byte_t *_data, length_t _size, bool _reliable, client_t _bound_client, - bool _is_valid_crc, + credentials_t _credentials, + uint8_t _check_status, bool _is_from_remote) { #if 0 std::stringstream msg; @@ -1367,15 +1464,15 @@ bool routing_manager_impl::on_message( _data[VSOMEIP_CLIENT_POS_MAX]); } - if (its_client == VSOMEIP_ROUTING_CLIENT - && utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) { + if (utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) { is_forwarded = deliver_notification(_service, _instance, _data, _size, - _reliable, _bound_client, _is_valid_crc, _is_from_remote); + _reliable, _bound_client, _credentials, _check_status, _is_from_remote); } else if (its_client == host_->get_client()) { deliver_message(_data, _size, _instance, - _reliable, _bound_client, _is_valid_crc, _is_from_remote); + _reliable, _bound_client, _credentials, _check_status, _is_from_remote); } else { - send(its_client, _data, _size, _instance, true, _reliable, _bound_client, _is_valid_crc, _is_from_remote); //send to proxy + send(its_client, _data, _size, _instance, _reliable, + _bound_client, _credentials, _check_status, _is_from_remote); //send to proxy } return is_forwarded; } @@ -1384,9 +1481,7 @@ void routing_manager_impl::on_notification(client_t _client, service_t _service, instance_t _instance, const byte_t *_data, length_t _size, bool _notify_one) { event_t its_event_id = VSOMEIP_BYTES_TO_WORD( - _data[VSOMEIP_METHOD_POS_MIN], - _data[VSOMEIP_METHOD_POS_MAX]); - + _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); std::shared_ptr<event> its_event = find_event(_service, _instance, its_event_id); if (its_event) { uint32_t its_length = utility::get_payload_size(_data, _size); @@ -1396,197 +1491,37 @@ void routing_manager_impl::on_notification(client_t _client, its_length); if (_notify_one) { - notify_one(_service, _instance, its_event->get_event(), its_payload, _client, true, true, false); + notify_one(_service, _instance, its_event->get_event(), + its_payload, _client, true +#ifdef VSOMEIP_ENABLE_COMPAT + , false +#endif + ); } else { if (its_event->is_field()) { - if (its_event->is_set()) { - its_event->set_payload(its_payload, false, true); - } else { - // new subscribers will be notified by SD via sent_initiale_events; - if (its_event->get_remote_notification_pending()) { - // Set payload first time ~> notify all remote subscriber per unicast (initial field) - std::vector<std::unique_lock<std::mutex>> its_locks; - std::vector<std::shared_ptr<eventgroupinfo>> its_eventgroupinfos; - for (auto its_group : its_event->get_eventgroups()) { - auto its_eventgroup = find_eventgroup(_service, _instance, its_group); - if (its_eventgroup) { - its_locks.push_back(its_eventgroup->get_subscription_lock()); - its_eventgroupinfos.push_back(its_eventgroup); - } - } - - bool is_offered_both(false); - if (configuration_->get_reliable_port(_service, _instance) != ILLEGAL_PORT && - configuration_->get_unreliable_port(_service, _instance) != ILLEGAL_PORT) { - is_offered_both = true; - } - for (const auto &its_eventgroup : its_eventgroupinfos) { - //Unicast targets - for (auto its_remote : its_eventgroup->get_targets()) { - if (!is_offered_both) { - its_event->set_payload(its_payload, its_remote.endpoint_, true, true); - } else { - bool was_set(false); - if (its_event->is_reliable() && its_remote.endpoint_->is_reliable()) { - its_event->set_payload(its_payload, its_remote.endpoint_, true, true); - was_set = true; - } - if (!its_event->is_reliable() && !its_remote.endpoint_->is_reliable()) { - its_event->set_payload(its_payload, its_remote.endpoint_, true, true); - was_set = true; - } - if (!was_set) { - its_event->set_payload_dont_notify(its_payload); - } - } - } - } - its_event->set_remote_notification_pending(false); - } else { - its_event->set_payload_dont_notify(its_payload); - } + if (!its_event->set_payload_notify_pending(its_payload)) { + its_event->set_payload(its_payload, false); } } else { - its_event->set_payload(its_payload, false, true); + its_event->set_payload(its_payload, false, true); } } } } -void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) { - // Is called when endpoint->connect succeeded! - struct service_info { - service_t service_id_; - instance_t instance_id_; - major_version_t major_; - minor_version_t minor_; - bool reliable_; - std::shared_ptr<endpoint> endpoint_; - }; - - // Set to state CONNECTED as connection is not yet fully established in remote side POV - // but endpoint is ready to send / receive. Set to ESTABLISHED after timer expires - // to prevent inserting subscriptions twice or send out subscription before remote side - // is finished with TCP 3 way handshake - _endpoint->set_connected(true); - - std::forward_list<struct service_info> services_to_report_; - { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - for (auto &its_service : remote_services_) { - for (auto &its_instance : its_service.second) { - for (auto &its_client : its_instance.second) { - if (its_client.first == VSOMEIP_ROUTING_CLIENT || - its_client.first == get_client()) { - auto found_endpoint = its_client.second.find(false); - if (found_endpoint != its_client.second.end()) { - if (found_endpoint->second.get() == _endpoint.get()) { - std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first)); - if (!its_info) { - _endpoint->set_established(true); - return; - } - services_to_report_.push_front( - { its_service.first, - its_instance.first, - its_info->get_major(), - its_info->get_minor(), - false, _endpoint }); - } - } - found_endpoint = its_client.second.find(true); - if (found_endpoint != its_client.second.end()) { - if (found_endpoint->second.get() == _endpoint.get()) { - std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first)); - if (!its_info) { - _endpoint->set_established(true); - return; - } - services_to_report_.push_front( - { its_service.first, - its_instance.first, - its_info->get_major(), - its_info->get_minor(), - true, _endpoint }); - } - } - } - } - } +bool routing_manager_impl::is_last_stop_callback(const uint32_t _callback_id) { + bool last_callback(false); + auto found_id = callback_counts_.find(_callback_id); + if (found_id != callback_counts_.end()) { + found_id->second--; + if (found_id->second == 0) { + last_callback = true; } } - - for (const auto &s : services_to_report_) { - on_availability(s.service_id_, s.instance_id_, true, s.major_, s.minor_); - if (s.reliable_) { - stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, s.service_id_, - s.instance_id_, s.major_, s.minor_); - } - std::shared_ptr<boost::asio::steady_timer> its_timer = - std::make_shared<boost::asio::steady_timer>(io_); - boost::system::error_code ec; - its_timer->expires_from_now(std::chrono::milliseconds(3), ec); - if (!ec) { - its_timer->async_wait( - std::bind( - &routing_manager_impl::call_sd_endpoint_connected, - std::static_pointer_cast<routing_manager_impl>( - shared_from_this()), - std::placeholders::_1, s.service_id_, - s.instance_id_, s.endpoint_, its_timer)); - } else { - VSOMEIP_ERROR<< "routing_manager_impl::on_connect: " << ec.message(); - } - } - if (services_to_report_.empty()) { - _endpoint->set_established(true); - } -} - -void routing_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) { - // Is called when endpoint->connect fails! - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - for (auto &its_service : remote_services_) { - for (auto &its_instance : its_service.second) { - for (auto &its_client : its_instance.second) { - if (its_client.first == VSOMEIP_ROUTING_CLIENT || - its_client.first == get_client()) { - auto found_endpoint = its_client.second.find(false); - if (found_endpoint != its_client.second.end()) { - if (found_endpoint->second.get() == _endpoint.get()) { - - std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first)); - if(!its_info){ - return; - } - on_availability(its_service.first, its_instance.first, - false, its_info->get_major(), its_info->get_minor()); - } - } - found_endpoint = its_client.second.find(true); - if (found_endpoint != its_client.second.end()) { - if (found_endpoint->second.get() == _endpoint.get()) { - - std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first)); - if(!its_info){ - return; - } - on_availability(its_service.first, its_instance.first, - false, its_info->get_major(), its_info->get_minor()); - stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, - its_service.first, its_instance.first, - its_info->get_major(), - its_info->get_minor()); - VSOMEIP_WARNING << __func__ - << ": lost connection to remote service: " - << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "." - << std::hex << std::setw(4) << std::setfill('0') << its_instance.first; - } - } - } - } - } + if (last_callback) { + callback_counts_.erase(_callback_id); } + return last_callback; } void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _service, @@ -1643,47 +1578,133 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se if (its_info) { its_reliable_endpoint = its_info->get_endpoint(true); its_unreliable_endpoint = its_info->get_endpoint(false); - } + static std::atomic<uint32_t> callback_id(0); + const uint32_t its_callback_id = ++callback_id; + + struct ready_to_stop_t { + ready_to_stop_t() : reliable_(false), unreliable_(false){} + std::atomic<bool> reliable_; + std::atomic<bool> unreliable_; + }; + auto ready_to_stop = std::make_shared<ready_to_stop_t>(); + auto ptr = shared_from_this(); + + auto callback = [&, its_callback_id, ptr, its_info, its_reliable_endpoint, its_unreliable_endpoint, + ready_to_stop, _service, _instance, _major, _minor] + (std::shared_ptr<endpoint> _endpoint, service_t _stopped_service) { + (void)_stopped_service; + if (its_reliable_endpoint && its_reliable_endpoint == _endpoint) { + ready_to_stop->reliable_ = true; + } + if (its_unreliable_endpoint && its_unreliable_endpoint == _endpoint) { + ready_to_stop->unreliable_ = true; + } + if ((its_unreliable_endpoint && !ready_to_stop->unreliable_) || + (its_reliable_endpoint && !ready_to_stop->reliable_)) { + { + std::lock_guard<std::mutex> its_lock(callback_counts_mutex_); + if (is_last_stop_callback(its_callback_id)) { + erase_offer_command(_service, _instance); + } + } + return; + } - if (discovery_) { - if (its_info) { - if (its_info->get_major() == _major && its_info->get_minor() == _minor) { - discovery_->stop_offer_service(_service, _instance, its_info); + if (discovery_) { + if (its_info->get_major() == _major && its_info->get_minor() == _minor) { + discovery_->stop_offer_service(its_info); + } } - } - } - del_routing_info(_service, _instance, (its_reliable_endpoint != nullptr), - (its_unreliable_endpoint != nullptr)); + del_routing_info(_service, _instance, (its_reliable_endpoint != nullptr), + (its_unreliable_endpoint != nullptr)); - // Cleanup reliable & unreliable server endpoints hold before - if (its_info) { - if (its_unreliable_endpoint) { - cleanup_server_endpoint(_service, its_unreliable_endpoint); - // Clear service info and service group - clear_service_info(_service, _instance, false); + for (const auto& ep: {its_reliable_endpoint, its_unreliable_endpoint}) { + if (ep) { + if (ep_mgr_impl_->remove_instance(_service, ep.get())) { + { + std::lock_guard<std::mutex> its_lock(callback_counts_mutex_); + callback_counts_[its_callback_id]++; + } + // last instance -> pass ANY_INSTANCE and shutdown completely + ep->prepare_stop( + [&, _service, _instance, its_callback_id, ptr, its_reliable_endpoint, its_unreliable_endpoint] + (std::shared_ptr<endpoint> _endpoint, + service_t _stopped_service) { + (void)_stopped_service; + if (ep_mgr_impl_->remove_server_endpoint( + _endpoint->get_local_port(), + _endpoint->is_reliable())) { + _endpoint->stop(); + } + { + std::lock_guard<std::mutex> its_lock(callback_counts_mutex_); + if (is_last_stop_callback(its_callback_id)) { + erase_offer_command(_service, _instance); + } + } + }, ANY_SERVICE); + } + // Clear service info and service group + clear_service_info(_service, _instance, ep->is_reliable()); + } + } + { + std::lock_guard<std::mutex> its_lock(callback_counts_mutex_); + if (is_last_stop_callback(its_callback_id)) { + erase_offer_command(_service, _instance); + } + } + }; + + // determine callback count + for (const auto& ep : {its_reliable_endpoint, its_unreliable_endpoint}) { + if (ep) { + std::lock_guard<std::mutex> its_lock(callback_counts_mutex_); + auto found_id = callback_counts_.find(its_callback_id); + if (found_id != callback_counts_.end()) { + found_id->second++; + } else { + callback_counts_[its_callback_id] = 1; + } + } } - if (its_reliable_endpoint) { - cleanup_server_endpoint(_service, its_reliable_endpoint); - // Clear service info and service group - clear_service_info(_service, _instance, true); + for (const auto& ep : {its_reliable_endpoint, its_unreliable_endpoint}) { + if (ep) { + ep->prepare_stop(callback, _service); + } + } + + if (!its_reliable_endpoint && !its_unreliable_endpoint) { + { + std::lock_guard<std::mutex> its_lock(callback_counts_mutex_); + callback_counts_.erase(its_callback_id); + } + erase_offer_command(_service, _instance); } + } else { + erase_offer_command(_service, _instance); } } bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, - instance_t _instance, bool _reliable, client_t _bound_client, bool _is_valid_crc, bool _is_from_remote) { + instance_t _instance, bool _reliable, client_t _bound_client, credentials_t _credentials, + uint8_t _status_check, bool _is_from_remote) { bool is_delivered(false); + std::uint32_t its_sender_uid = std::get<0>(_credentials); + std::uint32_t its_sender_gid = std::get<1>(_credentials); - auto a_deserializer = get_deserializer(); - a_deserializer->set_data(_data, _size); - std::shared_ptr<message> its_message(a_deserializer->deserialize_message()); - a_deserializer->reset(); - put_deserializer(a_deserializer); + auto its_deserializer = get_deserializer(); + its_deserializer->set_data(_data, _size); + std::shared_ptr<message_impl> its_message(its_deserializer->deserialize_message()); + its_deserializer->reset(); + put_deserializer(its_deserializer); if (its_message) { its_message->set_instance(_instance); its_message->set_reliable(_reliable); - its_message->set_is_valid_crc(_is_valid_crc); + its_message->set_check_result(_status_check); + its_message->set_uid(std::get<0>(_credentials)); + its_message->set_gid(std::get<1>(_credentials)); if (!_is_from_remote) { if (utility::is_notification(its_message->get_message_type())) { @@ -1698,7 +1719,8 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, << " ~> Skip message!"; return false; } else { - if (!configuration_->is_client_allowed(get_client(), its_message->get_service(), + if (!security::get()->is_client_allowed(own_uid_, own_gid_, + get_client(), its_message->get_service(), its_message->get_instance(), its_message->get_method())) { VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() << " : routing_manager_impl::deliver_message: " @@ -1711,7 +1733,7 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, } } } else if (utility::is_request(its_message->get_message_type())) { - if (configuration_->is_security_enabled() + if (security::get()->is_enabled() && its_message->get_client() != _bound_client) { VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() << " : routing_manager_impl::deliver_message:" @@ -1720,12 +1742,13 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, << its_message->get_service() << "/" << its_message->get_instance() << "/" << its_message->get_method() << " which doesn't match the bound client 0x" << std::setw(4) << std::setfill('0') << _bound_client - << " ~> skip message!"; + << " ~> Skip message!"; return false; } - if (!configuration_->is_client_allowed(its_message->get_client(), - its_message->get_service(), its_message->get_instance(), its_message->get_method())) { + if (!security::get()->is_client_allowed(its_sender_uid, its_sender_gid, + its_message->get_client(), its_message->get_service(), + its_message->get_instance(), its_message->get_method())) { VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() << " : routing_manager_impl::deliver_message: " << " isn't allowed to send a request to service/instance/method " @@ -1746,8 +1769,9 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, << " ~> Skip message!"; return false; } else { - if (!configuration_->is_client_allowed(get_client(), its_message->get_service(), - its_message->get_instance(), its_message->get_method())) { + if (!security::get()->is_client_allowed(own_uid_, own_gid_, + get_client(), its_message->get_service(), + its_message->get_instance(), its_message->get_method())) { VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() << " : routing_manager_impl::deliver_message: " << " isn't allowed to receive a response from service/instance/method " @@ -1760,7 +1784,7 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, } } } else { - if (!configuration_->is_remote_client_allowed()) { + if (!security::get()->is_remote_client_allowed()) { // if the message is from remote, check if // policy allows remote requests. VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() @@ -1773,7 +1797,8 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, << " ~> Skip message!"; return false; } else if (utility::is_notification(its_message->get_message_type())) { - if (!configuration_->is_client_allowed(get_client(), its_message->get_service(), + if (!security::get()->is_client_allowed(own_uid_, own_gid_, + get_client(), its_message->get_service(), its_message->get_instance(), its_message->get_method())) { VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() << " : routing_manager_impl::deliver_message: " @@ -1800,11 +1825,14 @@ bool routing_manager_impl::deliver_notification( service_t _service, instance_t _instance, const byte_t *_data, length_t _length, bool _reliable, client_t _bound_client, - bool _is_valid_crc, bool _is_from_remote) { - method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], - _data[VSOMEIP_METHOD_POS_MAX]); + credentials_t _credentials, + uint8_t _status_check, bool _is_from_remote) { + event_t its_event_id = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); + client_t its_client_id = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]); - std::shared_ptr<event> its_event = find_event(_service, _instance, its_method); + std::shared_ptr<event> its_event = find_event(_service, _instance, its_event_id); if (its_event) { if (!its_event->is_provided()) { if (its_event->get_subscribers().size() == 0) { @@ -1843,17 +1871,46 @@ bool routing_manager_impl::deliver_notification( } } - for (const auto its_local_client : its_event->get_subscribers()) { - if (its_local_client == host_->get_client()) { - deliver_message(_data, _length, _instance, _reliable, _bound_client, _is_valid_crc, _is_from_remote); - } else { - std::shared_ptr<endpoint> its_local_target = find_local(its_local_client); - if (its_local_target) { - send_local(its_local_target, VSOMEIP_ROUTING_CLIENT, - _data, _length, _instance, true, _reliable, VSOMEIP_SEND, _is_valid_crc); + if (its_event->get_type() != event_type_e::ET_SELECTIVE_EVENT) { + for (const auto its_local_client : its_event->get_subscribers()) { + if (its_local_client == host_->get_client()) { + deliver_message(_data, _length, _instance, _reliable, + _bound_client, _credentials, _status_check, _is_from_remote); + } else { + std::shared_ptr<endpoint> its_local_target = find_local(its_local_client); + if (its_local_target) { + send_local(its_local_target, VSOMEIP_ROUTING_CLIENT, + _data, _length, _instance, _reliable, VSOMEIP_SEND, _status_check); + } + } + } + } else { + // TODO: Check whether it makes more sense to set the client id + // for internal selective events. This would create some extra + // effort but we could avoid this hack. + if (its_client_id == VSOMEIP_ROUTING_CLIENT) + its_client_id = get_client(); + + auto its_subscribers = its_event->get_subscribers(); + if (its_subscribers.find(its_client_id) != its_subscribers.end()) { + if (its_client_id == host_->get_client()) { + deliver_message(_data, _length, _instance, _reliable, + _bound_client, _credentials, _status_check, _is_from_remote); + } else { + std::shared_ptr<endpoint> its_local_target = find_local(its_client_id); + if (its_local_target) { + send_local(its_local_target, VSOMEIP_ROUTING_CLIENT, + _data, _length, _instance, _reliable, VSOMEIP_SEND, _status_check); + } } } } + } else { + VSOMEIP_WARNING << __func__ << ": Event [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_event_id << "]" + << " is not registered. The message is dropped."; } return true; @@ -1865,34 +1922,40 @@ std::shared_ptr<eventgroupinfo> routing_manager_impl::find_eventgroup( return routing_manager_base::find_eventgroup(_service, _instance, _eventgroup); } -const std::shared_ptr<configuration> routing_manager_impl::get_configuration() const { - return (host_->get_configuration()); -} - std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoint( const std::string &_address, uint16_t _port, bool _reliable) { - std::shared_ptr<endpoint> its_service_endpoint = find_server_endpoint(_port, - _reliable); + std::shared_ptr<endpoint> its_service_endpoint = + ep_mgr_impl_->find_server_endpoint(_port, _reliable); if (!its_service_endpoint) { try { - its_service_endpoint = create_server_endpoint(_port, _reliable, - true); + its_service_endpoint = + ep_mgr_impl_->create_server_endpoint(_port, + _reliable, true); if (its_service_endpoint) { - sd_info_ = std::make_shared<serviceinfo>(ANY_MAJOR, ANY_MINOR, - DEFAULT_TTL, false); // false, because we do _not_ want to announce it... + sd_info_ = std::make_shared<serviceinfo>( + VSOMEIP_SD_SERVICE, VSOMEIP_SD_INSTANCE, + ANY_MAJOR, ANY_MINOR, DEFAULT_TTL, + false); // false, because we do _not_ want to announce it... sd_info_->set_endpoint(its_service_endpoint, _reliable); its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE, _address, _port); - its_service_endpoint->join(_address); + if (!_reliable) { +#if defined(_WIN32) || defined(ANDROID) + dynamic_cast<udp_server_endpoint_impl*>( + its_service_endpoint.get())->join(_address); +#else + reinterpret_cast<udp_server_endpoint_impl*>( + its_service_endpoint.get())->join(_address); +#endif + } } else { VSOMEIP_ERROR<< "Service Discovery endpoint could not be created. " "Please check your network configuration."; } } catch (const std::exception &e) { - host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED); - VSOMEIP_ERROR << "Service Discovery endpoint could not be created: " - << e.what(); + VSOMEIP_ERROR << "Server endpoint creation failed: Service " + "Discovery endpoint could not be created: " << e.what(); } } return its_service_endpoint; @@ -1900,8 +1963,8 @@ std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoin services_t routing_manager_impl::get_offered_services() const { services_t its_services; - for (auto s : get_services()) { - for (auto i : s.second) { + for (const auto& s : get_services()) { + for (const auto& i : s.second) { if (i.second->is_local()) { its_services[s.first][i.first] = i.second; } @@ -1926,7 +1989,7 @@ routing_manager_impl::get_offered_service_instances(service_t _service) const { const services_t its_services(get_services()); const auto found_service = its_services.find(_service); if (found_service != its_services.end()) { - for (const auto i : found_service->second) { + for (const auto& i : found_service->second) { if (i.second->is_local()) { its_instances[i.first] = i.second; } @@ -1935,25 +1998,6 @@ routing_manager_impl::get_offered_service_instances(service_t _service) const { return its_instances; } -std::shared_ptr<endpoint> routing_manager_impl::find_or_create_remote_client( - service_t _service, instance_t _instance, bool _reliable, client_t _client) { - std::shared_ptr<endpoint> its_endpoint; - bool start_endpoint(false); - { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - its_endpoint = find_remote_client(_service, _instance, _reliable, _client); - if (!its_endpoint) { - its_endpoint = create_remote_client(_service, _instance, _reliable, _client); - start_endpoint = true; - } - } - if (start_endpoint && its_endpoint - && configuration_->is_someip(_service, _instance)) { - its_endpoint->start(); - } - return its_endpoint; -} - /////////////////////////////////////////////////////////////////////////////// // PRIVATE /////////////////////////////////////////////////////////////////////////////// @@ -1969,39 +2013,29 @@ void routing_manager_impl::init_service_info( return; } if (configuration_) { - std::shared_ptr<endpoint> its_reliable_endpoint; - std::shared_ptr<endpoint> its_unreliable_endpoint; - - uint16_t its_reliable_port = configuration_->get_reliable_port(_service, - _instance); - uint16_t its_unreliable_port = configuration_->get_unreliable_port( - _service, _instance); - - bool is_someip = configuration_->is_someip(_service, _instance); - // Create server endpoints for local services only if (_is_local_service) { + const bool is_someip = configuration_->is_someip(_service, _instance); + uint16_t its_reliable_port = configuration_->get_reliable_port( + _service, _instance); if (ILLEGAL_PORT != its_reliable_port) { - its_reliable_endpoint = find_or_create_server_endpoint( - its_reliable_port, true, is_someip); + std::shared_ptr<endpoint> its_reliable_endpoint = + ep_mgr_impl_->find_or_create_server_endpoint( + its_reliable_port, true, is_someip, _service, + _instance); if (its_reliable_endpoint) { its_info->set_endpoint(its_reliable_endpoint, true); - its_reliable_endpoint->increment_use_count(); - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - service_instances_[_service][its_reliable_endpoint.get()] = - _instance; } } - + uint16_t its_unreliable_port = configuration_->get_unreliable_port( + _service, _instance); if (ILLEGAL_PORT != its_unreliable_port) { - its_unreliable_endpoint = find_or_create_server_endpoint( - its_unreliable_port, false, is_someip); + std::shared_ptr<endpoint> its_unreliable_endpoint = + ep_mgr_impl_->find_or_create_server_endpoint( + its_unreliable_port, false, is_someip, _service, + _instance); if (its_unreliable_endpoint) { its_info->set_endpoint(its_unreliable_endpoint, false); - its_unreliable_endpoint->increment_use_count(); - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - service_instances_[_service][its_unreliable_endpoint.get()] = - _instance; } } @@ -2013,143 +2047,8 @@ void routing_manager_impl::init_service_info( } } } else { - host_->on_error(error_code_e::CONFIGURATION_MISSING); - } -} - -std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint( - const boost::asio::ip::address &_address, - uint16_t _local_port, uint16_t _remote_port, - bool _reliable, client_t _client) { - (void)_client; - - std::shared_ptr<endpoint> its_endpoint; - try { - if (_reliable) { - its_endpoint = std::make_shared<tcp_client_endpoint_impl>( - shared_from_this(), - boost::asio::ip::tcp::endpoint( - (_address.is_v4() ? - boost::asio::ip::tcp::v4() : - boost::asio::ip::tcp::v6()), - _local_port), - boost::asio::ip::tcp::endpoint(_address, _remote_port), - io_, - configuration_->get_max_message_size_reliable( - _address.to_string(), _remote_port), - configuration_->get_buffer_shrink_threshold(), - // send timeout after 2/3 of configured ttl, warning after 1/3 - std::chrono::milliseconds(configuration_->get_sd_ttl() * 666), - configuration_->get_endpoint_queue_limit( - _address.to_string(), _remote_port), - configuration_->get_max_tcp_restart_aborts(), - configuration_->get_max_tcp_connect_time()); - - if (configuration_->has_enabled_magic_cookies(_address.to_string(), - _remote_port)) { - its_endpoint->enable_magic_cookies(); - } - } else { - its_endpoint = std::make_shared<udp_client_endpoint_impl>( - shared_from_this(), - boost::asio::ip::udp::endpoint( - (_address.is_v4() ? - boost::asio::ip::udp::v4() : - boost::asio::ip::udp::v6()), - _local_port), - boost::asio::ip::udp::endpoint(_address, _remote_port), - io_, configuration_->get_endpoint_queue_limit( - _address.to_string(), _remote_port), - configuration_->get_udp_receive_buffer_size()); - } - } catch (...) { - host_->on_error(error_code_e::CLIENT_ENDPOINT_CREATION_FAILED); + VSOMEIP_ERROR << "Missing vsomeip configuration."; } - - return (its_endpoint); -} - -std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint( - uint16_t _port, bool _reliable, bool _start) { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - std::shared_ptr<endpoint> its_endpoint; - try { - boost::asio::ip::address its_unicast = configuration_->get_unicast_address(); - if (_start) { - if (_reliable) { - its_endpoint = std::make_shared<tcp_server_endpoint_impl>( - shared_from_this(), - boost::asio::ip::tcp::endpoint(its_unicast, _port), io_, - configuration_->get_max_message_size_reliable( - its_unicast.to_string(), _port), - configuration_->get_buffer_shrink_threshold(), - // send timeout after 2/3 of configured ttl, warning after 1/3 - std::chrono::milliseconds(configuration_->get_sd_ttl() * 666), - configuration_->get_endpoint_queue_limit( - its_unicast.to_string(), _port)); - if (configuration_->has_enabled_magic_cookies( - its_unicast.to_string(), _port) || - configuration_->has_enabled_magic_cookies( - "local", _port)) { - its_endpoint->enable_magic_cookies(); - } - } else { - configuration::endpoint_queue_limit_t its_limit = - configuration_->get_endpoint_queue_limit( - its_unicast.to_string(), _port); -#ifndef _WIN32 - if (its_unicast.is_v4()) { - its_unicast = boost::asio::ip::address_v4::any(); - } else if (its_unicast.is_v6()) { - its_unicast = boost::asio::ip::address_v6::any(); - } -#endif - boost::asio::ip::udp::endpoint ep(its_unicast, _port); - its_endpoint = std::make_shared<udp_server_endpoint_impl>( - shared_from_this(), ep, io_, its_limit, - configuration_->get_udp_receive_buffer_size()); - } - - } else { - its_endpoint = std::make_shared<virtual_server_endpoint_impl>( - its_unicast.to_string(), _port, _reliable); - } - - if (its_endpoint) { - server_endpoints_[_port][_reliable] = its_endpoint; - its_endpoint->start(); - } - } catch (const std::exception &e) { - host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED); - VSOMEIP_ERROR << e.what(); - } - - return (its_endpoint); -} - -std::shared_ptr<endpoint> routing_manager_impl::find_server_endpoint( - uint16_t _port, bool _reliable) const { - std::shared_ptr<endpoint> its_endpoint; - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - auto found_port = server_endpoints_.find(_port); - if (found_port != server_endpoints_.end()) { - auto found_endpoint = found_port->second.find(_reliable); - if (found_endpoint != found_port->second.end()) { - its_endpoint = found_endpoint->second; - } - } - - return (its_endpoint); -} - -std::shared_ptr<endpoint> routing_manager_impl::find_or_create_server_endpoint( - uint16_t _port, bool _reliable, bool _start) { - std::shared_ptr<endpoint> its_endpoint = find_server_endpoint(_port, - _reliable); - if (!its_endpoint) { - its_endpoint = create_server_endpoint(_port, _reliable, _start); - } - return (its_endpoint); } void routing_manager_impl::remove_local(client_t _client, bool _remove_uid) { @@ -2167,8 +2066,8 @@ void routing_manager_impl::remove_local(client_t _client, bool _remove_uid) { std::lock_guard<std::mutex> its_lock(requested_services_mutex_); auto its_client = requested_services_.find(_client); if (its_client != requested_services_.end()) { - for (auto its_service : its_client->second) { - for (auto its_instance : its_service.second) { + for (const auto& its_service : its_client->second) { + for (const auto& its_instance : its_service.second) { services_to_release_.push_front( { its_service.first, its_instance.first }); } @@ -2180,162 +2079,6 @@ void routing_manager_impl::remove_local(client_t _client, bool _remove_uid) { } } -instance_t routing_manager_impl::find_instance(service_t _service, - endpoint * _endpoint) { - instance_t its_instance(0xFFFF); - { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - auto found_service = service_instances_.find(_service); - if (found_service != service_instances_.end()) { - auto found_endpoint = found_service->second.find(_endpoint); - if (found_endpoint != found_service->second.end()) { - its_instance = found_endpoint->second; - } - } - } - return (its_instance); -} - -std::shared_ptr<endpoint> routing_manager_impl::create_remote_client( - service_t _service, instance_t _instance, bool _reliable, client_t _client) { - std::shared_ptr<endpoint> its_endpoint; - std::shared_ptr<endpoint_definition> its_endpoint_def; - uint16_t its_local_port; - uint16_t its_remote_port = ILLEGAL_PORT; - - auto found_service = remote_service_info_.find(_service); - if (found_service != remote_service_info_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_reliability = found_instance->second.find(_reliable); - if (found_reliability != found_instance->second.end()) { - its_endpoint_def = found_reliability->second; - its_remote_port = its_endpoint_def->get_port(); - } - } - } - - if( its_remote_port != ILLEGAL_PORT) { - // if client port range for remote service port range is configured - // and remote port is in range, determine unused client port - std::unique_lock<std::mutex> its_lock(used_client_ports_mutex_); - if (configuration_->get_client_port(_service, _instance, its_remote_port, _reliable, - used_client_ports_, its_local_port)) { - if(its_endpoint_def) { - its_endpoint = create_client_endpoint( - its_endpoint_def->get_address(), - its_local_port, - its_endpoint_def->get_port(), - _reliable, _client - ); - } - - if (its_endpoint) { - used_client_ports_[_reliable].insert(its_local_port); - its_lock.unlock(); - service_instances_[_service][its_endpoint.get()] = _instance; - remote_services_[_service][_instance][_client][_reliable] = its_endpoint; - if (_client == VSOMEIP_ROUTING_CLIENT) { - client_endpoints_by_ip_[its_endpoint_def->get_address()] - [its_endpoint_def->get_port()] - [_reliable] = its_endpoint; - // Set the basic route to the service in the service info - auto found_service_info = find_service(_service, _instance); - if (found_service_info) { - found_service_info->set_endpoint(its_endpoint, _reliable); - } - } - } - } - } - return its_endpoint; -} - - -std::shared_ptr<endpoint> routing_manager_impl::find_remote_client( - service_t _service, instance_t _instance, bool _reliable, client_t _client) { - std::shared_ptr<endpoint> its_endpoint; - auto found_service = remote_services_.find(_service); - if (found_service != remote_services_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_client = found_instance->second.find(_client); - if (found_client != found_instance->second.end()) { - auto found_reliability = found_client->second.find(_reliable); - if (found_reliability != found_client->second.end()) { - its_endpoint = found_reliability->second; - } - } - } - } - if (its_endpoint || _client != VSOMEIP_ROUTING_CLIENT) { - return its_endpoint; - } - - // If another service is hosted on the same server_endpoint - // reuse the existing client_endpoint. - auto found_service_info = remote_service_info_.find(_service); - if(found_service_info != remote_service_info_.end()) { - auto found_instance = found_service_info->second.find(_instance); - if(found_instance != found_service_info->second.end()) { - auto found_reliable = found_instance->second.find(_reliable); - if(found_reliable != found_instance->second.end()) { - std::shared_ptr<endpoint_definition> its_ep_def = - found_reliable->second; - auto found_address = client_endpoints_by_ip_.find( - its_ep_def->get_address()); - if(found_address != client_endpoints_by_ip_.end()) { - auto found_port = found_address->second.find( - its_ep_def->get_remote_port()); - if(found_port != found_address->second.end()) { - auto found_reliable2 = found_port->second.find( - _reliable); - if(found_reliable2 != found_port->second.end()) { - its_endpoint = found_reliable2->second; - // store the endpoint under this service/instance id - // as well - needed for later cleanup - remote_services_[_service][_instance][_client][_reliable] = - its_endpoint; - service_instances_[_service][its_endpoint.get()] = _instance; - // add endpoint to serviceinfo object - auto found_service_info = find_service(_service,_instance); - if (found_service_info) { - found_service_info->set_endpoint(its_endpoint, _reliable); - } - } - } - } - } - } - } - return its_endpoint; -} - -client_t routing_manager_impl::find_client( - service_t _service, instance_t _instance, - const std::shared_ptr<eventgroupinfo> &_eventgroup, - const std::shared_ptr<endpoint_definition> &_target) const { - client_t its_client = VSOMEIP_ROUTING_CLIENT; - if (!_eventgroup->is_multicast()) { - if (!_target->is_reliable()) { - uint16_t unreliable_port = configuration_->get_unreliable_port(_service, _instance); - auto endpoint = find_server_endpoint(unreliable_port, false); - if (endpoint) { - its_client = std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint)-> - get_client(_target); - } - } else { - uint16_t reliable_port = configuration_->get_reliable_port(_service, _instance); - auto endpoint = find_server_endpoint(reliable_port, true); - if (endpoint) { - its_client = std::dynamic_pointer_cast<tcp_server_endpoint_impl>(endpoint)-> - get_client(_target); - } - } - } - return its_client; -} - bool routing_manager_impl::is_field(service_t _service, instance_t _instance, event_t _event) const { std::lock_guard<std::mutex> its_lock(events_mutex_); @@ -2398,60 +2141,12 @@ void routing_manager_impl::add_routing_info( // Check whether remote services are unchanged bool is_reliable_known(false); bool is_unreliable_known(false); - - { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - auto found_service = remote_service_info_.find(_service); - if (found_service != remote_service_info_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - std::shared_ptr<endpoint_definition> its_definition; - if (_reliable_port != ILLEGAL_PORT) { - auto found_reliable = found_instance->second.find(true); - if (found_reliable != found_instance->second.end()) { - its_definition = found_reliable->second; - if (its_definition->get_address() == _reliable_address - && its_definition->get_port() == _reliable_port) { - is_reliable_known = true; - } else { - VSOMEIP_WARNING << "Reliable service endpoint has changed: [" - << std::hex << std::setfill('0') << std::setw(4) << _service << "." - << std::hex << std::setfill('0') << std::setw(4) << _instance << "." - << std::dec << static_cast<std::uint32_t>(_major) << "." - << std::dec << _minor << "] old: " - << its_definition->get_address().to_string() << ":" - << its_definition->get_port() << " new: " - << _reliable_address.to_string() << ":" - << _reliable_port; - } - } - } - if (_unreliable_port != ILLEGAL_PORT) { - auto found_unreliable = found_instance->second.find(false); - if (found_unreliable != found_instance->second.end()) { - its_definition = found_unreliable->second; - if (its_definition->get_address() == _unreliable_address - && its_definition->get_port() == _unreliable_port) { - is_unreliable_known = true; - } else { - VSOMEIP_WARNING << "Unreliable service endpoint has changed: [" - << std::hex << std::setfill('0') << std::setw(4) << _service << "." - << std::hex << std::setfill('0') << std::setw(4) << _instance << "." - << std::dec << static_cast<std::uint32_t>(_major) << "." - << std::dec << _minor << "] old: " - << its_definition->get_address().to_string() << ":" - << its_definition->get_port() << " new: " - << _unreliable_address.to_string() << ":" - << _unreliable_port; - } - } - } - } - } - } + ep_mgr_impl_->is_remote_service_known(_service, _instance, _major, + _minor, _reliable_address, _reliable_port, &is_reliable_known, + _unreliable_address, _unreliable_port, &is_unreliable_known); bool udp_inserted(false); - + bool tcp_inserted(false); // Add endpoint(s) if necessary if (_reliable_port != ILLEGAL_PORT && !is_reliable_known) { std::shared_ptr<endpoint_definition> endpoint_def_tcp @@ -2459,15 +2154,14 @@ void routing_manager_impl::add_routing_info( if (_unreliable_port != ILLEGAL_PORT && !is_unreliable_known) { std::shared_ptr<endpoint_definition> endpoint_def_udp = endpoint_definition::get(_unreliable_address, _unreliable_port, false, _service, _instance); - { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - remote_service_info_[_service][_instance][false] = endpoint_def_udp; - remote_service_info_[_service][_instance][true] = endpoint_def_tcp; - } + ep_mgr_impl_->add_remote_service_info(_service, _instance, + endpoint_def_tcp, endpoint_def_udp); udp_inserted = true; + tcp_inserted = true; } else { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - remote_service_info_[_service][_instance][true] = endpoint_def_tcp; + ep_mgr_impl_->add_remote_service_info(_service, _instance, + endpoint_def_tcp); + tcp_inserted = true; } // check if service was requested and establish TCP connection if necessary @@ -2489,11 +2183,13 @@ void routing_manager_impl::add_routing_info( // SWS_SD_00376 establish TCP connection to service // service is marked as available later in on_connect() if(!connected) { - find_or_create_remote_client(_service, _instance, - true, VSOMEIP_ROUTING_CLIENT); if (udp_inserted) { - find_or_create_remote_client(_service, _instance, - false, VSOMEIP_ROUTING_CLIENT); + // atomically create reliable and unreliable endpoint + ep_mgr_impl_->find_or_create_remote_client( + _service, _instance, VSOMEIP_ROUTING_CLIENT); + } else { + ep_mgr_impl_->find_or_create_remote_client( + _service, _instance, true, VSOMEIP_ROUTING_CLIENT); } connected = true; } @@ -2505,10 +2201,6 @@ void routing_manager_impl::add_routing_info( } } } - auto specific_endpoint_clients = get_specific_endpoint_clients(_service, _instance); - for (const client_t& c : specific_endpoint_clients) { - find_or_create_remote_client(_service, _instance, true, c); - } } else if (_reliable_port != ILLEGAL_PORT && is_reliable_known) { std::lock_guard<std::mutex> its_lock(requested_services_mutex_); for(const auto &client_id : requested_services_) { @@ -2523,7 +2215,9 @@ void routing_manager_impl::add_routing_info( && (major_minor_pair.second <= _minor || _minor == DEFAULT_MINOR || major_minor_pair.second == ANY_MINOR)) { - if (!stub_->contained_in_routing_info( + std::shared_ptr<endpoint> ep = its_info->get_endpoint(true); + if (ep && ep->is_established() && + !stub_->contained_in_routing_info( VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), its_info->get_minor())) { @@ -2534,12 +2228,8 @@ void routing_manager_impl::add_routing_info( its_info->get_major(), its_info->get_minor()); if (discovery_) { - std::shared_ptr<endpoint> ep = its_info->get_endpoint(true); - if (ep && ep->is_established()) { - discovery_->on_endpoint_connected( - _service, _instance, - ep); - } + discovery_->on_endpoint_connected( + _service, _instance, ep); } } break; @@ -2554,10 +2244,7 @@ void routing_manager_impl::add_routing_info( if (!udp_inserted) { std::shared_ptr<endpoint_definition> endpoint_def = endpoint_definition::get(_unreliable_address, _unreliable_port, false, _service, _instance); - { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - remote_service_info_[_service][_instance][false] = endpoint_def; - } + ep_mgr_impl_->add_remote_service_info(_service, _instance, endpoint_def); // check if service was requested and increase requester count if necessary { bool connected(false); @@ -2577,7 +2264,7 @@ void routing_manager_impl::add_routing_info( || major_minor_pair.second == ANY_MINOR)) { if(!connected) { - find_or_create_remote_client(_service, _instance, + ep_mgr_impl_->find_or_create_remote_client(_service, _instance, false, VSOMEIP_ROUTING_CLIENT); connected = true; } @@ -2590,7 +2277,8 @@ void routing_manager_impl::add_routing_info( } } } - if (!is_reliable_known) { + if (!is_reliable_known && !tcp_inserted) { + // UDP only service can be marked as available instantly on_availability(_service, _instance, true, _major, _minor); stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, _major, _minor); } @@ -2649,26 +2337,81 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst if(!its_info) return; - on_availability(_service, _instance, false, its_info->get_major(), its_info->get_minor()); - stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), its_info->get_minor()); + on_availability(_service, _instance, false, + its_info->get_major(), its_info->get_minor()); + stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, + its_info->get_major(), its_info->get_minor()); // Implicit unsubscribe - clear_targets_and_pending_sub_from_eventgroups(_service, _instance); - clear_identified_clients( _service, _instance); - clear_identifying_clients( _service, _instance); + std::vector<std::shared_ptr<event>> its_events; + { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); + auto found_service = eventgroups_.find(_service); + if (found_service != eventgroups_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + for (auto &its_eventgroup : found_instance->second) { + // As the service is gone, all subscriptions to its events + // do no longer exist and the last received payload is no + // longer valid. + for (auto &its_event : its_eventgroup.second->get_events()) { + const auto its_subscribers = its_event->get_subscribers(); + for (const auto its_subscriber : its_subscribers) { + if (its_subscriber != get_client()) { + its_event->remove_subscriber( + its_eventgroup.first, its_subscriber); + } + } + its_events.push_back(its_event); + remove_pending_subscription(_service, _instance, + its_eventgroup.first, its_event->get_event()); + } + + } + } + } + } + for (const auto& e : its_events) { + e->unset_payload(true); + } - clear_remote_subscriber(_service, _instance); + { + std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); + std::set<std::tuple< + service_t, instance_t, eventgroup_t, client_t> > its_invalid; + + for (const auto its_state : remote_subscription_state_) { + if (std::get<0>(its_state.first) == _service + && std::get<1>(its_state.first) == _instance) { + its_invalid.insert(its_state.first); + } + } + + for (const auto its_key : its_invalid) + remote_subscription_state_.erase(its_key); + } + + { + std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); + auto found_service = remote_subscribers_.find(_service); + if (found_service != remote_subscribers_.end()) { + if (found_service->second.erase(_instance) > 0 && + !found_service->second.size()) { + remote_subscribers_.erase(found_service); + } + } + } if (_has_reliable) { - clear_client_endpoints(_service, _instance, true); - clear_remote_service_info(_service, _instance, true); + ep_mgr_impl_->clear_client_endpoints(_service, _instance, true); + ep_mgr_impl_->clear_remote_service_info(_service, _instance, true); } if (_has_unreliable) { - clear_client_endpoints(_service, _instance, false); - clear_remote_service_info(_service, _instance, false); + ep_mgr_impl_->clear_client_endpoints(_service, _instance, false); + ep_mgr_impl_->clear_remote_service_info(_service, _instance, false); } - clear_multicast_endpoints(_service, _instance); + ep_mgr_impl_->clear_multicast_endpoints(_service, _instance); if (_has_reliable) clear_service_info(_service, _instance, true); @@ -2677,8 +2420,8 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst // For expired services using only unreliable endpoints that have never been created before if (!_has_reliable && !_has_unreliable) { - clear_remote_service_info(_service, _instance, true); - clear_remote_service_info(_service, _instance, false); + ep_mgr_impl_->clear_remote_service_info(_service, _instance, true); + ep_mgr_impl_->clear_remote_service_info(_service, _instance, false); clear_service_info(_service, _instance, true); clear_service_info(_service, _instance, false); } @@ -2764,83 +2507,57 @@ void routing_manager_impl::expire_services(const boost::asio::ip::address &_addr } } -void routing_manager_impl::expire_subscriptions(const boost::asio::ip::address &_address) { - struct subscriptions_info { - service_t service_id_; - instance_t instance_id_; - eventgroup_t eventgroup_id_; - std::shared_ptr<endpoint_definition> invalid_endpoint_; - client_t client_; - std::set<std::shared_ptr<event>> events_; - std::shared_ptr<eventgroupinfo> eventgroupinfo_; - }; - std::vector<struct subscriptions_info> subscriptions_to_expire_; +void +routing_manager_impl::expire_subscriptions( + const boost::asio::ip::address &_address) { { std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); - for (auto &its_service : eventgroups_) { - for (auto &its_instance : its_service.second) { - for (auto &its_eventgroup : its_instance.second) { - std::set<std::shared_ptr<endpoint_definition>> its_invalid_endpoints; - for (auto &its_target : its_eventgroup.second->get_targets()) { - if (its_target.endpoint_->get_address() == _address) - its_invalid_endpoints.insert(its_target.endpoint_); - } + for (const auto &its_service : eventgroups_) { + for (const auto &its_instance : its_service.second) { + for (const auto &its_eventgroup : its_instance.second) { + const auto its_info = its_eventgroup.second; + for (auto its_subscription + : its_info->get_remote_subscriptions()) { + // Note: get_remote_subscription delivers a copied + // set of subscriptions. Thus, its is possible to + // to remove them within the loop. + const auto its_reliable = its_subscription->get_reliable(); + const auto its_unreliable = its_subscription->get_unreliable(); + if ((its_reliable && its_reliable->get_address() == _address) + || (its_unreliable && its_unreliable->get_address() == _address)) { + + // TODO: Check whether subscriptions to different hosts are valid. + // IF yes, we probably need to simply reset the corresponding + // endpoint instead of removing the subscription... + + if (its_reliable) { + VSOMEIP_ERROR << __func__ + << ": removing subscription to " + << std::hex << its_info->get_service() << "." + << std::hex << its_info->get_instance() << "." + << std::hex << its_info->get_eventgroup() + << " from target " + << its_reliable->get_address().to_string() << ":" + << std::dec << its_reliable->get_port(); + } + if (its_unreliable) { + VSOMEIP_ERROR << __func__ + << ": removing subscription to " + << std::hex << its_info->get_service() << "." + << std::hex << its_info->get_instance() << "." + << std::hex << its_info->get_eventgroup() + << " from target " + << its_unreliable->get_address().to_string() << ":" + << std::dec << its_unreliable->get_port(); + } - for (auto &its_endpoint : its_invalid_endpoints) { - its_eventgroup.second->remove_target(its_endpoint); - client_t its_client = find_client(its_service.first, - its_instance.first, its_eventgroup.second, - its_endpoint); - clear_remote_subscriber(its_service.first, - its_instance.first, its_client, its_endpoint); - - std::set<std::shared_ptr<event> > its_events; - if (its_eventgroup.second->get_targets().size() == 0) { - its_events = its_eventgroup.second->get_events(); + on_remote_unsubscribe(its_subscription); } - subscriptions_to_expire_.push_back({its_service.first, - its_instance.first, - its_eventgroup.first, - its_endpoint, - its_client, - its_events, - its_eventgroup.second}); - } - if(its_eventgroup.second->is_multicast() && its_invalid_endpoints.size() && - 0 == its_eventgroup.second->get_unreliable_target_count() ) { - //clear multicast targets if no subscriber is left for multicast eventgroup - its_eventgroup.second->clear_multicast_targets(); } } } } } - - for (const auto &s : subscriptions_to_expire_) { - if (s.invalid_endpoint_) { - for (const auto e: s.events_) { - if (e->is_shadow()) { - e->unset_payload(); - } - } - const client_t its_hosting_client = find_local_client( - s.service_id_, s.instance_id_); - if (its_hosting_client != VSOMEIP_ROUTING_CLIENT) { - const pending_subscription_t its_pending_unsubscription( - std::shared_ptr<sd_message_identifier_t>(), - s.invalid_endpoint_, s.invalid_endpoint_, - 0, s.client_); - pending_subscription_id_t its_pending_unsubscription_id = - s.eventgroupinfo_->add_pending_subscription( - its_pending_unsubscription); - if (its_pending_unsubscription_id != DEFAULT_SUBSCRIPTION) { - send_unsubscription(its_hosting_client, s.client_, - s.service_id_, s.instance_id_, s.eventgroup_id_, - its_pending_unsubscription_id); - } - } - } - } } void routing_manager_impl::init_routing_info() { @@ -2863,200 +2580,194 @@ void routing_manager_impl::init_routing_info() { its_address, its_unreliable_port); if(its_reliable_port != ILLEGAL_PORT) { - find_or_create_remote_client(i.first, i.second, true, VSOMEIP_ROUTING_CLIENT); + ep_mgr_impl_->find_or_create_remote_client( + i.first, i.second, true, VSOMEIP_ROUTING_CLIENT); } if(its_unreliable_port != ILLEGAL_PORT) { - find_or_create_remote_client(i.first, i.second, false, VSOMEIP_ROUTING_CLIENT); + ep_mgr_impl_->find_or_create_remote_client( + i.first, i.second, false, VSOMEIP_ROUTING_CLIENT); } } } } -void routing_manager_impl::on_remote_subscription( - service_t _service, instance_t _instance, eventgroup_t _eventgroup, - const std::shared_ptr<endpoint_definition> &_subscriber, - const std::shared_ptr<endpoint_definition> &_target, ttl_t _ttl, - const std::shared_ptr<sd_message_identifier_t> &_sd_message_id, - const std::function<void(remote_subscription_state_e, client_t)>& _callback) { - std::shared_ptr<eventgroupinfo> its_eventgroup - = find_eventgroup(_service, _instance, _eventgroup); - client_t its_subscribing_client(ILLEGAL_CLIENT); - if (!its_eventgroup) { - VSOMEIP_ERROR << "REMOTE SUBSCRIBE: attempt to subscribe to unknown eventgroup [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" - << " from " << _subscriber->get_address().to_string() - << ":" << std::dec << _subscriber->get_port() - << (_subscriber->is_reliable() ? " reliable" : " unreliable"); - _callback(remote_subscription_state_e::SUBSCRIPTION_ERROR, its_subscribing_client); +void routing_manager_impl::on_remote_subscribe( + std::shared_ptr<remote_subscription> &_subscription, + const remote_subscription_callback_t &_callback) { + auto its_eventgroupinfo = _subscription->get_eventgroupinfo(); + if (!its_eventgroupinfo) { + VSOMEIP_ERROR << __func__ << " eventgroupinfo is invalid"; return; } - // find out client id for selective subscriber - if (!_subscriber->is_reliable()) { - uint16_t unreliable_port = configuration_->get_unreliable_port(_service, _instance); - _subscriber->set_remote_port(unreliable_port); - if (!its_eventgroup->is_multicast()) { - auto endpoint = find_server_endpoint(unreliable_port, false); - if (endpoint) { - its_subscribing_client = std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint)-> - get_client(_subscriber); - } - } - } else { - uint16_t reliable_port = configuration_->get_reliable_port(_service, _instance); - _subscriber->set_remote_port(reliable_port); - auto endpoint = find_server_endpoint(reliable_port, true); - if (endpoint) { - its_subscribing_client = std::dynamic_pointer_cast<tcp_server_endpoint_impl>(endpoint)-> - get_client(_subscriber); - } - } - std::unique_lock<std::mutex> eventgroup_lock(its_eventgroup->get_subscription_lock()); - const std::chrono::steady_clock::time_point its_expiration = - std::chrono::steady_clock::now() + std::chrono::seconds(_ttl); - if (its_eventgroup->update_target(_subscriber, its_expiration)) { - _callback(remote_subscription_state_e::SUBSCRIPTION_ACKED,its_subscribing_client); - return; - } else { - const pending_subscription_id_t its_subscription_id = - its_eventgroup->add_pending_subscription( - pending_subscription_t(_sd_message_id, _subscriber, - _target, _ttl, its_subscribing_client)); - if (its_subscription_id != DEFAULT_SUBSCRIPTION) { - // only sent subscription to rm_proxy / hosting application if there's - // no subscription for this eventgroup from the same remote subscriber - // already pending - const client_t its_offering_client = find_local_client(_service, _instance); - send_subscription(its_offering_client, its_subscribing_client, - _service, _instance, _eventgroup, - its_eventgroup->get_major(), its_subscription_id); - } else { - VSOMEIP_WARNING << __func__ << " a remote subscription is already pending [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" - << " from " << _subscriber->get_address().to_string() - << ":" << std::dec << _subscriber->get_port() - << (_subscriber->is_reliable() ? " reliable" : " unreliable"); - } - _callback(remote_subscription_state_e::SUBSCRIPTION_PENDING, its_subscribing_client); + const ttl_t its_ttl = _subscription->get_ttl(); + + const auto its_service = its_eventgroupinfo->get_service(); + const auto its_instance = its_eventgroupinfo->get_instance(); + const auto its_eventgroup = its_eventgroupinfo->get_eventgroup(); + const auto its_major = its_eventgroupinfo->get_major(); + + // Get remote port(s) + auto its_reliable = _subscription->get_reliable(); + if (its_reliable) { + uint16_t its_port + = configuration_->get_reliable_port(its_service, its_instance); + its_reliable->set_remote_port(its_port); + } + + auto its_unreliable = _subscription->get_unreliable(); + if (its_unreliable) { + uint16_t its_port + = configuration_->get_unreliable_port(its_service, its_instance); + its_unreliable->set_remote_port(its_port); + } + + // Calculate expiration time + const std::chrono::steady_clock::time_point its_expiration + = std::chrono::steady_clock::now() + std::chrono::seconds(its_ttl); + + // Try to update the subscription. This will fail, if the subscription does + // not exist or is still (partly) pending. + remote_subscription_id_t its_id; + std::set<client_t> its_added; + auto its_result = its_eventgroupinfo->update_remote_subscription( + _subscription, its_expiration, its_added, its_id, true); + if (its_result) { + if (!_subscription->is_pending()) { // resubscription without change + _callback(_subscription); + } else if (!its_added.empty()) { // new clients for a selective subscription + const client_t its_offering_client + = find_local_client(its_service, its_instance); + send_subscription(its_offering_client, + its_service, its_instance, its_eventgroup, its_major, + its_added, _subscription->get_id()); + } else { // identical subscription is not yet processed + std::stringstream its_warning; + its_warning << __func__ << " a remote subscription is already pending [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]" + << " from "; + if (its_reliable && its_unreliable) + its_warning << "["; + if (its_reliable) + its_warning << its_reliable->get_address().to_string() + << ":" << std::dec << its_reliable->get_port(); + if (its_reliable && its_unreliable) + its_warning << ", "; + if (its_unreliable) + its_warning << its_unreliable->get_address().to_string() + << ":" << std::dec << its_unreliable->get_port(); + if (its_reliable && its_unreliable) + its_warning << "]"; + VSOMEIP_WARNING << its_warning.str(); + } + } else { // new subscription + auto its_id + = its_eventgroupinfo->add_remote_subscription(_subscription); + + const client_t its_offering_client + = find_local_client(its_service, its_instance); + send_subscription(its_offering_client, + its_service, its_instance, its_eventgroup, its_major, + _subscription->get_clients(), its_id); + } +} + +void routing_manager_impl::on_remote_unsubscribe( + std::shared_ptr<remote_subscription> &_subscription) { + std::shared_ptr<eventgroupinfo> its_info + = _subscription->get_eventgroupinfo(); + if (!its_info) { + VSOMEIP_ERROR << __func__ + << ": Received Unsubscribe for unregistered eventgroup."; return; } - _callback(remote_subscription_state_e::SUBSCRIPTION_ERROR, its_subscribing_client); -} -void routing_manager_impl::on_unsubscribe(service_t _service, - instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr<endpoint_definition> _target) { - std::shared_ptr<eventgroupinfo> its_eventgroup = find_eventgroup(_service, - _instance, _eventgroup); - if (its_eventgroup) { - client_t its_client = find_client(_service, _instance, its_eventgroup, _target); - const pending_subscription_t its_pending_unsubscription( - std::shared_ptr<sd_message_identifier_t>(), _target, _target, - 0, its_client); - pending_subscription_id_t its_pending_unsubscription_id = - its_eventgroup->add_pending_subscription( - its_pending_unsubscription); - - its_eventgroup->remove_target(_target); - clear_remote_subscriber(_service, _instance, its_client, _target); - - if (its_pending_unsubscription_id != DEFAULT_SUBSCRIPTION) { - // there are no pending (un)subscriptions - const client_t its_offering_client = find_local_client(_service, _instance); - send_unsubscription(its_offering_client, its_client, _service, - _instance, _eventgroup, its_pending_unsubscription_id); - } + const auto its_service = its_info->get_service(); + const auto its_instance = its_info->get_instance(); + const auto its_eventgroup = its_info->get_eventgroup(); + const auto its_major = its_info->get_major(); - if (its_eventgroup->get_targets().size() == 0) { - std::set<std::shared_ptr<event> > its_events - = its_eventgroup->get_events(); - for (auto e : its_events) { - if (e->is_shadow()) { - e->unset_payload(); - } - } - } - VSOMEIP_INFO << "REMOTE UNSUBSCRIBE(" - << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" - << " from " << _target->get_address().to_string() - << ":" << std::dec <<_target->get_port() - << (_target->is_reliable() ? " reliable" : " unreliable"); + auto its_subscriber = _subscription->get_subscriber(); - } else { - VSOMEIP_ERROR << "REMOTE UNSUBSCRIBE: attempt to subscribe to unknown eventgroup [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" - << " from " << _target->get_address().to_string() - << ":" << std::dec <<_target->get_port() - << (_target->is_reliable() ? " reliable" : " unreliable"); + // Get remote port(s) + auto its_reliable = _subscription->get_reliable(); + if (its_reliable) { + uint16_t its_port + = configuration_->get_reliable_port(its_service, its_instance); + its_reliable->set_remote_port(its_port); } -} - -void routing_manager_impl::on_subscribe_ack(service_t _service, - instance_t _instance, const boost::asio::ip::address &_address, - uint16_t _port) { - bool multicast_known(false); - { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - const auto found_service = multicast_info.find(_service); - if (found_service != multicast_info.end()) { - const auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - const auto& endpoint_def = found_instance->second; - if (endpoint_def->get_address() == _address && - endpoint_def->get_port() == _port) { - // Multicast info and endpoint already created before - // This can happen when more than one client subscribe on the same instance! - multicast_known = true; - } - } - } - if (!multicast_known) { - // Save multicast info to be able to delete the endpoint - // as soon as the instance stops offering its service - std::shared_ptr<endpoint_definition> endpoint_def = - endpoint_definition::get(_address, _port, false, _service, _instance); - multicast_info[_service][_instance] = endpoint_def; - } + auto its_unreliable = _subscription->get_unreliable(); + if (its_unreliable) { + uint16_t its_port + = configuration_->get_unreliable_port(its_service, its_instance); + its_unreliable->set_remote_port(its_port); } - const bool is_someip = configuration_->is_someip(_service, _instance); - // Create multicast endpoint & join multicase group - std::shared_ptr<endpoint> its_endpoint - = find_or_create_server_endpoint(_port, false, is_someip); - if (its_endpoint) { - if (!multicast_known) { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - service_instances_[_service][its_endpoint.get()] = _instance; - } - its_endpoint->join(_address.to_string()); - } else { - VSOMEIP_ERROR<<"Could not find/create multicast endpoint!"; + remote_subscription_id_t its_id(0); + std::set<client_t> its_removed; + auto its_result = its_info->update_remote_subscription( + _subscription, std::chrono::steady_clock::now(), + its_removed, its_id, false); + + if (its_result) { + const client_t its_offering_client + = find_local_client(its_service, its_instance); + send_unsubscription(its_offering_client, + its_service, its_instance, its_eventgroup, its_major, + its_removed, its_id); } } +void routing_manager_impl::on_subscribe_ack_with_multicast( + service_t _service, instance_t _instance, + const boost::asio::ip::address &_address, uint16_t _port) { + ep_mgr_impl_->find_or_create_multicast_endpoint(_service, + _instance, _address, _port); +} + void routing_manager_impl::on_subscribe_ack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event, pending_subscription_id_t _subscription_id) { - client_t its_client = is_specific_endpoint_client(_client, _service, _instance); - bool specific_endpoint_client = its_client != VSOMEIP_ROUTING_CLIENT; + event_t _event, remote_subscription_id_t _id) { + std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); if (its_eventgroup) { - std::unique_lock<std::mutex> eventgroup_lock(its_eventgroup->get_subscription_lock()); - if (_subscription_id == DEFAULT_SUBSCRIPTION) { - // ACK coming in via SD from remote or as answer to a subscription - // of the application hosting the rm_impl to a local service - auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, its_client); - std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); - auto its_state = remote_subscription_state_.find(its_tuple); + auto its_subscription = its_eventgroup->get_remote_subscription(_id); + if (its_subscription) { + its_subscription->set_client_state(_client, + remote_subscription_state_e::SUBSCRIPTION_ACKED); + + auto its_parent = its_subscription->get_parent(); + if (its_parent) { + its_parent->set_client_state(_client, + remote_subscription_state_e::SUBSCRIPTION_ACKED); + if (!its_subscription->is_pending()) { + its_eventgroup->remove_remote_subscription(_id); + } + } + + if (discovery_) { + std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); + remote_subscribers_[_service][_instance][VSOMEIP_ROUTING_CLIENT].insert( + its_subscription->get_subscriber()); + discovery_->update_remote_subscription(its_subscription); + + VSOMEIP_INFO << "REMOTE SUBSCRIBE(" + << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" + << " from " << its_subscription->get_subscriber()->get_address() + << ":" << std::dec << its_subscription->get_subscriber()->get_port() + << (its_subscription->get_subscriber()->is_reliable() ? " reliable" : " unreliable") + << " was accepted"; + + return; + } + } else { + const auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, _client); + const auto its_state = remote_subscription_state_.find(its_tuple); if (its_state != remote_subscription_state_.end()) { if (its_state->second == subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED) { // Already notified! @@ -3064,385 +2775,79 @@ void routing_manager_impl::on_subscribe_ack(client_t _client, } } remote_subscription_state_[its_tuple] = subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED; - } else { // ACK sent back from local client as answer to a remote subscription - const client_t its_offering_client = find_local_client(_service, _instance); - if (its_offering_client == VSOMEIP_ROUTING_CLIENT) { - // service was stopped while subscription was pending - // send subscribe_nack back instead - eventgroup_lock.unlock(); - on_subscribe_nack(_client, _service, _instance, _eventgroup, - _event, _subscription_id); - return; - } - if (discovery_) { - std::vector<pending_subscription_t> its_pending_subscriptions = - its_eventgroup->remove_pending_subscription(_subscription_id); - for (const pending_subscription_t& its_sd_message_id : its_pending_subscriptions) { - if (its_sd_message_id.ttl_ > 0) { - if (its_sd_message_id.sd_message_identifier_ - && its_sd_message_id.subscriber_ - && its_sd_message_id.target_) { - { - std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); - remote_subscribers_[_service][_instance][_client].insert(its_sd_message_id.subscriber_); - } - const std::chrono::steady_clock::time_point its_expiration = - std::chrono::steady_clock::now() - + std::chrono::seconds( - its_sd_message_id.ttl_); - // IP address of target is a multicast address if the event is in a multicast eventgroup - if (its_eventgroup->is_multicast() - && !its_sd_message_id.subscriber_->is_reliable()) { - // Event is in multicast eventgroup and subscribe for UDP - its_eventgroup->add_target( - { its_sd_message_id.target_, its_expiration }, - { its_sd_message_id.subscriber_, its_expiration }); - } else { - // subscribe for TCP or UDP - its_eventgroup->add_target( - { its_sd_message_id.subscriber_, its_expiration }); - } - discovery_->remote_subscription_acknowledge(_service, - _instance, _eventgroup, _client, true, - its_sd_message_id.sd_message_identifier_); + } - VSOMEIP_INFO << "REMOTE SUBSCRIBE(" - << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" - << " from " << its_sd_message_id.subscriber_->get_address().to_string() - << ":" << std::dec << its_sd_message_id.subscriber_->get_port() - << (its_sd_message_id.subscriber_->is_reliable() ? " reliable" : " unreliable") - << " was accepted"; - } - } else { // unsubscription was queued while subscription was pending -> send it to client - send_unsubscription(its_offering_client, - its_sd_message_id.subscribing_client_, - _service, _instance, _eventgroup, - its_sd_message_id.pending_subscription_id_); - } + std::set<client_t> subscribed_clients; + if (_client == VSOMEIP_ROUTING_CLIENT) { + for (const auto &its_event : its_eventgroup->get_events()) { + if (_event == ANY_EVENT || _event == its_event->get_event()) { + const auto &its_subscribers = its_event->get_subscribers(); + subscribed_clients.insert(its_subscribers.begin(), its_subscribers.end()); } } - return; + } else { + subscribed_clients.insert(_client); } - if (specific_endpoint_client) { - if (_client == get_client()) { - host_->on_subscription_error(_service, _instance, _eventgroup, 0x0 /*OK*/); + for (const auto &its_subscriber : subscribed_clients) { + if (its_subscriber == get_client()) { if (_event == ANY_EVENT) { - for (const auto &its_event : its_eventgroup->get_events()) - host_->on_subscription_status(_service, _instance, - _eventgroup, its_event->get_event(), 0x0 /*OK*/); - } else { - host_->on_subscription_status(_service, _instance, - _eventgroup, _event, 0x0 /*OK*/); - } - } else { - stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, - _event); - } - } else { - std::set<client_t> subscribed_clients; - for (auto its_event : its_eventgroup->get_events()) { - for (auto its_client : its_event->get_subscribers()) { - subscribed_clients.insert(its_client); - } - } - for (auto its_subscriber : subscribed_clients) { - if (its_subscriber == get_client()) { - host_->on_subscription_error(_service, _instance, - _eventgroup, 0x0 /*OK*/); - for (auto its_event : its_eventgroup->get_events()) { + for (const auto &its_event : its_eventgroup->get_events()) { host_->on_subscription_status(_service, _instance, _eventgroup, its_event->get_event(), 0x0 /*OK*/); } } else { - stub_->send_subscribe_ack(its_subscriber, _service, - _instance, _eventgroup, _event); - } - } - } - } -} - -void routing_manager_impl::on_subscribe_nack(client_t _client, - service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event, pending_subscription_id_t _subscription_id) { - client_t its_client = is_specific_endpoint_client(_client, _service, _instance); - bool specific_endpoint_client = its_client != VSOMEIP_ROUTING_CLIENT; - auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); - if (its_eventgroup) { - std::unique_lock<std::mutex> eventgroup_lock(its_eventgroup->get_subscription_lock()); - if (_subscription_id == DEFAULT_SUBSCRIPTION) { - // NACK coming in via SD from remote or as answer to a subscription - // of the application hosting the rm_impl to a local service - auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, its_client); - std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); - auto its_state = remote_subscription_state_.find(its_tuple); - if (its_state != remote_subscription_state_.end()) { - if (its_state->second == subscription_state_e::SUBSCRIPTION_NOT_ACKNOWLEDGED) { - // Already notified! - return; - } - } - remote_subscription_state_[its_tuple] = subscription_state_e::SUBSCRIPTION_NOT_ACKNOWLEDGED; - } else { // NACK sent back from local client as answer to a remote subscription - if (discovery_) { - std::vector<pending_subscription_t> its_pending_subscriptions = - its_eventgroup->remove_pending_subscription(_subscription_id); - for (const pending_subscription_t& its_sd_message_id : its_pending_subscriptions) { - if (its_sd_message_id.ttl_ > 0) { - if (its_sd_message_id.sd_message_identifier_ && its_sd_message_id.subscriber_) { - discovery_->remote_subscription_acknowledge(_service, _instance, - _eventgroup, _client, false, its_sd_message_id.sd_message_identifier_); - VSOMEIP_INFO << "REMOTE SUBSCRIBE(" - << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" - << " from " << its_sd_message_id.subscriber_->get_address().to_string() - << ":" << std::dec <<its_sd_message_id.subscriber_->get_port() - << (its_sd_message_id.subscriber_->is_reliable() ? " reliable" : " unreliable") - << " was not accepted"; - } - } else { // unsubscription was queued while subscription was pending -> send it to client - const client_t its_offering_client = find_local_client(_service, _instance); - send_unsubscription(its_offering_client, - its_sd_message_id.subscribing_client_, _service, - _instance, _eventgroup, - its_sd_message_id.pending_subscription_id_); - } + host_->on_subscription_status(_service, _instance, + _eventgroup, _event, 0x0 /*OK*/); } - } - return; - } - if (specific_endpoint_client) { - if (_client == get_client()) { - host_->on_subscription_error(_service, _instance, _eventgroup, 0x7 /*Rejected*/); - host_->on_subscription_status(_service, _instance, _eventgroup, _event, 0x7 /*Rejected*/); } else { - stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup, - _event); - } - } else { - std::set<client_t> subscribed_clients; - for (auto its_event : its_eventgroup->get_events()) { - for (auto its_client : its_event->get_subscribers()) { - subscribed_clients.insert(its_client); - } - } - for (auto its_subscriber : subscribed_clients) { - if (its_subscriber == get_client()) { - host_->on_subscription_error(_service, _instance, - _eventgroup, 0x7 /*Rejected*/); - for (auto its_event : its_eventgroup->get_events()) { - host_->on_subscription_status(_service, _instance, - _eventgroup, its_event->get_event(), - 0x7 /*Rejected*/); - } - } else { - stub_->send_subscribe_nack(its_subscriber, _service, - _instance, _eventgroup, _event); - } + stub_->send_subscribe_ack(its_subscriber, _service, + _instance, _eventgroup, _event); } } - } + } } -bool routing_manager_impl::deliver_specific_endpoint_message(service_t _service, - instance_t _instance, const byte_t *_data, length_t _size, endpoint *_receiver) { - client_t its_client(0x0); - - // Try to deliver specific endpoint message (for selective subscribers) - { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - auto found_servic = remote_services_.find(_service); - if (found_servic != remote_services_.end()) { - auto found_instance = found_servic->second.find(_instance); - if (found_instance != found_servic->second.end()) { - for (auto client_entry : found_instance->second) { - if (!client_entry.first) { - continue; - } - auto found_reliability = client_entry.second.find(_receiver->is_reliable()); - if (found_reliability != client_entry.second.end()) { - auto found_enpoint = found_reliability->second; - if (found_enpoint.get() == _receiver) { - its_client = client_entry.first; - break; - } - } - } - } - } - } - if (its_client) { - if (its_client != get_client()) { - auto local_endpoint = find_local(its_client); - if (local_endpoint) { - send_local(local_endpoint, its_client, _data, _size, _instance, true, - _receiver->is_reliable(), VSOMEIP_SEND); - } - } else { - deliver_message(_data, _size, _instance, _receiver->is_reliable(), VSOMEIP_ROUTING_CLIENT, true, true); - } - return true; - } - - return false; +std::shared_ptr<endpoint> routing_manager_impl::find_or_create_remote_client( + service_t _service, instance_t _instance, bool _reliable, + client_t _client) { + return ep_mgr_impl_->find_or_create_remote_client(_service, + _instance, _reliable, _client); } -void routing_manager_impl::clear_client_endpoints(service_t _service, instance_t _instance, - bool _reliable) { - auto its_specific_endpoint_clients = get_specific_endpoint_clients(_service, _instance); - std::shared_ptr<endpoint> endpoint_to_delete; - bool other_services_reachable_through_endpoint(false); - std::vector<std::shared_ptr<endpoint>> its_specific_endpoints; - { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - // Clear client endpoints for remote services (generic and specific ones) - if (remote_services_.find(_service) != remote_services_.end()) { - if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) { - auto endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT][_reliable]; - if (endpoint) { - service_instances_[_service].erase(endpoint.get()); - endpoint_to_delete = endpoint; - } - remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].erase(_reliable); - auto found_endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].find( - !_reliable); - if (found_endpoint == remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].end()) { - remote_services_[_service][_instance].erase(VSOMEIP_ROUTING_CLIENT); - } - // erase specific client endpoints - for (const client_t &client : its_specific_endpoint_clients) { - auto endpoint = remote_services_[_service][_instance][client][_reliable]; - if (endpoint) { - service_instances_[_service].erase(endpoint.get()); - its_specific_endpoints.push_back(endpoint); - } - remote_services_[_service][_instance][client].erase(_reliable); - auto found_endpoint = remote_services_[_service][_instance][client].find(!_reliable); - if (found_endpoint == remote_services_[_service][_instance][client].end()) { - remote_services_[_service][_instance].erase(client); - } - } - } - } - if (remote_services_.find(_service) != remote_services_.end()) { - if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) { - if (!remote_services_[_service][_instance].size()) { - remote_services_[_service].erase(_instance); - if (0 >= remote_services_[_service].size()) { - remote_services_.erase(_service); - } - } - } - } - - if (!service_instances_[_service].size()) { - service_instances_.erase(_service); - } - - // Only stop and delete the endpoint if none of the services - // reachable through it is online anymore. - if (endpoint_to_delete) { - for (const auto& service : remote_services_) { - for (const auto& instance : service.second) { - const auto& client = instance.second.find(VSOMEIP_ROUTING_CLIENT); - if (client != instance.second.end()) { - for (const auto& reliable : client->second) { - if (reliable.second == endpoint_to_delete) { - other_services_reachable_through_endpoint = true; - break; - } - } - } - if (other_services_reachable_through_endpoint) { break; } - } - if (other_services_reachable_through_endpoint) { break; } - } +void routing_manager_impl::on_subscribe_nack(client_t _client, + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + event_t _event, remote_subscription_id_t _id) { + (void)_event; // TODO: Remove completely? - if (!other_services_reachable_through_endpoint) { - std::uint16_t its_port(0); - boost::asio::ip::address its_address; - if (_reliable) { - std::shared_ptr<tcp_client_endpoint_impl> ep = - std::dynamic_pointer_cast<tcp_client_endpoint_impl>(endpoint_to_delete); - if (ep) { - its_port = ep->get_remote_port(); - ep->get_remote_address(its_address); - } - } else { - std::shared_ptr<udp_client_endpoint_impl> ep = - std::dynamic_pointer_cast<udp_client_endpoint_impl>(endpoint_to_delete); - if (ep) { - its_port = ep->get_remote_port(); - ep->get_remote_address(its_address); - } - } - const auto found_ip = client_endpoints_by_ip_.find(its_address); - if (found_ip != client_endpoints_by_ip_.end()) { - const auto found_port = found_ip->second.find(its_port); - if (found_port != found_ip->second.end()) { - const auto found_reliable = found_port->second.find(_reliable); - if (found_reliable != found_port->second.end()) { - if (found_reliable->second == endpoint_to_delete) { - found_port->second.erase(_reliable); - // delete if necessary - if (!found_port->second.size()) { - found_ip->second.erase(found_port); - if (!found_ip->second.size()) { - client_endpoints_by_ip_.erase(found_ip); - } - } - } - } - } + auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); + if (its_eventgroup) { + auto its_subscription = its_eventgroup->get_remote_subscription(_id); + if (its_subscription) { + its_subscription->set_client_state(_client, + remote_subscription_state_e::SUBSCRIPTION_NACKED); + + auto its_parent = its_subscription->get_parent(); + if (its_parent) { + its_parent->set_client_state(_client, + remote_subscription_state_e::SUBSCRIPTION_NACKED); + if (!its_subscription->is_pending()) { + its_eventgroup->remove_remote_subscription(_id); } } - } - } - if (!other_services_reachable_through_endpoint && endpoint_to_delete) { - endpoint_to_delete->stop(); - } - for (const auto &specific_endpoint : its_specific_endpoints) { - specific_endpoint->stop(); - } -} -void routing_manager_impl::clear_multicast_endpoints(service_t _service, instance_t _instance) { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - // Clear multicast info and endpoint and multicast instance (remote service) - if (multicast_info.find(_service) != multicast_info.end()) { - if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) { - std::string address = multicast_info[_service][_instance]->get_address().to_string(); - uint16_t port = multicast_info[_service][_instance]->get_port(); - std::shared_ptr<endpoint> multicast_endpoint; - auto found_port = server_endpoints_.find(port); - if (found_port != server_endpoints_.end()) { - auto found_unreliable = found_port->second.find(false); - if (found_unreliable != found_port->second.end()) { - multicast_endpoint = found_unreliable->second; - multicast_endpoint->leave(address); - multicast_endpoint->stop(); - server_endpoints_[port].erase(false); - } - if (found_port->second.find(true) == found_port->second.end()) { - server_endpoints_.erase(port); - } - } - multicast_info[_service].erase(_instance); - if (0 >= multicast_info[_service].size()) { - multicast_info.erase(_service); - } - // Clear service_instances_ for multicase endpoint - if (1 >= service_instances_[_service].size()) { - service_instances_.erase(_service); - } else if (multicast_endpoint) { - service_instances_[_service].erase(multicast_endpoint.get()); + if (discovery_) { + discovery_->update_remote_subscription(its_subscription); + VSOMEIP_INFO << "REMOTE SUBSCRIBE(" + << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" + << " from " << its_subscription->get_subscriber()->get_address() + << ":" << std::dec << its_subscription->get_subscriber()->get_port() + << (its_subscription->get_subscriber()->is_reliable() ? " reliable" : " unreliable") + << " was not accepted"; } } } @@ -3495,7 +2900,7 @@ return_code_e routing_manager_impl::check_error(const byte_t *_data, length_t _s void routing_manager_impl::send_error(return_code_e _return_code, const byte_t *_data, length_t _size, instance_t _instance, bool _reliable, - endpoint *_receiver, + endpoint* const _receiver, const boost::asio::ip::address &_remote_address, std::uint16_t _remote_port) { @@ -3535,150 +2940,39 @@ void routing_manager_impl::send_error(return_code_e _return_code, error_message->set_service(its_service); error_message->set_session(its_session); { - std::lock_guard<std::mutex> its_lock(serialize_mutex_); - if (serializer_->serialize(error_message.get())) { + std::shared_ptr<serializer> its_serializer(get_serializer()); + if (its_serializer->serialize(error_message.get())) { if (_receiver) { auto its_endpoint_def = std::make_shared<endpoint_definition>( _remote_address, _remote_port, _receiver->is_reliable()); its_endpoint_def->set_remote_port(_receiver->get_local_port()); - send_to(its_endpoint_def, serializer_->get_data(), - serializer_->get_size(), _instance, true); - } - serializer_->reset(); - } else { - VSOMEIP_ERROR<< "Failed to serialize error message."; - } - } -} - -void routing_manager_impl::on_identify_response(client_t _client, service_t _service, - instance_t _instance, bool _reliable) { - { - std::lock_guard<std::mutex> its_lock(identified_clients_mutex_); - auto its_service = identifying_clients_.find(_service); - if (its_service != identifying_clients_.end()) { - auto its_instance = its_service->second.find(_instance); - if (its_instance != its_service->second.end()) { - auto its_reliable = its_instance->second.find(_reliable); - if (its_reliable != its_instance->second.end()) { - its_reliable->second.erase(_client); - } - } - } - identified_clients_[_service][_instance][_reliable].insert(_client); - } - discovery_->send_subscriptions(_service, _instance, _client, _reliable); -} - -void routing_manager_impl::identify_for_subscribe(client_t _client, - service_t _service, instance_t _instance, major_version_t _major, - subscription_type_e _subscription_type) { - (void)_subscription_type; - if (!has_identified(_client, _service, _instance, false) - && !is_identifying(_client, _service, _instance, false)) { - send_identify_message(_client, _service, _instance, _major, false); - } - if (!has_identified(_client, _service, _instance, true) - && !is_identifying(_client, _service, _instance, true)) { - send_identify_message(_client, _service, _instance, _major, true); - } -} - -bool routing_manager_impl::send_identify_message(client_t _client, - service_t _service, - instance_t _instance, - major_version_t _major, - bool _reliable) { - auto its_endpoint = find_or_create_remote_client(_service, _instance, - _reliable, _client); - if (!its_endpoint) { - VSOMEIP_WARNING << "routing_manager_impl::send_identify_message: " - << "No " << (_reliable ? "reliable" : "unreliable") - << " route for identify message to service/instance " - << std::hex << _service << "/" << _instance << " for client " - << _client; - return false; - } - { - std::lock_guard<std::mutex> its_lock(identified_clients_mutex_); - identifying_clients_[_service][_instance][_reliable].insert(_client); - } - - if (_client == get_client()) { - send_identify_request(_service, _instance, _major, _reliable); - } else { - stub_->send_identify_request_command(find_local(_client), - _service, _instance, _major, _reliable); - } - - return true; -} - - -bool routing_manager_impl::supports_selective(service_t _service, instance_t _instance) { - bool supports_selective(false); - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - auto its_service = remote_service_info_.find(_service); - if (its_service != remote_service_info_.end()) { - auto its_instance = its_service->second.find(_instance); - if (its_instance != its_service->second.end()) { - for (auto its_reliable : its_instance->second) { - supports_selective |= configuration_-> - supports_selective_broadcasts( - its_reliable.second->get_address()); - } - } - } - return supports_selective; -} - -bool routing_manager_impl::is_identifying(client_t _client, service_t _service, - instance_t _instance, bool _reliable) { - if (!supports_selective(_service, _instance)) { - // For legacy selective services clients can't be identified! - return false; - } - bool is_identifieing(false); - std::lock_guard<std::mutex> its_lock(identified_clients_mutex_); - auto its_service = identifying_clients_.find(_service); - if (its_service != identifying_clients_.end()) { - auto its_instance = its_service->second.find(_instance); - if (its_instance != its_service->second.end()) { - auto its_reliable = its_instance->second.find(_reliable); - if (its_reliable != its_instance->second.end()) { - auto its_client = its_reliable->second.find(_client); - if (its_client != its_reliable->second.end()) { - is_identifieing = true; - } - } - } - } - return is_identifieing; -} + std::shared_ptr<endpoint> its_endpoint = + ep_mgr_impl_->find_server_endpoint( + its_endpoint_def->get_remote_port(), + its_endpoint_def->is_reliable()); + if (its_endpoint) { + #ifdef USE_DLT + const uint16_t its_data_size + = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); -bool routing_manager_impl::has_identified(client_t _client, service_t _service, - instance_t _instance, bool _reliable) { - if (!supports_selective(_service, _instance)) { - // For legacy selective services clients can't be identified! - return true; - } - bool has_identified(false); - std::lock_guard<std::mutex> its_lock(identified_clients_mutex_); - auto its_service = identified_clients_.find(_service); - if (its_service != identified_clients_.end()) { - auto its_instance = its_service->second.find(_instance); - if (its_instance != its_service->second.end()) { - auto its_reliable = its_instance->second.find(_reliable); - if (its_reliable != its_instance->second.end()) { - auto its_client = its_reliable->second.find(_client); - if (its_client != its_reliable->second.end()) { - has_identified = true; + trace::header its_header; + if (its_header.prepare(its_endpoint, true, _instance)) + tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, + _data, its_data_size); + #else + (void) _instance; + #endif + its_endpoint->send_error(its_endpoint_def, + its_serializer->get_data(), its_serializer->get_size()); } } + its_serializer->reset(); + put_serializer(its_serializer); + } else { + VSOMEIP_ERROR<< "Failed to serialize error message."; } } - return has_identified; } void routing_manager_impl::clear_remote_subscriber( @@ -3703,19 +2997,12 @@ void routing_manager_impl::clear_remote_subscriber( std::chrono::steady_clock::time_point routing_manager_impl::expire_subscriptions(bool _force) { - struct subscriptions_info { - service_t service_id_; - instance_t instance_id_; - eventgroup_t eventgroup_id_; - std::shared_ptr<endpoint_definition> invalid_endpoint_; - client_t client_; - std::set<std::shared_ptr<event>> events_; - std::shared_ptr<eventgroupinfo> eventgroupinfo_; - }; - std::vector<struct subscriptions_info> subscriptions_to_expire_; + std::map<std::shared_ptr<remote_subscription>, + std::set<client_t> > its_expired_subscriptions; + std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); - std::chrono::steady_clock::time_point next_expiration + std::chrono::steady_clock::time_point its_next_expiration = std::chrono::steady_clock::now() + std::chrono::hours(24); { std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); @@ -3723,86 +3010,70 @@ routing_manager_impl::expire_subscriptions(bool _force) { for (auto &its_service : eventgroups_) { for (auto &its_instance : its_service.second) { for (auto &its_eventgroup : its_instance.second) { - std::set<std::shared_ptr<endpoint_definition>> its_expired_endpoints; - for (auto &its_target : its_eventgroup.second->get_targets()) { - if (_force) { - its_expired_endpoints.insert(its_target.endpoint_); - } else { - if (its_target.expiration_ < now) { - its_expired_endpoints.insert(its_target.endpoint_); - } else if (its_target.expiration_ < next_expiration) { - next_expiration = its_target.expiration_; + auto its_subscriptions + = its_eventgroup.second->get_remote_subscriptions(); + for (auto &s : its_subscriptions) { + for (auto its_client : s->get_clients()) { + if (_force) { + its_expired_subscriptions[s].insert(its_client); + } else { + auto its_expiration = s->get_expiration(its_client); + if (its_expiration != std::chrono::steady_clock::time_point()) { + if (its_expiration < now) { + its_expired_subscriptions[s].insert(its_client); + } else if (its_expiration < its_next_expiration) { + its_next_expiration = its_expiration; + } + } } } } - - for (auto its_endpoint : its_expired_endpoints) { - its_eventgroup.second->remove_target(its_endpoint); - - client_t its_client - = find_client(its_service.first, its_instance.first, - its_eventgroup.second, its_endpoint); - clear_remote_subscriber(its_service.first, its_instance.first, - its_client, its_endpoint); - - std::set<std::shared_ptr<event> > its_events; - if (its_eventgroup.second->get_targets().size() == 0) { - its_events = its_eventgroup.second->get_events(); - } - subscriptions_to_expire_.push_back({its_service.first, - its_instance.first, - its_eventgroup.first, - its_endpoint, - its_client, - its_events, - its_eventgroup.second}); - } - if(its_eventgroup.second->is_multicast() && its_expired_endpoints.size() && - 0 == its_eventgroup.second->get_unreliable_target_count() ) { - //clear multicast targets if no unreliable subscriber is left for multicast eventgroup - its_eventgroup.second->clear_multicast_targets(); - } } } } } - for (const auto &s : subscriptions_to_expire_) { - if (s.invalid_endpoint_) { - for (const auto e: s.events_) { - if (e->is_shadow()) { - e->unset_payload(); - } + for (auto &s : its_expired_subscriptions) { + auto its_info = s.first->get_eventgroupinfo(); + if (its_info) { + auto its_service = its_info->get_service(); + auto its_instance = its_info->get_instance(); + auto its_eventgroup = its_info->get_eventgroup(); + auto its_major = its_info->get_major(); + + remote_subscription_id_t its_id; + auto its_result = its_info->update_remote_subscription( + s.first, std::chrono::steady_clock::now(), + s.second, its_id, false); + if (its_result) { + const client_t its_offering_client + = find_local_client(its_service, its_instance); + send_unsubscription(its_offering_client, + its_service, its_instance, its_eventgroup, its_major, + s.second, s.first->get_id()); } - const client_t its_hosting_client = find_local_client(s.service_id_, - s.instance_id_); - - if (its_hosting_client != VSOMEIP_ROUTING_CLIENT) { - const pending_subscription_t its_pending_unsubscription( - std::shared_ptr<sd_message_identifier_t>(), - s.invalid_endpoint_, s.invalid_endpoint_, - 0, s.client_); - pending_subscription_id_t its_pending_unsubscription_id = - s.eventgroupinfo_->add_pending_subscription( - its_pending_unsubscription); - if (its_pending_unsubscription_id != DEFAULT_SUBSCRIPTION) { - send_unsubscription(its_hosting_client, s.client_, s.service_id_, - s.instance_id_, s.eventgroup_id_, - its_pending_unsubscription_id); - } + + if (s.first->get_unreliable()) { + VSOMEIP_INFO << "Expired subscription [" + << std::hex << std::setfill('0') << std::setw(4) << its_service << "." + << std::hex << std::setfill('0') << std::setw(4) << its_instance << "." + << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] unreliable from " + << s.first->get_unreliable()->get_address() << ":" + << std::dec << s.first->get_unreliable()->get_port(); } - VSOMEIP_INFO << "Expired subscription [" - << std::hex << std::setfill('0') << std::setw(4) << s.service_id_ << "." - << std::hex << std::setfill('0') << std::setw(4) << s.instance_id_ << "." - << std::hex << std::setfill('0') << std::setw(4) << s.eventgroup_id_ << "] from " - << s.invalid_endpoint_->get_address() << ":" - << std::dec << s.invalid_endpoint_->get_port() - << "(" << std::hex << std::setfill('0') << std::setw(4) << s.client_ << ") " - << _force; + if (s.first->get_reliable()) { + VSOMEIP_INFO << "Expired subscription [" + << std::hex << std::setfill('0') << std::setw(4) << its_service << "." + << std::hex << std::setfill('0') << std::setw(4) << its_instance << "." + << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] reliable from " + << s.first->get_reliable()->get_address() << ":" + << std::dec << s.first->get_reliable()->get_port(); + } } } - return next_expiration; + + return its_next_expiration; } void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const & _error) { @@ -3811,6 +3082,9 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const #ifndef VSOMEIP_VERSION #define VSOMEIP_VERSION "unknown version" #endif + static int counter(0); + static uint32_t its_interval = configuration_->get_log_version_interval(); + bool is_diag_mode(false); if (discovery_) { @@ -3825,62 +3099,24 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const std::chrono::steady_clock::now() - last_resume_).count() << "s"; } } + VSOMEIP_INFO << "vSomeIP " << VSOMEIP_VERSION << " | (" << ((is_diag_mode == true) ? "diagnosis)" : "default)") << its_last_resume.str(); - { - std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_); - version_log_timer_.expires_from_now( - std::chrono::seconds(configuration_->get_log_version_interval())); - version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk, - this, std::placeholders::_1)); - } - } -} - -#ifndef WITHOUT_SYSTEMD -void routing_manager_impl::watchdog_cbk(boost::system::error_code const &_error) { - if (!_error) { - static bool is_ready(false); - static bool has_interval(false); - static uint64_t its_interval(0); - - if (is_ready) { - sd_notify(0, "WATCHDOG=1"); - VSOMEIP_INFO << "Triggered systemd watchdog"; - } else { - is_ready = true; - sd_notify(0, "READY=1"); - VSOMEIP_INFO << "Sent READY to systemd watchdog"; - if (0 < sd_watchdog_enabled(0, &its_interval)) { - has_interval = true; - VSOMEIP_INFO << "systemd watchdog is enabled"; - } - } - if (has_interval) { - std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); - watchdog_timer_.expires_from_now(std::chrono::microseconds(its_interval / 2)); - watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk, - this, std::placeholders::_1)); + counter++; + if (counter == 6) { + ep_mgr_->log_client_states(); + ep_mgr_impl_->log_client_states(); + counter = 0; } - } -} -#endif -void routing_manager_impl::clear_remote_service_info(service_t _service, instance_t _instance, bool _reliable) { - // Clear remote_service_info_ - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - if (remote_service_info_.find(_service) != remote_service_info_.end()) { - if (remote_service_info_[_service].find(_instance) != remote_service_info_[_service].end()) { - remote_service_info_[_service][_instance].erase(_reliable); - auto found_endpoint_def = remote_service_info_[_service][_instance].find(!_reliable); - if (found_endpoint_def == remote_service_info_[_service][_instance].end()) { - remote_service_info_[_service].erase(_instance); - if (0 >= remote_service_info_[_service].size()) { - remote_service_info_.erase(_service); - } - } + { + std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_); + version_log_timer_.expires_from_now(std::chrono::seconds(its_interval)); + version_log_timer_.async_wait( + std::bind(&routing_manager_impl::log_version_timer_cbk, + this, std::placeholders::_1)); } } } @@ -4123,135 +3359,28 @@ void routing_manager_impl::handle_client_error(client_t _client) { } for (const auto &offer : its_offers) { offer_service(std::get<0>(offer), std::get<1>(offer), std::get<2>(offer), - std::get<3>(offer), std::get<4>(offer)); + std::get<3>(offer), std::get<4>(offer), true); } } -void routing_manager_impl::remove_specific_client_endpoint(client_t _client, service_t _service, - instance_t _instance, bool _reliable) { - client_t its_client = is_specific_endpoint_client(_client, _service, _instance); - if (its_client != VSOMEIP_ROUTING_CLIENT) { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - if (remote_services_.find(_service) != remote_services_.end()) { - if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) { - auto endpoint = remote_services_[_service][_instance][_client][_reliable]; - if (endpoint) { - service_instances_[_service].erase(endpoint.get()); - endpoint->stop(); - } - remote_services_[_service][_instance][_client].erase(_reliable); - auto found_endpoint = remote_services_[_service][_instance][_client].find(!_reliable); - if (found_endpoint == remote_services_[_service][_instance][_client].end()) { - remote_services_[_service][_instance].erase(_client); - } - } - } - } -} - -void routing_manager_impl::clear_identified_clients( service_t _service, instance_t _instance) { - std::lock_guard<std::mutex> its_lock(identified_clients_mutex_); - auto its_service = identified_clients_.find(_service); - if (its_service != identified_clients_.end()) { - auto found_instance = its_service->second.find(_instance); - if (found_instance != its_service->second.end()) { - auto found_reliable = found_instance->second.find(true); - if (found_reliable != found_instance->second.end()) { - found_reliable->second.clear(); - } - auto found_unreliable = found_instance->second.find(false); - if (found_unreliable != found_instance->second.end()) { - found_unreliable->second.clear(); - } - } - } -} - -void routing_manager_impl::clear_identifying_clients( service_t _service, instance_t _instance) { - std::lock_guard<std::mutex> its_lock(identified_clients_mutex_); - auto its_service = identifying_clients_.find(_service); - if (its_service != identifying_clients_.end()) { - auto found_instance = its_service->second.find(_instance); - if (found_instance != its_service->second.end()) { - auto found_reliable = found_instance->second.find(true); - if (found_reliable != found_instance->second.end()) { - found_reliable->second.clear(); - } - auto found_unreliable = found_instance->second.find(false); - if (found_unreliable != found_instance->second.end()) { - found_unreliable->second.clear(); - } - } - } -} - -void routing_manager_impl::remove_identified_client(service_t _service, instance_t _instance, client_t _client) { - std::lock_guard<std::mutex> its_lock(identified_clients_mutex_); - auto its_service = identified_clients_.find(_service); - if (its_service != identified_clients_.end()) { - auto found_instance = its_service->second.find(_instance); - if (found_instance != its_service->second.end()) { - auto found_reliable = found_instance->second.find(true); - if (found_reliable != found_instance->second.end()) { - auto found_client = found_reliable->second.find(_client); - if(found_client != found_reliable->second.end()) - found_reliable->second.erase(_client); - } - auto found_unreliable = found_instance->second.find(false); - if (found_unreliable != found_instance->second.end()) { - auto found_client = found_unreliable->second.find(_client); - if(found_client != found_unreliable->second.end()) - found_unreliable->second.erase(_client); - } - } - } -} - -void routing_manager_impl::remove_identifying_client(service_t _service, instance_t _instance, client_t _client) { - std::lock_guard<std::mutex> its_lock(identified_clients_mutex_); - auto its_service = identifying_clients_.find(_service); - if (its_service != identifying_clients_.end()) { - auto found_instance = its_service->second.find(_instance); - if (found_instance != its_service->second.end()) { - auto found_reliable = found_instance->second.find(true); - if (found_reliable != found_instance->second.end()) { - auto found_client = found_reliable->second.find(_client); - if(found_client != found_reliable->second.end()) - found_reliable->second.erase(_client); - } - auto found_unreliable = found_instance->second.find(false); - if (found_unreliable != found_instance->second.end()) { - auto found_client = found_unreliable->second.find(_client); - if(found_client != found_unreliable->second.end()) - found_unreliable->second.erase(_client); - } - } - } -} - -void routing_manager_impl::unsubscribe_specific_client_at_sd( - service_t _service, instance_t _instance, client_t _client) { - client_t subscriber = is_specific_endpoint_client(_client, _service, _instance); - if (subscriber != VSOMEIP_ROUTING_CLIENT && discovery_) { - discovery_->unsubscribe_client(_service, _instance, _client); - } +std::shared_ptr<endpoint_manager_impl> routing_manager_impl::get_endpoint_manager() const { + return ep_mgr_impl_; } void routing_manager_impl::send_subscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, - event_t _event, subscription_type_e _subscription_type) { - (void)_subscription_type; - auto endpoint = find_local(_service, _instance); + event_t _event) { + auto endpoint = ep_mgr_->find_local(_service, _instance); if (endpoint) { stub_->send_subscribe(endpoint, _client, - _service, _instance, _eventgroup, _major, _event, DEFAULT_SUBSCRIPTION); + _service, _instance, _eventgroup, _major, _event, PENDING_SUBSCRIPTION_ID); } } void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { if(discovery_) { switch (_routing_state) { - case vsomeip::routing_state_e::RS_SUSPENDED: + case routing_state_e::RS_SUSPENDED: { VSOMEIP_INFO << "Set routing to suspend mode, diagnosis mode is " << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); @@ -4271,7 +3400,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { << std::hex << std::setw(4) << std::setfill('0') << its_instance.first << " still offered by " << std::hex << std::setw(4) << std::setfill('0') << its_client; } - discovery_->stop_offer_service(its_service.first, its_instance.first, its_instance.second); + discovery_->stop_offer_service(its_instance.second); } } { @@ -4289,13 +3418,6 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { // determine existing subscriptions to remote service and send StopSubscribe for (auto its_eventgroup : get_subscribed_eventgroups(s.first, i.first)) { discovery_->unsubscribe(s.first, i.first, its_eventgroup, VSOMEIP_ROUTING_CLIENT); - auto specific_endpoint_clients = get_specific_endpoint_clients(s.first, i.first); - for (auto its_client : specific_endpoint_clients) { - discovery_->unsubscribe(s.first, i.first, its_eventgroup, its_client); - } - for (const auto &e : find_events(s.first, i.first, its_eventgroup)) { - e->clear_subscribers(); - } } const bool has_reliable(i.second->get_endpoint(true)); @@ -4308,7 +3430,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } break; } - case vsomeip::routing_state_e::RS_RESUMED: + case routing_state_e::RS_RESUMED: { VSOMEIP_INFO << "Set routing to resume mode, diagnosis mode was " << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); @@ -4337,8 +3459,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { // Trigger initial offer phase for relevant services for (const auto &its_service : get_offered_services()) { for (const auto &its_instance : its_service.second) { - discovery_->offer_service(its_service.first, - its_instance.first, its_instance.second); + discovery_->offer_service(its_instance.second); } } break; @@ -4353,8 +3474,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { for (const auto &its_instance : its_service.second) { if (host_->get_configuration()->is_someip( its_service.first, its_instance.first)) { - discovery_->stop_offer_service( - its_service.first, its_instance.first, its_instance.second); + discovery_->stop_offer_service(its_instance.second); } } } @@ -4382,8 +3502,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { for (const auto &its_instance : its_service.second) { if (host_->get_configuration()->is_someip( its_service.first, its_instance.first)) { - discovery_->offer_service(its_service.first, - its_instance.first, its_instance.second); + discovery_->offer_service(its_instance.second); } } } @@ -4446,6 +3565,10 @@ void routing_manager_impl::on_net_interface_or_route_state_changed( } void routing_manager_impl::start_ip_routing() { +#ifdef _WIN32 + if_state_running_ = true; +#endif + if (routing_ready_handler_) { routing_ready_handler_(); } @@ -4506,8 +3629,8 @@ routing_manager_impl::get_subscribed_eventgroups( if (found_service != eventgroups_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { - for (auto its_group : found_instance->second) { - for (auto its_event : its_group.second->get_events()) { + for (const auto& its_group : found_instance->second) { + for (const auto& its_event : its_group.second->get_events()) { if (its_event->has_subscriber(its_group.first, ANY_CLIENT)) { its_eventgroups.insert(its_group.first); } @@ -4540,7 +3663,7 @@ void routing_manager_impl::clear_targets_and_pending_sub_from_eventgroups( its_eventgroup.first, its_subscriber); } - client_t its_client = is_specific_endpoint_client(its_subscriber, _service, _instance); + client_t its_client = VSOMEIP_ROUTING_CLIENT; //is_specific_endpoint_client(its_subscriber, _service, _instance); { std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); const auto its_tuple = @@ -4551,8 +3674,9 @@ void routing_manager_impl::clear_targets_and_pending_sub_from_eventgroups( } its_events.push_back(its_event); } - its_eventgroup.second->clear_targets(); - its_eventgroup.second->clear_pending_subscriptions(); + // TODO dn: find out why this was commented out + //its_eventgroup.second->clear_targets(); + //its_eventgroup.second->clear_pending_subscriptions(); } } } @@ -4578,7 +3702,7 @@ void routing_manager_impl::clear_remote_subscriber(service_t _service, void routing_manager_impl::call_sd_endpoint_connected( const boost::system::error_code& _error, service_t _service, instance_t _instance, - std::shared_ptr<endpoint> _endpoint, + const std::shared_ptr<endpoint>& _endpoint, std::shared_ptr<boost::asio::steady_timer> _timer) { (void)_timer; if (_error) { @@ -4604,14 +3728,20 @@ bool routing_manager_impl::create_placeholder_event_and_subscribe( if (its_local_client == host_->get_client()) { // received subscription for event of a service instance hosted by // application acting as rm_impl register with own client id and shadow = false - register_event(host_->get_client(), _service, _instance, _event, - its_eventgroups, true, std::chrono::milliseconds::zero(), false, + register_event(host_->get_client(), + _service, _instance, + _event, + its_eventgroups, event_type_e::ET_UNKNOWN, reliability_type_e::RT_UNKNOWN, + std::chrono::milliseconds::zero(), false, true, nullptr, false, false, true); } else if (its_local_client != VSOMEIP_ROUTING_CLIENT) { // received subscription for event of a service instance hosted on // this node register with client id of local_client and set shadow to true - register_event(its_local_client, _service, _instance, _event, - its_eventgroups, true, std::chrono::milliseconds::zero(), false, + register_event(its_local_client, + _service, _instance, + _event, its_eventgroups, event_type_e::ET_UNKNOWN, + reliability_type_e::RT_UNKNOWN, + std::chrono::milliseconds::zero(), false, true, nullptr, false, true, true); } else { // received subscription for event of a unknown or remote service instance @@ -4620,9 +3750,12 @@ bool routing_manager_impl::create_placeholder_event_and_subscribe( if (its_info && !its_info->is_local()) { // remote service, register shadow event with client ID of subscriber // which should have called register_event - register_event(_client, _service, _instance, _event, - its_eventgroups, true, std::chrono::milliseconds::zero(), - false, nullptr, false, true, true); + register_event(_client, + _service, _instance, + _event, its_eventgroups, event_type_e::ET_UNKNOWN, + reliability_type_e::RT_UNKNOWN, + std::chrono::milliseconds::zero(), + false, true, nullptr, false, true, true); } else { VSOMEIP_WARNING << "routing_manager_impl::create_placeholder_event_and_subscribe(" @@ -4642,126 +3775,84 @@ bool routing_manager_impl::create_placeholder_event_and_subscribe( return is_inserted; } -void routing_manager_impl::handle_subscription_state(client_t _client, service_t _service, instance_t _instance, +void routing_manager_impl::handle_subscription_state( + client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event) { +#if 0 + VSOMEIP_ERROR << "routing_manager_impl::" << __func__ + << "(" << std::hex << _client << "): " + << "event=" + << std::hex << _service << "." + << std::hex << _instance << "." + << std::hex << _eventgroup << "." + << std::hex << _event + << " me=" + << std::hex << get_client(); +#endif + // Note: remote_subscription_state_mutex_ is already locked as this + // method builds a critical section together with insert_subscription + // from routing_manager_base. + // Todo: Improve this situation... + auto its_event = find_event(_service, _instance, _event); + client_t its_client(VSOMEIP_ROUTING_CLIENT); + if (its_event && + its_event->get_type() == event_type_e::ET_SELECTIVE_EVENT) { + its_client = _client; + } - client_t subscriber = is_specific_endpoint_client(_client, _service, _instance); - auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, subscriber); - - std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); + auto its_tuple + = std::make_tuple(_service, _instance, _eventgroup, its_client); auto its_state = remote_subscription_state_.find(its_tuple); if (its_state != remote_subscription_state_.end()) { +#if 0 + VSOMEIP_ERROR << "routing_manager_impl::" << __func__ + << "(" << std::hex << _client << "): " + << "event=" + << std::hex << _service << "." + << std::hex << _instance << "." + << std::hex << _eventgroup << "." + << std::hex << _event + << " state=" << std::hex << (int)its_state->second + << " me=" + << std::hex << get_client(); +#endif if (its_state->second == subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED) { // Subscription already acknowledged! if (_client == get_client()) { - host_->on_subscription_error(_service, _instance, _eventgroup, 0x0 /*OK*/); host_->on_subscription_status(_service, _instance, _eventgroup, _event, 0x0 /*OK*/); } else { stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event); } } - } else { - remote_subscription_state_[its_tuple] = subscription_state_e::IS_SUBSCRIBING; } } -client_t routing_manager_impl::is_specific_endpoint_client(client_t _client, - service_t _service, instance_t _instance) { - client_t result = VSOMEIP_ROUTING_CLIENT; - { - std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_); - auto found_service = specific_endpoint_clients_.find(_service); - if (found_service != specific_endpoint_clients_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_client = found_instance->second.find(_client); - if(found_client != found_instance->second.end()) { - result = _client; - } - } - } - } - // A client_t != VSOMEIP_ROUTING_CLIENT implies true - return result; -} - -std::unordered_set<client_t> routing_manager_impl::get_specific_endpoint_clients( - service_t _service, instance_t _instance) { - std::unordered_set<client_t> result; - { - std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_); - auto found_service = specific_endpoint_clients_.find(_service); - if (found_service != specific_endpoint_clients_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - result = found_instance->second; - } - } - } - return result; -} - -void routing_manager_impl::send_initial_events( - service_t _service, instance_t _instance, eventgroup_t _eventgroup, - const std::shared_ptr<endpoint_definition> &_subscriber) { - std::shared_ptr<eventgroupinfo> its_eventgroup = find_eventgroup(_service, - _instance, _eventgroup); - if (!its_eventgroup) { - return; - } - bool is_offered_both(false); - if (configuration_->get_reliable_port(_service, _instance) != ILLEGAL_PORT && - configuration_->get_unreliable_port(_service, _instance) != ILLEGAL_PORT) { - is_offered_both = true; - } - // send initial events if we already have a cached field (is_set) - for (const auto &its_event : its_eventgroup->get_events()) { - if (its_event->is_field()) { - if (its_event->is_set()) { - if (!is_offered_both) { - its_event->notify_one(_subscriber, true); - } else { - if (its_event->is_reliable() && _subscriber->is_reliable()) { - its_event->notify_one(_subscriber, true); - } - if (!its_event->is_reliable() && !_subscriber->is_reliable()) { - its_event->notify_one(_subscriber, true); - } - } - } else { - // received a subscription but can't notify due to missing payload - its_event->set_remote_notification_pending(true); - } - } - } -} - -void routing_manager_impl::register_offer_acceptance_handler( - vsomeip::offer_acceptance_handler_t _handler) const { +void routing_manager_impl::register_sd_acceptance_handler( + const sd_acceptance_handler_t& _handler) const { if (discovery_) { - discovery_->register_offer_acceptance_handler(_handler); + discovery_->register_sd_acceptance_handler(_handler); } } void routing_manager_impl::register_reboot_notification_handler( - vsomeip::reboot_notification_handler_t _handler) const { + const reboot_notification_handler_t& _handler) const { if (discovery_) { discovery_->register_reboot_notification_handler(_handler); } } void routing_manager_impl::register_routing_ready_handler( - routing_ready_handler_t _handler) { + const routing_ready_handler_t& _handler) { routing_ready_handler_ = _handler; } void routing_manager_impl::register_routing_state_handler( - routing_state_handler_t _handler) { + const routing_state_handler_t& _handler) { routing_state_handler_ = _handler; } -void routing_manager_impl::offer_acceptance_enabled( - boost::asio::ip::address _address) { +void routing_manager_impl::sd_acceptance_enabled( + const boost::asio::ip::address& _address) { boost::system::error_code ec; VSOMEIP_INFO << "ipsec-plugin-mgu: expire subscriptions and services: " << _address.to_string(ec); @@ -4775,7 +3866,7 @@ void routing_manager_impl::memory_log_timer_cbk( return; } #ifndef _WIN32 - static const std::uint32_t its_pagesize = getpagesize() / 1024; + static const std::uint32_t its_pagesize = static_cast<std::uint32_t>(getpagesize() / 1024); #else static const std::uint32_t its_pagesize = 4096 / 1024; #endif @@ -4836,75 +3927,7 @@ void routing_manager_impl::status_log_timer_cbk( return; } - // local client endpoints - { - std::map<client_t, std::shared_ptr<endpoint>> lces = get_local_endpoints(); - VSOMEIP_INFO << "status local client endpoints: " << std::dec << lces.size(); - for (const auto lce : lces) { - lce.second->print_status(); - } - } - - // udp and tcp client endpoints - { - client_endpoints_by_ip_t client_endpoints_by_ip; - remote_services_t remote_services; - server_endpoints_t server_endpoints; - { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - client_endpoints_by_ip = client_endpoints_by_ip_; - remote_services = remote_services_; - server_endpoints = server_endpoints_; - } - VSOMEIP_INFO << "status start remote client endpoints:"; - std::uint32_t num_remote_client_endpoints(0); - // normal endpoints - for (const auto &a : client_endpoints_by_ip) { - for (const auto p : a.second) { - for (const auto ru : p.second) { - ru.second->print_status(); - num_remote_client_endpoints++; - } - } - } - VSOMEIP_INFO << "status end remote client endpoints: " << std::dec - << num_remote_client_endpoints; - - // selective client endpoints - VSOMEIP_INFO << "status start selective remote client endpoints:"; - std::uint32_t num_remote_selectiv_client_endpoints(0); - for (const auto s : remote_services) { - for (const auto i : s.second) { - for (const auto c : i.second) { - if (c.first != VSOMEIP_ROUTING_CLIENT) { - for (const auto ur : c.second) { - ur.second->print_status(); - num_remote_selectiv_client_endpoints++; - } - } - } - } - } - VSOMEIP_INFO << "status end selective remote client endpoints: " - << std::dec << num_remote_selectiv_client_endpoints; - - VSOMEIP_INFO << "status start server endpoints:"; - std::uint32_t num_server_endpoints(1); - // local server endpoints - stub_->print_endpoint_status(); - - // server endpoints - for (const auto p : server_endpoints) { - for (const auto ru : p.second ) { - ru.second->print_status(); - num_server_endpoints++; - } - } - VSOMEIP_INFO << "status end server endpoints:" - << std::dec << num_server_endpoints; - } - - + ep_mgr_impl_->print_status(); { std::lock_guard<std::mutex> its_lock(status_log_timer_mutex_); boost::system::error_code ec; @@ -4916,138 +3939,107 @@ void routing_manager_impl::status_log_timer_cbk( } } -void routing_manager_impl::on_unsubscribe_ack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, - pending_subscription_id_t _unsubscription_id) { - std::shared_ptr<eventgroupinfo> its_eventgroup = find_eventgroup(_service, - _instance, _eventgroup); - if (!its_eventgroup) { - VSOMEIP_ERROR << __func__ << ": Received UNSUBSCRIBE_ACK for unknown " - << "eventgroup: (" - << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; - return; - } - // there will only be one or zero subscriptions returned here - std::vector<pending_subscription_t> its_pending_subscriptions = - its_eventgroup->remove_pending_subscription(_unsubscription_id); - for (const pending_subscription_t& its_sd_message_id : its_pending_subscriptions) { - if (its_sd_message_id.pending_subscription_id_ == _unsubscription_id) { - its_eventgroup->remove_target(its_sd_message_id.target_); - clear_remote_subscriber(_service, _instance, - its_sd_message_id.subscribing_client_, - its_sd_message_id.target_); - if (its_eventgroup->get_targets().size() == 0) { - for (auto e : its_eventgroup->get_events()) { - if (e->is_shadow()) { - e->unset_payload(); +void +routing_manager_impl::on_unsubscribe_ack(client_t _client, + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + remote_subscription_id_t _id) { + std::shared_ptr<eventgroupinfo> its_info + = find_eventgroup(_service, _instance, _eventgroup); + if (its_info) { + const auto its_subscription = its_info->get_remote_subscription(_id); + if (its_subscription) { + its_info->remove_remote_subscription(_id); + + std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); + remote_subscribers_[_service][_instance].erase(_client); + + if (its_info->get_remote_subscriptions().size() == 0) { + for (const auto &its_event : its_info->get_events()) { + bool has_remote_subscriber(false); + for (const auto &its_eventgroup : its_event->get_eventgroups()) { + const auto its_eventgroup_info + = find_eventgroup(_service, _instance, its_eventgroup); + if (its_eventgroup_info + && its_eventgroup_info->get_remote_subscriptions().size() > 0) { + has_remote_subscriber = true; + } + } + + if (!has_remote_subscriber && its_event->is_shadow()) { + its_event->unset_payload(); } } } } else { - const pending_subscription_id_t its_subscription_id = - its_sd_message_id.pending_subscription_id_; - const client_t its_subscribing_client = its_sd_message_id.subscribing_client_; - const client_t its_offering_client = find_local_client(_service, _instance); - send_subscription(its_offering_client, its_subscribing_client, _service, - _instance, _eventgroup, its_eventgroup->get_major(), - its_subscription_id); + VSOMEIP_ERROR << __func__ + << ": Unknown StopSubscribe " << std::dec << _id << " for eventgroup [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; } + } else { + VSOMEIP_ERROR << __func__ + << ": Received StopSubscribe for unknown eventgroup: (" + << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; } } -void routing_manager_impl::send_unsubscription( - client_t _offering_client, client_t _subscribing_client, - service_t _service, instance_t _instance, eventgroup_t _eventgroup, - pending_subscription_id_t _pending_unsubscription_id) { +void routing_manager_impl::on_connect(const std::shared_ptr<endpoint>& _endpoint) { + (void)_endpoint; +} +void routing_manager_impl::on_disconnect(const std::shared_ptr<endpoint>& _endpoint) { + (void)_endpoint; +} +void routing_manager_impl::send_subscription( + const client_t _offering_client, + const service_t _service, const instance_t _instance, + const eventgroup_t _eventgroup, const major_version_t _major, + const std::set<client_t> &_clients, + const remote_subscription_id_t _id) { if (host_->get_client() == _offering_client) { auto self = shared_from_this(); - host_->on_subscription(_service, _instance, _eventgroup, - _subscribing_client, false, - [this, self, _service, _instance, _eventgroup, - _subscribing_client, _pending_unsubscription_id, _offering_client] - (const bool _subscription_accepted) { - (void)_subscription_accepted; + for (const auto its_client : _clients) { + host_->on_subscription(_service, _instance, _eventgroup, its_client, own_uid_, own_gid_, true, + [this, self, _service, _instance, _eventgroup, its_client, _id] + (const bool _is_accepted) { try { - const auto its_callback = std::bind( - &routing_manager_stub_host::on_unsubscribe_ack, - std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), - _offering_client, _service, _instance, - _eventgroup, _pending_unsubscription_id); - io_.post(its_callback); + if (!_is_accepted) { + const auto its_callback = std::bind( + &routing_manager_stub_host::on_subscribe_nack, + std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), + its_client, _service, _instance, + _eventgroup, ANY_EVENT, _id); + io_.post(its_callback); + } else { + const auto its_callback = std::bind( + &routing_manager_stub_host::on_subscribe_ack, + std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), + its_client, _service, _instance, + _eventgroup, ANY_EVENT, _id); + io_.post(its_callback); + } } catch (const std::exception &e) { VSOMEIP_ERROR << __func__ << e.what(); } - } - ); - } else { - if (!stub_->send_unsubscribe(find_local(_offering_client), - _subscribing_client, - _service, _instance, _eventgroup, ANY_EVENT, - _pending_unsubscription_id)) { - try { - const auto its_callback = std::bind( - &routing_manager_stub_host::on_unsubscribe_ack, - std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), - _offering_client, _service, _instance, - _eventgroup, _pending_unsubscription_id); - io_.post(its_callback); - } catch (const std::exception &e) { - VSOMEIP_ERROR << __func__ << e.what(); - } + }); } - } -} - -void routing_manager_impl::send_subscription( - client_t _offering_client, client_t _subscribing_client, - service_t _service, instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, - pending_subscription_id_t _pending_subscription_id) { - if (host_->get_client() == _offering_client) { - auto self = shared_from_this(); - host_->on_subscription(_service, _instance, _eventgroup, - _subscribing_client, true, - [this, self, _service, _instance, - _eventgroup, _subscribing_client, _pending_subscription_id] - (const bool _subscription_accepted) { - try { - if (!_subscription_accepted) { + } else { // service hosted by local client + for (const auto its_client : _clients) { + if (!stub_->send_subscribe(find_local(_offering_client), its_client, + _service, _instance, _eventgroup, _major, ANY_EVENT, _id)) { + try { const auto its_callback = std::bind( &routing_manager_stub_host::on_subscribe_nack, std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), - _subscribing_client, _service, _instance, - _eventgroup, ANY_EVENT, _pending_subscription_id); - io_.post(its_callback); - } else { - const auto its_callback = std::bind( - &routing_manager_stub_host::on_subscribe_ack, - std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), - _subscribing_client, _service, _instance, - _eventgroup, ANY_EVENT, _pending_subscription_id); + its_client, _service, _instance, _eventgroup, + ANY_EVENT, _id); io_.post(its_callback); + } catch (const std::exception &e) { + VSOMEIP_ERROR << __func__ << e.what(); } - } catch (const std::exception &e) { - VSOMEIP_ERROR << __func__ << e.what(); - } - }); - } else { // service hosted by local client - if (!stub_->send_subscribe(find_local(_offering_client), - _subscribing_client, - _service, _instance, _eventgroup, - _major, ANY_EVENT, - _pending_subscription_id)) { - try { - const auto its_callback = std::bind( - &routing_manager_stub_host::on_subscribe_nack, - std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), - _subscribing_client, _service, _instance, - _eventgroup, ANY_EVENT, _pending_subscription_id); - io_.post(its_callback); - } catch (const std::exception &e) { - VSOMEIP_ERROR << __func__ << e.what(); } } } @@ -5056,31 +4048,15 @@ void routing_manager_impl::send_subscription( void routing_manager_impl::cleanup_server_endpoint( service_t _service, const std::shared_ptr<endpoint>& _endpoint) { if (_endpoint) { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - bool reliable = _endpoint->is_reliable(); - // Check whether any service still uses this endpoint - _endpoint->decrement_use_count(); - bool isLastService = (_endpoint->get_use_count() == 0); - - // Clear service_instances_ - if (1 >= service_instances_[_service].size()) { - service_instances_.erase(_service); - } else { - service_instances_[_service].erase(_endpoint.get()); - } - - // Clear server endpoint if no service remains using it - if (isLastService) { - const uint16_t port = _endpoint->get_local_port(); - if (server_endpoints_.find(port) != server_endpoints_.end()) { - server_endpoints_[port].erase(reliable); - if (server_endpoints_[port].find(!reliable) == server_endpoints_[port].end()) { - server_endpoints_.erase(port); - } + // Clear service_instances_, check whether any service still + // uses this endpoint and clear server endpoint if no service + // remains using it + if (ep_mgr_impl_->remove_instance(_service, _endpoint.get())) { + if (ep_mgr_impl_->remove_server_endpoint( + _endpoint->get_local_port(), _endpoint->is_reliable())) { + // Stop endpoint (close socket) to release its async_handlers! + _endpoint->stop(); } - - // Stop endpoint (close socket) to release its async_handlers! - _endpoint->stop(); } } } @@ -5182,16 +4158,18 @@ void routing_manager_impl::on_security_update_timeout( bool routing_manager_impl::update_security_policy_configuration( uint32_t _uid, uint32_t _gid, - ::std::shared_ptr<policy> _policy, std::shared_ptr<payload> _payload, security_update_handler_t _handler) { + const std::shared_ptr<policy>& _policy, + const std::shared_ptr<payload>& _payload, + const security_update_handler_t& _handler) { bool ret(true); // cache security policy payload for later distribution to new registering clients stub_->policy_cache_add(_uid, _payload); // update security policy from configuration - configuration_->update_security_policy(_uid, _gid, _policy); + security::get()->update_security_policy(_uid, _gid, _policy); // determine currently connected clients - std::unordered_set<client_t> its_clients_to_inform = get_connected_clients(); + std::unordered_set<client_t> its_clients_to_inform = ep_mgr_impl_->get_connected_clients(); // add handler pending_security_update_id_t its_id; @@ -5254,13 +4232,13 @@ bool routing_manager_impl::update_security_policy_configuration( } bool routing_manager_impl::remove_security_policy_configuration( - uint32_t _uid, uint32_t _gid, security_update_handler_t _handler) { + uint32_t _uid, uint32_t _gid, const security_update_handler_t& _handler) { bool ret(true); // remove security policy from configuration (only if there was a updateACL call before) if (stub_->is_policy_cached(_uid)) { - if (!configuration_->remove_security_policy(_uid, _gid)) { - _handler(vsomeip::security_update_state_e::SU_UNKNOWN_USER_ID); + if (!security::get()->remove_security_policy(_uid, _gid)) { + _handler(security_update_state_e::SU_UNKNOWN_USER_ID); ret = false; } else { // remove policy from cache to prevent sending it to registering clients @@ -5270,7 +4248,7 @@ bool routing_manager_impl::remove_security_policy_configuration( pending_security_update_id_t its_id; // determine currently connected clients - std::unordered_set<client_t> its_clients_to_inform = get_connected_clients(); + std::unordered_set<client_t> its_clients_to_inform = ep_mgr_impl_->get_connected_clients(); if (!its_clients_to_inform.empty()) { its_id = pending_security_update_add(its_clients_to_inform); @@ -5330,14 +4308,14 @@ bool routing_manager_impl::remove_security_policy_configuration( } } else { - _handler(vsomeip::security_update_state_e::SU_UNKNOWN_USER_ID); + _handler(security_update_state_e::SU_UNKNOWN_USER_ID); ret = false; } return ret; } pending_security_update_id_t routing_manager_impl::pending_security_update_add( - std::unordered_set<client_t> _clients) { + const std::unordered_set<client_t>& _clients) { std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); if (++pending_security_update_id_ == 0) { pending_security_update_id_++; @@ -5423,4 +4401,96 @@ void routing_manager_impl::on_security_update_response( } } -} // namespace vsomeip +void routing_manager_impl::print_stub_status() const { + stub_->print_endpoint_status(); +} + +void routing_manager_impl::service_endpoint_connected( + service_t _service, instance_t _instance, major_version_t _major, + minor_version_t _minor, const std::shared_ptr<endpoint>& _endpoint, + bool _unreliable_only) { + + if (!_unreliable_only) { + // Mark only TCP-only and TCP+UDP services available here + // UDP-only services are already marked as available in add_routing_info + on_availability(_service, _instance, true, _major, _minor); + stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, + _major, _minor); + } + + std::shared_ptr<boost::asio::steady_timer> its_timer = + std::make_shared<boost::asio::steady_timer>(io_); + boost::system::error_code ec; + its_timer->expires_from_now(std::chrono::milliseconds(3), ec); + if (!ec) { + its_timer->async_wait( + std::bind(&routing_manager_impl::call_sd_endpoint_connected, + std::static_pointer_cast<routing_manager_impl>( + shared_from_this()), std::placeholders::_1, + _service, _instance, _endpoint, its_timer)); + } else { + VSOMEIP_ERROR << __func__ << " " << ec.message(); + } +} + +void routing_manager_impl::service_endpoint_disconnected( + service_t _service, instance_t _instance, major_version_t _major, + minor_version_t _minor, const std::shared_ptr<endpoint>& _endpoint) { + (void)_endpoint; + on_availability(_service, _instance, false, _major, _minor); + stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, + _major, _minor); + VSOMEIP_WARNING << __func__ << ": lost connection to remote service: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; +} + +void +routing_manager_impl::send_unsubscription(client_t _offering_client, + service_t _service, instance_t _instance, + eventgroup_t _eventgroup, major_version_t _major, + const std::set<client_t> &_removed, + remote_subscription_id_t _id) { + + (void)_major; // TODO: Remove completely? + + if (host_->get_client() == _offering_client) { + auto self = shared_from_this(); + for (const auto its_client : _removed) { + host_->on_subscription(_service, _instance, + _eventgroup, its_client, own_uid_, own_gid_, false, + [this, self, _service, _instance, _eventgroup, + its_client, _id, _offering_client] + (const bool _is_accepted) { + (void)_is_accepted; + try { + const auto its_callback = std::bind( + &routing_manager_stub_host::on_unsubscribe_ack, + std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), + its_client, _service, _instance, _eventgroup, _id); + io_.post(its_callback); + } catch (const std::exception &e) { + VSOMEIP_ERROR << __func__ << e.what(); + } + } + ); + } + } else { + for (const auto its_client : _removed) { + if (!stub_->send_unsubscribe(find_local(_offering_client), its_client, + _service, _instance, _eventgroup, ANY_EVENT, _id)) { + try { + const auto its_callback = std::bind( + &routing_manager_stub_host::on_unsubscribe_ack, + std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), + its_client, _service, _instance, _eventgroup, _id); + io_.post(its_callback); + } catch (const std::exception &e) { + VSOMEIP_ERROR << __func__ << e.what(); + } + } + } + } +} + +} // namespace vsomeip_v3 |