// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include #include #include #include "../include/constants.hpp" #include "../include/defines.hpp" #include "../include/deserializer.hpp" #include "../include/enumeration_types.hpp" #include "../include/eventgroupentry_impl.hpp" #include "../include/ipv4_option_impl.hpp" #include "../include/ipv6_option_impl.hpp" #include "../include/message_impl.hpp" #include "../include/request.hpp" #include "../include/runtime.hpp" #include "../include/service_discovery_host.hpp" #include "../include/service_discovery_impl.hpp" #include "../include/serviceentry_impl.hpp" #include "../include/subscription.hpp" #include "../../configuration/include/configuration.hpp" #include "../../configuration/include/internal.hpp" #include "../../endpoints/include/endpoint.hpp" #include "../../endpoints/include/client_endpoint.hpp" #include "../../endpoints/include/endpoint_definition.hpp" #include "../../endpoints/include/tcp_server_endpoint_impl.hpp" #include "../../endpoints/include/udp_server_endpoint_impl.hpp" #include "../../logging/include/logger.hpp" #include "../../message/include/serializer.hpp" #include "../../routing/include/eventgroupinfo.hpp" #include "../../routing/include/serviceinfo.hpp" namespace vsomeip { namespace sd { service_discovery_impl::service_discovery_impl(service_discovery_host *_host) : io_(_host->get_io()), host_(_host), port_(VSOMEIP_SD_DEFAULT_PORT), reliable_(false), serializer_( std::make_shared( host_->get_configuration()->get_buffer_shrink_threshold())), deserializer_( std::make_shared( host_->get_configuration()->get_buffer_shrink_threshold())), ttl_timer_(_host->get_io()), smallest_ttl_(DEFAULT_TTL), ttl_(VSOMEIP_SD_DEFAULT_TTL), subscription_expiration_timer_(_host->get_io()), max_message_size_(VSOMEIP_MAX_UDP_SD_PAYLOAD), initial_delay_(0), offer_debounce_time_(VSOMEIP_SD_DEFAULT_OFFER_DEBOUNCE_TIME), repetitions_base_delay_(VSOMEIP_SD_DEFAULT_REPETITIONS_BASE_DELAY), repetitions_max_(VSOMEIP_SD_DEFAULT_REPETITIONS_MAX), cyclic_offer_delay_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY), offer_debounce_timer_(_host->get_io()), find_debounce_time_(VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME), find_debounce_timer_(_host->get_io()), main_phase_timer_(_host->get_io()), is_suspended_(true) { std::chrono::seconds smallest_ttl(DEFAULT_TTL); smallest_ttl_ = std::chrono::duration_cast(smallest_ttl); // TODO: cleanup start condition! next_subscription_expiration_ = std::chrono::steady_clock::now() + std::chrono::hours(24); } service_discovery_impl::~service_discovery_impl() { } std::shared_ptr service_discovery_impl::get_configuration() const { return host_->get_configuration(); } boost::asio::io_service & service_discovery_impl::get_io() { return io_; } void service_discovery_impl::init() { runtime_ = runtime::get(); std::shared_ptr < configuration > its_configuration = host_->get_configuration(); if (its_configuration) { unicast_ = its_configuration->get_unicast_address(); sd_multicast_ = its_configuration->get_sd_multicast(); port_ = its_configuration->get_sd_port(); reliable_ = (its_configuration->get_sd_protocol() == "tcp"); max_message_size_ = (reliable_ ? VSOMEIP_MAX_TCP_SD_PAYLOAD : VSOMEIP_MAX_UDP_SD_PAYLOAD); ttl_ = its_configuration->get_sd_ttl(); // generate random initial delay based on initial delay min and max std::int32_t initial_delay_min = its_configuration->get_sd_initial_delay_min(); if (initial_delay_min < 0) { initial_delay_min = VSOMEIP_SD_DEFAULT_INITIAL_DELAY_MIN; } std::int32_t initial_delay_max = its_configuration->get_sd_initial_delay_max(); if (initial_delay_max < 0) { initial_delay_max = VSOMEIP_SD_DEFAULT_INITIAL_DELAY_MAX; } if (initial_delay_min > initial_delay_max) { const std::uint32_t tmp(initial_delay_min); initial_delay_min = initial_delay_max; initial_delay_max = tmp; } std::random_device r; std::mt19937 e(r()); std::uniform_int_distribution distribution( initial_delay_min, initial_delay_max); initial_delay_ = std::chrono::milliseconds(distribution(e)); repetitions_base_delay_ = std::chrono::milliseconds( its_configuration->get_sd_repetitions_base_delay()); repetitions_max_ = its_configuration->get_sd_repetitions_max(); cyclic_offer_delay_ = std::chrono::milliseconds( its_configuration->get_sd_cyclic_offer_delay()); offer_debounce_time_ = std::chrono::milliseconds( its_configuration->get_sd_offer_debounce_time()); } else { VSOMEIP_ERROR << "SD: no configuration found!"; } } void service_discovery_impl::start() { if (!endpoint_) { endpoint_ = host_->create_service_discovery_endpoint( sd_multicast_, port_, reliable_); if (!endpoint_) { VSOMEIP_ERROR << "Couldn't start service discovery"; return; } } is_suspended_ = false; start_main_phase_timer(); start_offer_debounce_timer(true); start_find_debounce_timer(true); } void service_discovery_impl::stop() { boost::system::error_code ec; is_suspended_ = true; { std::lock_guard its_lock(main_phase_timer_mutex_); main_phase_timer_.cancel(ec); } { std::lock_guard its_lock(offer_debounce_timer_mutex_); offer_debounce_timer_.cancel(ec); } { std::lock_guard its_lock(find_debounce_timer_mutex_); find_debounce_timer_.cancel(ec); } { std::lock_guard its_lock(repetition_phase_timers_mutex_); for(const auto &t : repetition_phase_timers_) { t.first->cancel(ec); } } { std::lock_guard its_lock(find_repetition_phase_timers_mutex_); for(const auto &t : find_repetition_phase_timers_) { t.first->cancel(ec); } } } void service_discovery_impl::request_service(service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl) { std::lock_guard its_lock(requested_mutex_); auto find_service = requested_.find(_service); if (find_service != requested_.end()) { auto find_instance = find_service->second.find(_instance); if (find_instance == find_service->second.end()) { find_service->second[_instance] = std::make_shared < request > (_major, _minor, _ttl); } } else { requested_[_service][_instance] = std::make_shared < request > (_major, _minor, _ttl); } } void service_discovery_impl::release_service(service_t _service, instance_t _instance) { std::lock_guard its_lock(requested_mutex_); auto find_service = requested_.find(_service); if (find_service != requested_.end()) { find_service->second.erase(_instance); } } std::shared_ptr service_discovery_impl::find_request(service_t _service, instance_t _instance) { std::lock_guard its_lock(requested_mutex_); auto find_service = requested_.find(_service); if (find_service != requested_.end()) { auto find_instance = find_service->second.find(_instance); if (find_instance != find_service->second.end()) { return find_instance->second; } } return nullptr; } void service_discovery_impl::subscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, client_t _client, subscription_type_e _subscription_type) { uint8_t subscribe_count(0); { std::lock_guard its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { auto found_eventgroup = found_instance->second.find(_eventgroup); if (found_eventgroup != found_instance->second.end()) { auto found_client = found_eventgroup->second.find(_client); if (found_client != found_eventgroup->second.end()) { if (found_client->second->get_major() != _major) { VSOMEIP_ERROR << "Subscriptions to different versions of the same " "service instance are not supported!"; } return; } } } } const uint8_t max_parallel_subscriptions = 16; // 4Bit Counter field subscribe_count = static_cast(subscribed_[_service][_instance][_eventgroup].size()); if (subscribe_count >= max_parallel_subscriptions) { VSOMEIP_WARNING << "Too many parallel subscriptions (max.16) on same event group: " << std::hex << _eventgroup << std::dec; return; } } std::shared_ptr < endpoint > its_unreliable; std::shared_ptr < endpoint > its_reliable; bool has_address(false); boost::asio::ip::address its_address; get_subscription_endpoints(_subscription_type, its_unreliable, its_reliable, &its_address, &has_address, _service, _instance, _client); std::shared_ptr its_runtime = runtime_.lock(); if (!its_runtime) { return; } std::shared_ptr its_message = its_runtime->create_message(); { std::lock_guard its_lock(subscribed_mutex_); // New subscription std::shared_ptr < subscription > its_subscription = std::make_shared < subscription > (_major, _ttl, its_reliable, its_unreliable, _subscription_type, subscribe_count); subscribed_[_service][_instance][_eventgroup][_client] = its_subscription; if (has_address) { if (_client != VSOMEIP_ROUTING_CLIENT) { if (its_subscription->get_endpoint(true) && !host_->has_identified(_client, _service, _instance, true)) { return; } if (its_subscription->get_endpoint(false) && !host_->has_identified(_client, _service, _instance, false)) { return; } } if (its_subscription->get_endpoint(true) && its_subscription->get_endpoint(true)->is_connected()) { insert_subscription(its_message, _service, _instance, _eventgroup, its_subscription, true, true); } else { // don't insert reliable endpoint option if the // TCP client endpoint is not yet connected insert_subscription(its_message, _service, _instance, _eventgroup, its_subscription, false, true); its_subscription->set_tcp_connection_established(false); } if(0 < its_message->get_entries().size()) { its_subscription->set_acknowledged(false); } } } if(has_address && 0 < its_message->get_entries().size()) { serialize_and_send(its_message, its_address); } } void service_discovery_impl::get_subscription_endpoints( subscription_type_e _subscription_type, std::shared_ptr& _unreliable, std::shared_ptr& _reliable, boost::asio::ip::address* _address, bool* _has_address, service_t _service, instance_t _instance, client_t _client) const { switch (_subscription_type) { case subscription_type_e::SU_RELIABLE_AND_UNRELIABLE: _reliable = host_->find_or_create_remote_client(_service, _instance, true, _client); _unreliable = host_->find_or_create_remote_client(_service, _instance, false, _client); if (_unreliable) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast(_unreliable); if (its_client_endpoint) { *_has_address = its_client_endpoint->get_remote_address( *_address); } } if (_reliable) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast(_reliable); if (its_client_endpoint) { *_has_address = *_has_address || its_client_endpoint->get_remote_address( *_address); } } break; case subscription_type_e::SU_PREFER_UNRELIABLE: _unreliable = host_->find_or_create_remote_client(_service, _instance, false, _client); if (_unreliable) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast(_unreliable); if (its_client_endpoint) { *_has_address = its_client_endpoint->get_remote_address( *_address); } } else { _reliable = host_->find_or_create_remote_client(_service, _instance, true, _client); if (_reliable) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast( _reliable); if (its_client_endpoint) { *_has_address = its_client_endpoint->get_remote_address( *_address); } } } break; case subscription_type_e::SU_PREFER_RELIABLE: _reliable = host_->find_or_create_remote_client(_service, _instance, true, _client); if (_reliable) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast(_reliable); if (its_client_endpoint) { *_has_address = its_client_endpoint->get_remote_address( *_address); } } else { _unreliable = host_->find_or_create_remote_client(_service, _instance, false, _client); if (_unreliable) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast( _unreliable); if (its_client_endpoint) { *_has_address = its_client_endpoint->get_remote_address( *_address); } } } break; case subscription_type_e::SU_UNRELIABLE: _unreliable = host_->find_or_create_remote_client(_service, _instance, false, _client); if (_unreliable) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast(_unreliable); if (its_client_endpoint) { *_has_address = its_client_endpoint->get_remote_address( *_address); } } break; case subscription_type_e::SU_RELIABLE: _reliable = host_->find_or_create_remote_client(_service, _instance, true, _client); if (_reliable) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast(_reliable); if (its_client_endpoint) { *_has_address = its_client_endpoint->get_remote_address( *_address); } } } } void service_discovery_impl::unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, client_t _client) { std::shared_ptr < runtime > its_runtime = runtime_.lock(); if(!its_runtime) { return; } std::shared_ptr < message_impl > its_message = its_runtime->create_message(); boost::asio::ip::address its_address; bool has_address(false); { std::lock_guard its_lock(subscribed_mutex_); std::shared_ptr < subscription > its_subscription; auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { auto found_eventgroup = found_instance->second.find(_eventgroup); if (found_eventgroup != found_instance->second.end()) { auto found_client = found_eventgroup->second.find(_client); if (found_client != found_eventgroup->second.end()) { its_subscription = found_client->second; its_subscription->set_ttl(0); found_eventgroup->second.erase(_client); auto endpoint = its_subscription->get_endpoint(false); if (endpoint) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast( endpoint); if (its_client_endpoint) { has_address = its_client_endpoint->get_remote_address( its_address); } } else { endpoint = its_subscription->get_endpoint(true); if (endpoint) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast< client_endpoint>(endpoint); if (its_client_endpoint) { has_address = its_client_endpoint->get_remote_address( its_address); } } else { return; } } insert_subscription(its_message, _service, _instance, _eventgroup, its_subscription, true, true); } } } } } if (has_address && 0 < its_message->get_entries().size()) { serialize_and_send(its_message, its_address); } } void service_discovery_impl::unsubscribe_all(service_t _service, instance_t _instance) { std::lock_guard its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { for (auto &its_eventgroup : found_instance->second) { for (auto its_client : its_eventgroup.second) { its_client.second->set_acknowledged(true); its_client.second->set_endpoint(nullptr, true); its_client.second->set_endpoint(nullptr, false); } } } } } void service_discovery_impl::unsubscribe_client(service_t _service, instance_t _instance, client_t _client) { std::shared_ptr its_runtime = runtime_.lock(); if (!its_runtime) { return; } std::shared_ptr < message_impl > its_message = its_runtime->create_message(); boost::asio::ip::address its_address; bool has_address(false); { std::lock_guard its_lock(subscribed_mutex_); std::shared_ptr < subscription > its_subscription; auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { for (auto &found_eventgroup : found_instance->second) { auto found_client = found_eventgroup.second.find(_client); if (found_client != found_eventgroup.second.end()) { its_subscription = found_client->second; its_subscription->set_ttl(0); found_eventgroup.second.erase(_client); if (!has_address) { auto endpoint = its_subscription->get_endpoint( false); if (endpoint) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast< client_endpoint>(endpoint); if (its_client_endpoint) { has_address = its_client_endpoint->get_remote_address( its_address); } } else { endpoint = its_subscription->get_endpoint(true); if (endpoint) { std::shared_ptr its_client_endpoint = std::dynamic_pointer_cast< client_endpoint>(endpoint); if (its_client_endpoint) { has_address = its_client_endpoint->get_remote_address( its_address); } } else { return; } } } insert_subscription(its_message, _service, _instance, found_eventgroup.first, its_subscription, true, true); } } } } } if (has_address && 0 < its_message->get_entries().size()) { serialize_and_send(its_message, its_address); } } std::pair service_discovery_impl::get_session( const boost::asio::ip::address &_address) { std::pair its_session; auto found_session = sessions_sent_.find(_address); if (found_session == sessions_sent_.end()) { its_session = sessions_sent_[_address] = { 1, true }; } else { its_session = found_session->second; } return its_session; } void service_discovery_impl::increment_session( const boost::asio::ip::address &_address) { auto found_session = sessions_sent_.find(_address); if (found_session != sessions_sent_.end()) { found_session->second.first++; if (found_session->second.first == 0) { found_session->second = { 1, false }; } } } bool service_discovery_impl::is_reboot( const boost::asio::ip::address &_sender, const boost::asio::ip::address &_destination, bool _reboot_flag, session_t _session) { bool result(false); auto its_last_session = sessions_received_.find(_sender); bool is_multicast = _destination.is_multicast(); session_t its_unicast_id = (is_multicast ? 0 : _session); session_t its_multicast_id = (is_multicast ? _session : 0); if (its_last_session == sessions_received_.end()) { sessions_received_[_sender] = std::make_tuple(its_multicast_id, its_unicast_id, _reboot_flag); } else { // Reboot detection: Either the flag has changed from false to true, // or the session identifier overrun while the flag is true if (its_last_session != sessions_received_.end()) { if (!std::get<2>(its_last_session->second) && _reboot_flag) { result = true; } else { session_t its_last_id = (is_multicast ? std::get<0>(its_last_session->second) : std::get<1>(its_last_session->second)); if (std::get<2>(its_last_session->second) && _reboot_flag && its_last_id >= _session) { result = true; } } } if (result == false) { // no reboot -> update session if (is_multicast) { std::get<0>(its_last_session->second) = its_multicast_id; } else { std::get<1>(its_last_session->second) = its_unicast_id; } } else { // reboot -> reset the session sessions_received_.erase(_sender); } } return result; } template std::shared_ptr service_discovery_impl::find_existing_option( std::shared_ptr &_message, AddressType _address, uint16_t _port, layer_four_protocol_e _protocol, option_type_e _option_type) { if (_message->get_options().size() > 0) { std::uint16_t option_length(0x0); if(_option_type == option_type_e::IP4_ENDPOINT || _option_type == option_type_e::IP4_MULTICAST) { option_length = 0x9; } else if(_option_type == option_type_e::IP6_ENDPOINT || _option_type == option_type_e::IP6_MULTICAST) { option_length = 0x15; } else { // unsupported option type return nullptr; } bool is_multicast(false); if(_option_type == option_type_e::IP4_MULTICAST || _option_type == option_type_e::IP6_MULTICAST) { is_multicast = true; } std::vector> its_options = _message->get_options(); for (const std::shared_ptr& opt : its_options) { if (opt->get_length() == option_length && opt->get_type() == _option_type && std::static_pointer_cast(opt)->get_layer_four_protocol() == _protocol && std::static_pointer_cast(opt)->get_port() == _port && std::static_pointer_cast