// Copyright (C) 2014-2018 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include #include #include #include #include #include "../include/constants.hpp" #include "../include/defines.hpp" #include "../include/deserializer.hpp" #include "../include/enumeration_types.hpp" #include "../include/eventgroupentry_impl.hpp" #include "../include/ipv4_option_impl.hpp" #include "../include/ipv6_option_impl.hpp" #include "../include/selective_option_impl.hpp" #include "../include/message_impl.hpp" #include "../include/remote_subscription_ack.hpp" #include "../include/request.hpp" #include "../include/runtime.hpp" #include "../include/service_discovery_host.hpp" #include "../include/service_discovery_impl.hpp" #include "../include/serviceentry_impl.hpp" #include "../include/subscription.hpp" #include "../../configuration/include/configuration.hpp" #include "../../endpoints/include/endpoint.hpp" #include "../../endpoints/include/client_endpoint.hpp" #include "../../endpoints/include/endpoint_definition.hpp" #include "../../endpoints/include/tcp_server_endpoint_impl.hpp" #include "../../endpoints/include/udp_server_endpoint_impl.hpp" #include "../../message/include/serializer.hpp" #include "../../plugin/include/plugin_manager_impl.hpp" #include "../../routing/include/event.hpp" #include "../../routing/include/eventgroupinfo.hpp" #include "../../routing/include/serviceinfo.hpp" #include "../../utility/include/byteorder.hpp" namespace vsomeip_v3 { namespace sd { service_discovery_impl::service_discovery_impl( service_discovery_host *_host, const std::shared_ptr& _configuration) : io_(_host->get_io()), host_(_host), configuration_(_configuration), port_(VSOMEIP_SD_DEFAULT_PORT), reliable_(false), serializer_(std::make_shared( configuration_->get_buffer_shrink_threshold())), deserializer_(std::make_shared( configuration_->get_buffer_shrink_threshold())), ttl_timer_(_host->get_io()), ttl_timer_runtime_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY / 2), ttl_(VSOMEIP_SD_DEFAULT_TTL), subscription_expiration_timer_(_host->get_io()), max_message_size_(VSOMEIP_MAX_UDP_SD_PAYLOAD), initial_delay_(0), offer_debounce_time_(VSOMEIP_SD_DEFAULT_OFFER_DEBOUNCE_TIME), repetitions_base_delay_(VSOMEIP_SD_DEFAULT_REPETITIONS_BASE_DELAY), repetitions_max_(VSOMEIP_SD_DEFAULT_REPETITIONS_MAX), cyclic_offer_delay_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY), offer_debounce_timer_(_host->get_io()), find_debounce_time_(VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME), find_debounce_timer_(_host->get_io()), main_phase_timer_(_host->get_io()), is_suspended_(false), is_diagnosis_(false), last_msg_received_timer_(_host->get_io()), last_msg_received_timer_timeout_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY + (VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY / 10)) { next_subscription_expiration_ = std::chrono::steady_clock::now() + std::chrono::hours(24); } service_discovery_impl::~service_discovery_impl() { } boost::asio::io_service & service_discovery_impl::get_io() { return io_; } void service_discovery_impl::init() { runtime_ = std::dynamic_pointer_cast( plugin_manager::get()->get_plugin( plugin_type_e::SD_RUNTIME_PLUGIN, VSOMEIP_SD_LIBRARY)); unicast_ = configuration_->get_unicast_address(); sd_multicast_ = configuration_->get_sd_multicast(); boost::system::error_code ec; sd_multicast_address_ = boost::asio::ip::address::from_string(sd_multicast_, ec); port_ = configuration_->get_sd_port(); reliable_ = (configuration_->get_sd_protocol() == "tcp"); max_message_size_ = (reliable_ ? VSOMEIP_MAX_TCP_SD_PAYLOAD : VSOMEIP_MAX_UDP_SD_PAYLOAD); ttl_ = configuration_->get_sd_ttl(); // generate random initial delay based on initial delay min and max std::uint32_t initial_delay_min = configuration_->get_sd_initial_delay_min(); std::uint32_t initial_delay_max = configuration_->get_sd_initial_delay_max(); if (initial_delay_min > initial_delay_max) { const std::uint32_t tmp(initial_delay_min); initial_delay_min = initial_delay_max; initial_delay_max = tmp; } std::random_device r; std::mt19937 e(r()); std::uniform_int_distribution distribution( initial_delay_min, initial_delay_max); initial_delay_ = std::chrono::milliseconds(distribution(e)); repetitions_base_delay_ = std::chrono::milliseconds( configuration_->get_sd_repetitions_base_delay()); repetitions_max_ = configuration_->get_sd_repetitions_max(); cyclic_offer_delay_ = std::chrono::milliseconds( configuration_->get_sd_cyclic_offer_delay()); offer_debounce_time_ = std::chrono::milliseconds( configuration_->get_sd_offer_debounce_time()); ttl_timer_runtime_ = cyclic_offer_delay_ / 2; ttl_factor_offers_ = configuration_->get_ttl_factor_offers(); ttl_factor_subscriptions_ = configuration_->get_ttl_factor_subscribes(); last_msg_received_timer_timeout_ = cyclic_offer_delay_ + (cyclic_offer_delay_ / 10); } void service_discovery_impl::start() { if (!endpoint_) { endpoint_ = host_->create_service_discovery_endpoint( sd_multicast_, port_, reliable_); if (!endpoint_) { VSOMEIP_ERROR << "Couldn't start service discovery"; return; } } { std::lock_guard its_lock(sessions_received_mutex_); sessions_received_.clear(); } { std::lock_guard its_lock(serialize_mutex_); sessions_sent_.clear(); } if (is_suspended_) { // make sure to sent out FindService messages after resume std::lock_guard its_lock(requested_mutex_); for (const auto &s : requested_) { for (const auto &i : s.second) { i.second->set_sent_counter(0); } } if (endpoint_ && !reliable_) { // rejoin multicast group dynamic_cast( endpoint_.get())->join(sd_multicast_); } } is_suspended_ = false; start_main_phase_timer(); start_offer_debounce_timer(true); start_find_debounce_timer(true); start_ttl_timer(); } void service_discovery_impl::stop() { is_suspended_ = true; stop_ttl_timer(); stop_last_msg_received_timer(); } void service_discovery_impl::request_service( service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl) { std::lock_guard its_lock(requested_mutex_); auto find_service = requested_.find(_service); if (find_service != requested_.end()) { auto find_instance = find_service->second.find(_instance); if (find_instance == find_service->second.end()) { find_service->second[_instance] = std::make_shared(_major, _minor, _ttl); } } else { requested_[_service][_instance] = std::make_shared(_major, _minor, _ttl); } } void service_discovery_impl::release_service( service_t _service, instance_t _instance) { std::lock_guard its_lock(requested_mutex_); auto find_service = requested_.find(_service); if (find_service != requested_.end()) { find_service->second.erase(_instance); } } std::shared_ptr service_discovery_impl::find_request(service_t _service, instance_t _instance) { std::lock_guard its_lock(requested_mutex_); auto find_service = requested_.find(_service); if (find_service != requested_.end()) { auto find_instance = find_service->second.find(_instance); if (find_instance != find_service->second.end()) { return find_instance->second; } } return nullptr; } void service_discovery_impl::subscribe( service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, client_t _client, const std::shared_ptr &_info) { #ifdef VSOMEIP_ENABLE_COMPAT bool is_selective(_info ? _info->is_selective() : false); #endif // VSOMEIP_ENABLE_COMPAT std::lock_guard its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { auto found_eventgroup = found_instance->second.find(_eventgroup); if (found_eventgroup != found_instance->second.end()) { auto its_subscription = found_eventgroup->second; #ifdef VSOMEIP_ENABLE_COMPAT if (!its_subscription->is_selective() && is_selective) { its_subscription->set_selective(true); its_subscription->remove_client(VSOMEIP_ROUTING_CLIENT); for (const auto& e : _info->get_events()) { for (const auto& c : e->get_subscribers(_eventgroup)) { its_subscription->add_client(c); } } } #endif // VSOMEIP_ENABLE_COMPAT if (its_subscription->get_major() != _major) { VSOMEIP_ERROR << "Subscriptions to different versions of the same " "service instance are not supported!"; } else if (its_subscription->is_selective()) { if (its_subscription->add_client(_client)) { its_subscription->set_state(_client, subscription_state_e::ST_NOT_ACKNOWLEDGED); send_subscription(its_subscription, _service, _instance, _eventgroup, _client); } } return; } } } std::shared_ptr its_reliable, its_unreliable; get_subscription_endpoints(_service, _instance, its_reliable, its_unreliable); // New subscription std::shared_ptr its_subscription = create_subscription( _major, _ttl, its_reliable, its_unreliable, _info); if (!its_subscription) { VSOMEIP_ERROR << __func__ << ": creating subscription failed!"; return; } subscribed_[_service][_instance][_eventgroup] = its_subscription; its_subscription->add_client(_client); its_subscription->set_state(_client, subscription_state_e::ST_NOT_ACKNOWLEDGED); send_subscription(its_subscription, _service, _instance, _eventgroup, _client); } void service_discovery_impl::send_subscription( const std::shared_ptr &_subscription, const service_t _service, const instance_t _instance, const eventgroup_t _eventgroup, const client_t _client) { (void)_client; auto its_reliable = _subscription->get_endpoint(true); auto its_unreliable = _subscription->get_endpoint(false); boost::asio::ip::address its_address; get_subscription_address(its_reliable, its_unreliable, its_address); if (!its_address.is_unspecified()) { entry_data_t its_data; const reliability_type_e its_reliability_type = get_eventgroup_reliability(_service, _instance, _eventgroup, _subscription); if (its_reliability_type == reliability_type_e::RT_UNRELIABLE && its_unreliable) { if (its_unreliable->is_established()) { its_data = create_eventgroup_entry(_service, _instance, _eventgroup, _subscription, its_reliability_type); } else { _subscription->set_udp_connection_established(false); } } else if (its_reliability_type == reliability_type_e::RT_RELIABLE && its_reliable) { if (its_reliable->is_established()) { its_data = create_eventgroup_entry(_service, _instance, _eventgroup, _subscription, its_reliability_type); } else { _subscription->set_tcp_connection_established(false); } } else if (its_reliability_type == reliability_type_e::RT_BOTH && its_reliable && its_unreliable) { if (its_reliable->is_established() && its_unreliable->is_established()) { its_data = create_eventgroup_entry(_service, _instance, _eventgroup, _subscription, its_reliability_type); } else { if (!its_reliable->is_established()) { _subscription->set_tcp_connection_established(false); } if (!its_unreliable->is_established()) { _subscription->set_udp_connection_established(false); } } } else if (its_reliability_type == reliability_type_e::RT_UNKNOWN) { VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't determine reliability type for subscription to [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] "; } if (its_data.entry_) { // TODO: Implement a simple path, that sends a single message auto its_current_message = std::make_shared(); std::vector > its_messages; its_messages.push_back(its_current_message); add_entry_data(its_messages, its_data); serialize_and_send(its_messages, its_address); } } } void service_discovery_impl::get_subscription_endpoints( service_t _service, instance_t _instance, std::shared_ptr &_reliable, std::shared_ptr &_unreliable) const { _unreliable = host_->find_or_create_remote_client( _service, _instance, false); _reliable = host_->find_or_create_remote_client( _service, _instance, true); } void service_discovery_impl::get_subscription_address( const std::shared_ptr &_reliable, const std::shared_ptr &_unreliable, boost::asio::ip::address &_address) const { if (_reliable) { auto its_client_endpoint = std::dynamic_pointer_cast(_reliable); if (its_client_endpoint) { its_client_endpoint->get_remote_address(_address); return; } } if (_unreliable) { auto its_client_endpoint = std::dynamic_pointer_cast(_unreliable); if (its_client_endpoint) { its_client_endpoint->get_remote_address(_address); } } } void service_discovery_impl::unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, client_t _client) { std::shared_ptr < runtime > its_runtime = runtime_.lock(); if (!its_runtime) { return; } auto its_current_message = std::make_shared(); boost::asio::ip::address its_address; { std::lock_guard its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { auto found_eventgroup = found_instance->second.find(_eventgroup); if (found_eventgroup != found_instance->second.end()) { auto its_subscription = found_eventgroup->second; if (its_subscription->remove_client(_client)) { auto its_reliable = its_subscription->get_endpoint(true); auto its_unreliable = its_subscription->get_endpoint(false); get_subscription_address( its_reliable, its_unreliable, its_address); if (!its_subscription->has_client()) { its_subscription->set_ttl(0); } else if (its_subscription->is_selective()) { // create a dummy subscription object to unsubscribe // the single client. auto its_major = its_subscription->get_major(); its_subscription = std::make_shared(); its_subscription->set_major(its_major); its_subscription->set_ttl(0); its_subscription->set_selective(true); its_subscription->set_endpoint(its_reliable, true); its_subscription->set_endpoint(its_unreliable, false); } } // For selective subscriptions, the client must be added again // to generate the selective option if (its_subscription->is_selective()) its_subscription->add_client(_client); const reliability_type_e its_reliability_type = get_eventgroup_reliability(_service, _instance, _eventgroup, its_subscription); auto its_data = create_eventgroup_entry(_service, _instance, _eventgroup, its_subscription, its_reliability_type); if (its_data.entry_) its_current_message->add_entry_data(its_data.entry_, its_data.options_); // Remove it again before updating (only impacts last unsubscribe) if (its_subscription->is_selective()) (void)its_subscription->remove_client(_client); // Ensure to update the "real" subscription its_subscription = found_eventgroup->second; // Finally update the subscriptions if (!its_subscription->has_client()) { found_instance->second.erase(found_eventgroup); if (found_instance->second.size() == 0) { found_service->second.erase(found_instance); } } } } } } std::vector > its_messages; its_messages.push_back(its_current_message); serialize_and_send(its_messages, its_address); } void service_discovery_impl::unsubscribe_all( service_t _service, instance_t _instance) { auto its_current_message = std::make_shared();; boost::asio::ip::address its_address; { std::lock_guard its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { for (auto &its_eventgroup : found_instance->second) { auto its_subscription = its_eventgroup.second; its_subscription->set_ttl(0); const reliability_type_e its_reliability = get_eventgroup_reliability(_service, _instance, its_eventgroup.first, its_subscription); auto its_data = create_eventgroup_entry(_service, _instance, its_eventgroup.first, its_subscription, its_reliability); auto its_reliable = its_subscription->get_endpoint(true); auto its_unreliable = its_subscription->get_endpoint(false); get_subscription_address( its_reliable, its_unreliable, its_address); if (its_data.entry_) { its_current_message->add_entry_data(its_data.entry_, its_data.options_); } } found_instance->second.clear(); } } } std::vector > its_messages; its_messages.push_back(its_current_message); serialize_and_send(its_messages, its_address); } void service_discovery_impl::reset_subscriptions( service_t _service, instance_t _instance) { std::lock_guard its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { for (auto &its_eventgroup : found_instance->second) { auto its_subscription = its_eventgroup.second; for (auto its_client : its_subscription->get_clients()) { its_subscription->set_state(its_client, subscription_state_e::ST_UNKNOWN); } its_subscription->set_endpoint(nullptr, true); its_subscription->set_endpoint(nullptr, false); } } } } std::pair service_discovery_impl::get_session( const boost::asio::ip::address &_address) { std::pair its_session; auto found_session = sessions_sent_.find(_address); if (found_session == sessions_sent_.end()) { its_session = sessions_sent_[_address] = { 1, true }; } else { its_session = found_session->second; } return its_session; } void service_discovery_impl::increment_session( const boost::asio::ip::address &_address) { auto found_session = sessions_sent_.find(_address); if (found_session != sessions_sent_.end()) { found_session->second.first++; if (found_session->second.first == 0) { found_session->second = { 1, false }; } } } bool service_discovery_impl::is_reboot( const boost::asio::ip::address &_sender, const boost::asio::ip::address &_destination, bool _reboot_flag, session_t _session) { bool result(false); auto its_received = sessions_received_.find(_sender); bool is_multicast = _destination.is_multicast(); // Initialize both sessions with 0. Thus, the session identifier // for the session not being received from the network is stored // as 0 and will never trigger the reboot detection. session_t its_multicast_session(0), its_unicast_session(0); // Initialize both flags with true. Thus, the flag not being // received from the network will never trigger the reboot detection. bool its_multicast_reboot_flag(true), its_unicast_reboot_flag(true); if (is_multicast) { its_multicast_session = _session; its_multicast_reboot_flag = _reboot_flag; } else { its_unicast_session = _session; its_unicast_reboot_flag = _reboot_flag; } if (its_received == sessions_received_.end()) { sessions_received_[_sender] = std::make_tuple(its_multicast_session, its_unicast_session, its_multicast_reboot_flag, its_unicast_reboot_flag); } else { // Reboot detection: Either the flag has changed from false to true, // or the session identifier overrun while the flag is true. if (_reboot_flag && ((is_multicast && !std::get<2>(its_received->second)) || (!is_multicast && !std::get<3>(its_received->second)))) { result = true; } else { session_t its_old_session; bool its_old_reboot_flag; if (is_multicast) { its_old_session = std::get<0>(its_received->second); its_old_reboot_flag = std::get<2>(its_received->second); } else { its_old_session = std::get<1>(its_received->second); its_old_reboot_flag = std::get<3>(its_received->second); } if (its_old_reboot_flag && _reboot_flag && its_old_session >= _session) { result = true; } } if (result == false) { // no reboot -> update session/flag if (is_multicast) { std::get<0>(its_received->second) = its_multicast_session; std::get<2>(its_received->second) = its_multicast_reboot_flag; } else { std::get<1>(its_received->second) = its_unicast_session; std::get<3>(its_received->second) = its_unicast_reboot_flag; } } else { // reboot -> reset the sender data sessions_received_.erase(_sender); } } return result; } void service_discovery_impl::insert_find_entries( std::vector > &_messages, const requests_t &_requests) { std::lock_guard its_lock(requested_mutex_); entry_data_t its_data; its_data.entry_ = its_data.other_ = nullptr; for (const auto& its_service : _requests) { for (const auto& its_instance : its_service.second) { auto its_request = its_instance.second; // check if release_service was called auto the_service = requested_.find(its_service.first); if ( the_service != requested_.end() ) { auto the_instance = the_service->second.find(its_instance.first); if(the_instance != the_service->second.end() ) { uint8_t its_sent_counter = its_request->get_sent_counter(); if (its_sent_counter != repetitions_max_ + 1) { auto its_entry = std::make_shared(); if (its_entry) { its_entry->set_type(entry_type_e::FIND_SERVICE); its_entry->set_service(its_service.first); its_entry->set_instance(its_instance.first); its_entry->set_major_version(its_request->get_major()); its_entry->set_minor_version(its_request->get_minor()); its_entry->set_ttl(its_request->get_ttl()); its_sent_counter++; its_request->set_sent_counter(its_sent_counter); its_data.entry_ = its_entry; add_entry_data(_messages, its_data); } else { VSOMEIP_ERROR << "Failed to create service entry!"; } } } } } } } void service_discovery_impl::insert_offer_entries( std::vector > &_messages, const services_t &_services, bool _ignore_phase) { for (const auto& its_service : _services) { for (const auto& its_instance : its_service.second) { if ((!is_suspended_) && ((!is_diagnosis_) || (is_diagnosis_ && !configuration_->is_someip(its_service.first, its_instance.first)))) { // Only insert services with configured endpoint(s) if ((_ignore_phase || its_instance.second->is_in_mainphase()) && (its_instance.second->get_endpoint(false) || its_instance.second->get_endpoint(true))) { insert_offer_service(_messages, its_instance.second); } } } } } entry_data_t service_discovery_impl::create_eventgroup_entry( service_t _service, instance_t _instance, eventgroup_t _eventgroup, const std::shared_ptr &_subscription, reliability_type_e _reliability_type) { entry_data_t its_data; its_data.entry_ = nullptr; its_data.other_ = nullptr; std::shared_ptr its_reliable_endpoint(_subscription->get_endpoint(true)); std::shared_ptr its_unreliable_endpoint(_subscription->get_endpoint(false)); bool insert_reliable(false); bool insert_unreliable(false); switch (_reliability_type) { case reliability_type_e::RT_RELIABLE: if (its_reliable_endpoint) { insert_reliable = true; } break; case reliability_type_e::RT_UNRELIABLE: if (its_unreliable_endpoint) { insert_unreliable = true; } break; case reliability_type_e::RT_BOTH: if (its_reliable_endpoint && its_unreliable_endpoint) { insert_reliable = true; insert_unreliable = true; } break; default: break; } if (!insert_reliable && !insert_unreliable && _reliability_type != reliability_type_e::RT_UNKNOWN) { VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " "subscription doesn't match reliability type: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] " << (uint16_t) _reliability_type; return its_data; } std::shared_ptr its_entry, its_other; if (insert_reliable && its_reliable_endpoint) { const std::uint16_t its_port = its_reliable_endpoint->get_local_port(); if (its_port) { its_entry = std::make_shared(); if (!its_entry) { VSOMEIP_ERROR << __func__ << ": Could not create eventgroup entry."; return its_data; } its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); its_entry->set_service(_service); its_entry->set_instance(_instance); its_entry->set_eventgroup(_eventgroup); its_entry->set_counter(0); its_entry->set_major_version(_subscription->get_major()); its_entry->set_ttl(_subscription->get_ttl()); its_data.entry_ = its_entry; for (const auto& its_client : _subscription->get_clients()) { if (_subscription->get_state(its_client) == subscription_state_e::ST_RESUBSCRIBING_NOT_ACKNOWLEDGED) { its_other = std::make_shared(); its_other->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); its_other->set_service(_service); its_other->set_instance(_instance); its_other->set_eventgroup(_eventgroup); its_other->set_counter(0); its_other->set_major_version(_subscription->get_major()); its_other->set_ttl(0); its_data.other_ = its_other; break; } } auto its_option = create_ip_option(unicast_, its_port, true); its_data.options_.push_back(its_option); } else { VSOMEIP_WARNING << __func__ << ": Cannot create subscription as " "local reliable port is zero: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; its_data.entry_ = nullptr; its_data.other_ = nullptr; return its_data; } } if (insert_unreliable && its_unreliable_endpoint) { const std::uint16_t its_port = its_unreliable_endpoint->get_local_port(); if (its_port) { if (!its_entry) { its_entry = std::make_shared(); if (!its_entry) { VSOMEIP_ERROR << __func__ << ": Could not create eventgroup entry."; return its_data; } its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); its_entry->set_service(_service); its_entry->set_instance(_instance); its_entry->set_eventgroup(_eventgroup); its_entry->set_counter(0); its_entry->set_major_version(_subscription->get_major()); its_entry->set_ttl(_subscription->get_ttl()); its_data.entry_ = its_entry; } for (const auto& its_client : _subscription->get_clients()) { if (_subscription->get_state(its_client) == subscription_state_e::ST_RESUBSCRIBING_NOT_ACKNOWLEDGED) { if (!its_other) { its_other = std::make_shared(); its_other->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); its_other->set_service(_service); its_other->set_instance(_instance); its_other->set_eventgroup(_eventgroup); its_other->set_counter(0); its_other->set_major_version(_subscription->get_major()); its_other->set_ttl(0); its_data.other_ = its_other; break; } } } auto its_option = create_ip_option(unicast_, its_port, false); its_data.options_.push_back(its_option); } else { VSOMEIP_WARNING << __func__ << ": Cannot create subscription as " " local unreliable port is zero: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; its_data.entry_ = nullptr; its_data.other_ = nullptr; return its_data; } } if (its_entry &&_subscription->is_selective()) { auto its_selective_option = std::make_shared(); its_selective_option->set_clients(_subscription->get_clients()); its_data.options_.push_back(its_selective_option); } if (its_entry && its_other) { its_data.entry_ = its_other; its_data.other_ = its_entry; } return its_data; } void service_discovery_impl::insert_subscription_ack( const std::shared_ptr& _acknowledgement, const std::shared_ptr &_info, ttl_t _ttl, const std::shared_ptr &_target, const std::set &_clients) { std::unique_lock its_lock(_acknowledgement->get_lock()); auto its_message = _acknowledgement->get_current_message(); auto its_service = _info->get_service(); auto its_instance = _info->get_instance(); auto its_eventgroup = _info->get_eventgroup(); auto its_major = _info->get_major(); for (const auto& its_entry : its_message->get_entries()) { if (its_entry->is_eventgroup_entry()) { std::shared_ptr its_eventgroup_entry = std::dynamic_pointer_cast(its_entry); if (its_eventgroup_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK && its_eventgroup_entry->get_service() == its_service && its_eventgroup_entry->get_instance() == its_instance && its_eventgroup_entry->get_eventgroup() == its_eventgroup && its_eventgroup_entry->get_major_version() == its_major && its_eventgroup_entry->get_ttl() == _ttl) { if (_ttl > 0) { if (_target) { if (_target->is_reliable()) { if (!its_eventgroup_entry->get_target(true)) { its_eventgroup_entry->add_target(_target); } } else { if (!its_eventgroup_entry->get_target(false)) { its_eventgroup_entry->add_target(_target); } } } } if (_clients.size() > 1 || (*(_clients.begin())) != 0) { auto its_selective_option = its_eventgroup_entry->get_selective_option(); if (its_selective_option) its_selective_option->set_clients(_clients); } return; } } } entry_data_t its_data; std::shared_ptr its_entry = std::make_shared(); its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP_ACK); its_entry->set_service(its_service); its_entry->set_instance(its_instance); its_entry->set_eventgroup(its_eventgroup); its_entry->set_major_version(its_major); its_entry->set_reserved(0); its_entry->set_counter(0); // SWS_SD_00315 its_entry->set_ttl(_ttl); if (_ttl > 0) { if (_target) { its_entry->add_target(_target); } boost::asio::ip::address its_address; uint16_t its_port; if (_info->get_multicast(its_address, its_port) && _info->get_threshold() > 0) { // SIP_SD_855 // Only insert a multicast option for eventgroups with multicast threshold > 0 auto its_option = create_ip_option(its_address, its_port, false); its_data.options_.push_back(its_option); } } // Selective if (_clients.size() > 1 || (*(_clients.begin())) != 0) { auto its_selective_option = std::make_shared(); (void)its_selective_option->set_clients(_clients); its_data.options_.push_back(its_selective_option); } its_data.entry_ = its_entry; its_data.other_ = nullptr; add_entry_data_to_remote_subscription_ack_msg(_acknowledgement, its_data); } bool service_discovery_impl::send(bool _is_announcing) { std::shared_ptr < runtime > its_runtime = runtime_.lock(); if (its_runtime) { std::vector > its_messages; std::shared_ptr its_message; if (_is_announcing) { its_message = std::make_shared(); its_messages.push_back(its_message); std::lock_guard its_lock(offer_mutex_); services_t its_offers = host_->get_offered_services(); insert_offer_entries(its_messages, its_offers, true); // Serialize and send return send(its_messages); } } return false; } // Interface endpoint_host void service_discovery_impl::on_message( const byte_t *_data, length_t _length, const boost::asio::ip::address &_sender, const boost::asio::ip::address &_destination) { #if 0 std::stringstream msg; msg << "sdi::on_message: "; for (length_t i = 0; i < _length; ++i) msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; VSOMEIP_INFO << msg.str(); #endif std::lock_guard its_lock(check_ttl_mutex_); std::lock_guard its_session_lock(sessions_received_mutex_); if(is_suspended_) { return; } // ignore all SD messages with source address equal to node's unicast address if (!check_source_address(_sender)) { return; } const bool received_via_mcast = (_destination == sd_multicast_address_); if (received_via_mcast) { std::lock_guard its_lock(last_msg_received_timer_mutex_); boost::system::error_code ec; last_msg_received_timer_.cancel(ec); last_msg_received_timer_.expires_from_now( last_msg_received_timer_timeout_, ec); last_msg_received_timer_.async_wait( std::bind(&service_discovery_impl::on_last_msg_received_timer_expired, shared_from_this(), std::placeholders::_1)); } current_remote_address_ = _sender; deserializer_->set_data(_data, _length); std::shared_ptr its_message( deserializer_->deserialize_sd_message()); deserializer_->reset(); if (its_message) { // ignore all messages which are sent with invalid header fields if(!check_static_header_fields(its_message)) { return; } // Expire all subscriptions / services in case of reboot if (is_reboot(_sender, _destination, its_message->get_reboot_flag(), its_message->get_session())) { VSOMEIP_INFO << "Reboot detected: IP=" << _sender.to_string(); remove_remote_offer_type_by_ip(_sender); host_->expire_subscriptions(_sender); host_->expire_services(_sender); if (reboot_notification_handler_) { ip_address_t ip; if (_sender.is_v4()) { ip.address_.v4_ = _sender.to_v4().to_bytes(); ip.is_v4_ = true; } else { ip.address_.v6_ = _sender.to_v6().to_bytes(); ip.is_v4_ = false; } reboot_notification_handler_(ip); } } std::vector > its_options = its_message->get_options(); std::shared_ptr its_runtime = runtime_.lock(); if (!its_runtime) { return; } auto its_acknowledgement = std::make_shared(_sender); std::vector > its_resubscribes; its_resubscribes.push_back(std::make_shared()); const message_impl::entries_t& its_entries = its_message->get_entries(); const message_impl::entries_t::const_iterator its_end = its_entries.end(); bool is_stop_subscribe_subscribe(false); bool force_initial_events(false); bool sd_acceptance_queried(false); expired_ports_t expired_ports; sd_acceptance_state_t accept_state(expired_ports); for (auto iter = its_entries.begin(); iter != its_end; iter++) { if (!sd_acceptance_queried) { sd_acceptance_queried = true; if (sd_acceptance_handler_) { accept_state.sd_acceptance_required_ = configuration_->is_protected_device(_sender); remote_info_t remote; remote.first_ = ANY_PORT; remote.last_ = ANY_PORT; remote.is_range_ = false; if (_sender.is_v4()) { remote.ip_.address_.v4_ = _sender.to_v4().to_bytes(); remote.ip_.is_v4_ = true; } else { remote.ip_.address_.v6_ = _sender.to_v6().to_bytes(); remote.ip_.is_v4_ = false; } accept_state.accept_entries_ = sd_acceptance_handler_(remote); } else { accept_state.accept_entries_ = true; } } if ((*iter)->is_service_entry()) { std::shared_ptr its_service_entry = std::dynamic_pointer_cast(*iter); bool its_unicast_flag = its_message->get_unicast_flag(); process_serviceentry(its_service_entry, its_options, its_unicast_flag, its_resubscribes, received_via_mcast, accept_state); } else { std::shared_ptr its_eventgroup_entry = std::dynamic_pointer_cast(*iter); bool must_process(true); // Do we need to process it? if (its_eventgroup_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP) { must_process = !has_same(iter, its_end, its_options); } if (must_process) { if (is_stop_subscribe_subscribe) { force_initial_events = true; } is_stop_subscribe_subscribe = check_stop_subscribe_subscribe(iter, its_end, its_options); process_eventgroupentry(its_eventgroup_entry, its_options, its_acknowledgement, _sender, _destination, is_stop_subscribe_subscribe, force_initial_events, accept_state); } } } { std::unique_lock its_lock(its_acknowledgement->get_lock()); its_acknowledgement->complete(); // TODO: Check the following logic... if (its_acknowledgement->has_subscription()) { update_acknowledgement(its_acknowledgement); } else { if (!its_acknowledgement->is_pending() && !its_acknowledgement->is_done()) { send_subscription_ack(its_acknowledgement); } } } // check resubscriptions for validity for (auto iter = its_resubscribes.begin(); iter != its_resubscribes.end();) { if ((*iter)->get_entries().empty() || (*iter)->get_options().empty()) { iter = its_resubscribes.erase(iter); } else { iter++; } } if (!its_resubscribes.empty()) { serialize_and_send(its_resubscribes, _sender); } } else { VSOMEIP_ERROR << "service_discovery_impl::" << __func__ << ": Deserialization error."; return; } } // Entry processing void service_discovery_impl::process_serviceentry( std::shared_ptr &_entry, const std::vector > &_options, bool _unicast_flag, std::vector > &_resubscribes, bool _received_via_mcast, const sd_acceptance_state_t& _sd_ac_state) { // Read service info from entry entry_type_e its_type = _entry->get_type(); service_t its_service = _entry->get_service(); instance_t its_instance = _entry->get_instance(); major_version_t its_major = _entry->get_major_version(); minor_version_t its_minor = _entry->get_minor_version(); ttl_t its_ttl = _entry->get_ttl(); // Read address info from options boost::asio::ip::address its_reliable_address; uint16_t its_reliable_port(ILLEGAL_PORT); boost::asio::ip::address its_unreliable_address; uint16_t its_unreliable_port(ILLEGAL_PORT); for (auto i : { 1, 2 }) { for (auto its_index : _entry->get_options(uint8_t(i))) { if( _options.size() > its_index ) { std::shared_ptr < option_impl > its_option = _options[its_index]; switch (its_option->get_type()) { case option_type_e::IP4_ENDPOINT: { std::shared_ptr < ipv4_option_impl > its_ipv4_option = std::dynamic_pointer_cast < ipv4_option_impl > (its_option); boost::asio::ip::address_v4 its_ipv4_address( its_ipv4_option->get_address()); if (its_ipv4_option->get_layer_four_protocol() == layer_four_protocol_e::UDP) { its_unreliable_address = its_ipv4_address; its_unreliable_port = its_ipv4_option->get_port(); } else { its_reliable_address = its_ipv4_address; its_reliable_port = its_ipv4_option->get_port(); } break; } case option_type_e::IP6_ENDPOINT: { std::shared_ptr < ipv6_option_impl > its_ipv6_option = std::dynamic_pointer_cast < ipv6_option_impl > (its_option); boost::asio::ip::address_v6 its_ipv6_address( its_ipv6_option->get_address()); if (its_ipv6_option->get_layer_four_protocol() == layer_four_protocol_e::UDP) { its_unreliable_address = its_ipv6_address; its_unreliable_port = its_ipv6_option->get_port(); } else { its_reliable_address = its_ipv6_address; its_reliable_port = its_ipv6_option->get_port(); } break; } case option_type_e::IP4_MULTICAST: case option_type_e::IP6_MULTICAST: break; case option_type_e::CONFIGURATION: break; case option_type_e::UNKNOWN: default: VSOMEIP_ERROR << __func__ << ": Unsupported service option"; break; } } } } if (0 < its_ttl) { switch(its_type) { case entry_type_e::FIND_SERVICE: process_findservice_serviceentry(its_service, its_instance, its_major, its_minor, _unicast_flag); break; case entry_type_e::OFFER_SERVICE: process_offerservice_serviceentry(its_service, its_instance, its_major, its_minor, its_ttl, its_reliable_address, its_reliable_port, its_unreliable_address, its_unreliable_port, _resubscribes, _received_via_mcast, _sd_ac_state); break; case entry_type_e::UNKNOWN: default: VSOMEIP_ERROR << __func__ << ": Unsupported service entry type"; } } else if (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_) { std::shared_ptr its_request = find_request(its_service, its_instance); if (its_request) { std::lock_guard its_lock(requested_mutex_); // ID: SIP_SD_830 its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); } remove_remote_offer_type(its_service, its_instance, its_reliable_address, its_reliable_port, its_unreliable_address, its_unreliable_port); reset_subscriptions(its_service, its_instance); if (!is_diagnosis_ && !is_suspended_) { host_->del_routing_info(its_service, its_instance, (its_reliable_port != ILLEGAL_PORT), (its_unreliable_port != ILLEGAL_PORT)); } } } void service_discovery_impl::process_offerservice_serviceentry( service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl, const boost::asio::ip::address &_reliable_address, uint16_t _reliable_port, const boost::asio::ip::address &_unreliable_address, uint16_t _unreliable_port, std::vector > &_resubscribes, bool _received_via_mcast, const sd_acceptance_state_t& _sd_ac_state) { std::shared_ptr < runtime > its_runtime = runtime_.lock(); if (!its_runtime) return; bool is_secure = configuration_->is_secure_service(_service, _instance); if (is_secure && ((_reliable_port != ILLEGAL_PORT && !configuration_->is_secure_port(_reliable_address, _reliable_port, true)) || (_unreliable_port != ILLEGAL_PORT && !configuration_->is_secure_port(_unreliable_address, _unreliable_port, false)))) { VSOMEIP_WARNING << __func__ << ": Ignoring offer of [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; return; } std::shared_ptr its_request = find_request(_service, _instance); if (its_request) { std::lock_guard its_lock(requested_mutex_); its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); } remote_offer_type_e offer_type(remote_offer_type_e::UNKNOWN); if (_reliable_port != ILLEGAL_PORT && _unreliable_port != ILLEGAL_PORT && !_reliable_address.is_unspecified() && !_unreliable_address.is_unspecified()) { offer_type = remote_offer_type_e::RELIABLE_UNRELIABLE; } else if (_unreliable_port != ILLEGAL_PORT && !_unreliable_address.is_unspecified()) { offer_type = remote_offer_type_e::UNRELIABLE; } else if (_reliable_port != ILLEGAL_PORT && !_reliable_address.is_unspecified()) { offer_type = remote_offer_type_e::RELIABLE; } else { VSOMEIP_WARNING << __func__ << ": Unknown remote offer type [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; return; // Unknown remote offer type --> no way to access it! } if (_sd_ac_state.sd_acceptance_required_) { auto expire_subscriptions_and_services = [this, &_sd_ac_state](const boost::asio::ip::address& _address, std::uint16_t _port, bool _reliable) { const auto its_port_pair = std::make_pair(_reliable, _port); if (_sd_ac_state.expired_ports_.find(its_port_pair) == _sd_ac_state.expired_ports_.end()) { VSOMEIP_WARNING << "service_discovery_impl::" << __func__ << ": Do not accept offer from " << _address.to_string() << ":" << std::dec << _port << " reliable=" << _reliable; remove_remote_offer_type_by_ip(_address, _port, _reliable); host_->expire_subscriptions(_address, _port, _reliable); host_->expire_services(_address, _port, _reliable); _sd_ac_state.expired_ports_.insert(its_port_pair); } }; // return if the registered sd_acceptance handler returned false // and for the provided port sd_acceptance is necessary switch (offer_type) { case remote_offer_type_e::UNRELIABLE: if (!_sd_ac_state.accept_entries_ && configuration_->is_protected_port( _unreliable_address, _unreliable_port, false)) { expire_subscriptions_and_services(_unreliable_address, _unreliable_port, false); return; } break; case remote_offer_type_e::RELIABLE: if (!_sd_ac_state.accept_entries_ && configuration_->is_protected_port( _reliable_address, _reliable_port, true)) { expire_subscriptions_and_services(_reliable_address, _reliable_port, true); return; } break; case remote_offer_type_e::RELIABLE_UNRELIABLE: if (!_sd_ac_state.accept_entries_ && (configuration_->is_protected_port( _unreliable_address, _unreliable_port, false) || configuration_->is_protected_port( _reliable_address, _reliable_port, true))) { expire_subscriptions_and_services(_unreliable_address, _unreliable_port, false); expire_subscriptions_and_services(_reliable_address, _reliable_port, true); return; } break; case remote_offer_type_e::UNKNOWN: default: break; } } if (update_remote_offer_type(_service, _instance, offer_type, _reliable_address, _reliable_port, _unreliable_address, _unreliable_port)) { VSOMEIP_WARNING << __func__ << ": Remote offer type changed [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; // Only update eventgroup reliability type if it was initially unknown auto its_eventgroups = host_->get_subscribed_eventgroups(_service, _instance); for (auto eg : its_eventgroups) { auto its_info = host_->find_eventgroup( _service, _instance, eg); if (its_info) { if (its_info->is_reliability_auto_mode()) { reliability_type_e its_reliability(reliability_type_e::RT_UNKNOWN); switch (offer_type) { case remote_offer_type_e::RELIABLE: its_reliability = reliability_type_e::RT_RELIABLE; break; case remote_offer_type_e::UNRELIABLE: its_reliability = reliability_type_e::RT_UNRELIABLE; break; case remote_offer_type_e::RELIABLE_UNRELIABLE: its_reliability = reliability_type_e::RT_BOTH; break; default: ; } if (its_reliability != reliability_type_e::RT_UNKNOWN && its_reliability != its_info->get_reliability()) { VSOMEIP_WARNING << "sd::" << __func__ << ": eventgroup reliability type changed [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << eg << "]" << " using reliability type: " << std::hex << std::setw(4) << std::setfill('0') << (uint16_t) its_reliability; its_info->set_reliability(its_reliability); } } } } } host_->add_routing_info(_service, _instance, _major, _minor, _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_), _reliable_address, _reliable_port, _unreliable_address, _unreliable_port); // No need to resubscribe for unicast offers if (_received_via_mcast) { std::int32_t its_remaining = VSOMEIP_MAX_UDP_MESSAGE_SIZE; its_remaining -= _resubscribes.back()->get_size(); std::lock_guard its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { if (0 < found_instance->second.size()) { for (const auto& its_eventgroup : found_instance->second) { auto its_subscription = its_eventgroup.second; std::shared_ptr its_reliable, its_unreliable; get_subscription_endpoints(_service, _instance, its_reliable, its_unreliable); its_subscription->set_endpoint(its_reliable, true); its_subscription->set_endpoint(its_unreliable, false); for (const auto& its_client : its_subscription->get_clients()) { if (its_subscription->get_state(its_client) == subscription_state_e::ST_ACKNOWLEDGED) { its_subscription->set_state(its_client, subscription_state_e::ST_RESUBSCRIBING); } else { its_subscription->set_state(its_client, subscription_state_e::ST_RESUBSCRIBING_NOT_ACKNOWLEDGED); } } const reliability_type_e its_reliability = get_eventgroup_reliability(_service, _instance, its_eventgroup.first, its_subscription); auto its_data = create_eventgroup_entry(_service, _instance, its_eventgroup.first, its_subscription, its_reliability); if (its_data.entry_) { add_entry_data(_resubscribes, its_data); } for (const auto& its_client : its_subscription->get_clients()) { its_subscription->set_state(its_client, subscription_state_e::ST_NOT_ACKNOWLEDGED); } } } } } } } void service_discovery_impl::process_findservice_serviceentry( service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, bool _unicast_flag) { if (_instance != ANY_INSTANCE) { std::shared_ptr its_info = host_->get_offered_service( _service, _instance); if (its_info) { if (_major == ANY_MAJOR || _major == its_info->get_major()) { if (_minor == 0xFFFFFFFF || _minor <= its_info->get_minor()) { if (its_info->get_endpoint(false) || its_info->get_endpoint(true)) { send_uni_or_multicast_offerservice(its_info, _unicast_flag); } } } } } else { std::map> offered_instances = host_->get_offered_service_instances(_service); // send back all available instances for (const auto &found_instance : offered_instances) { auto its_info = found_instance.second; if (_major == ANY_MAJOR || _major == its_info->get_major()) { if (_minor == 0xFFFFFFFF || _minor <= its_info->get_minor()) { if (its_info->get_endpoint(false) || its_info->get_endpoint(true)) { send_uni_or_multicast_offerservice(its_info, _unicast_flag); } } } } } } void service_discovery_impl::send_unicast_offer_service( const std::shared_ptr &_info) { std::shared_ptr its_runtime = runtime_.lock(); if (!its_runtime) { return; } auto its_offer_message(std::make_shared()); std::vector > its_messages; its_messages.push_back(its_offer_message); insert_offer_service(its_messages, _info); serialize_and_send(its_messages, current_remote_address_); } void service_discovery_impl::send_multicast_offer_service( const std::shared_ptr &_info) { auto its_offer_message(std::make_shared()); std::vector > its_messages; its_messages.push_back(its_offer_message); insert_offer_service(its_messages, _info); serialize_and_send(its_messages, current_remote_address_); } void service_discovery_impl::on_endpoint_connected( service_t _service, instance_t _instance, const std::shared_ptr &_endpoint) { std::shared_ptr its_runtime = runtime_.lock(); if (!its_runtime) { return; } // TODO: Simplify this method! It is not clear, why we need to check // both endpoints here although the method is always called for a // single one. std::vector > its_messages; its_messages.push_back(std::make_shared()); boost::asio::ip::address its_address; std::shared_ptr its_dummy; if (_endpoint->is_reliable()) get_subscription_address(_endpoint, its_dummy, its_address); else get_subscription_address(its_dummy, _endpoint, its_address); { std::lock_guard its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { if (0 < found_instance->second.size()) { for (const auto &its_eventgroup : found_instance->second) { std::shared_ptr its_subscription(its_eventgroup.second); if (its_subscription) { if (!its_subscription->is_tcp_connection_established() || !its_subscription->is_udp_connection_established()) { const std::shared_ptr its_reliable_endpoint( its_subscription->get_endpoint(true)); const std::shared_ptr its_unreliable_endpoint( its_subscription->get_endpoint(false)); if (its_reliable_endpoint && its_reliable_endpoint->is_established()) { if (its_reliable_endpoint.get() == _endpoint.get()) { // mark tcp as established its_subscription->set_tcp_connection_established(true); } } if (its_unreliable_endpoint && its_unreliable_endpoint->is_established()) { if (its_unreliable_endpoint.get() == _endpoint.get()) { // mark udp as established its_subscription->set_udp_connection_established(true); } } if ((its_reliable_endpoint && its_unreliable_endpoint && its_subscription->is_tcp_connection_established() && its_subscription->is_udp_connection_established()) || (its_reliable_endpoint && !its_unreliable_endpoint && its_subscription->is_tcp_connection_established()) || (its_unreliable_endpoint && !its_reliable_endpoint && its_subscription->is_udp_connection_established())) { std::shared_ptr its_unreliable; std::shared_ptr its_reliable; get_subscription_endpoints(_service, _instance, its_reliable, its_unreliable); get_subscription_address(its_reliable, its_unreliable, its_address); its_subscription->set_endpoint(its_reliable, true); its_subscription->set_endpoint(its_unreliable, false); for (const auto& its_client : its_subscription->get_clients()) its_subscription->set_state(its_client, subscription_state_e::ST_NOT_ACKNOWLEDGED); const reliability_type_e its_reliability_type = get_eventgroup_reliability(_service, _instance, its_eventgroup.first, its_subscription); auto its_data = create_eventgroup_entry(_service, _instance, its_eventgroup.first, its_subscription, its_reliability_type); if (its_data.entry_) { add_entry_data(its_messages, its_data); } } } } } } } } } serialize_and_send(its_messages, its_address); } std::shared_ptr service_discovery_impl::create_ip_option( const boost::asio::ip::address &_address, uint16_t _port, bool _is_reliable) const { std::shared_ptr its_option; if (_address.is_v4()) { its_option = std::make_shared( _address, _port, _is_reliable); } else { its_option = std::make_shared( _address, _port, _is_reliable); } return its_option; } void service_discovery_impl::insert_offer_service( std::vector > &_messages, const std::shared_ptr &_info) { entry_data_t its_data; its_data.entry_ = its_data.other_ = nullptr; std::shared_ptr its_reliable = _info->get_endpoint(true); if (its_reliable) { auto its_new_option = create_ip_option(unicast_, its_reliable->get_local_port(), true); its_data.options_.push_back(its_new_option); } std::shared_ptr its_unreliable = _info->get_endpoint(false); if (its_unreliable) { auto its_new_option = create_ip_option(unicast_, its_unreliable->get_local_port(), false); its_data.options_.push_back(its_new_option); } auto its_entry = std::make_shared(); if (its_entry) { its_data.entry_ = its_entry; its_entry->set_type(entry_type_e::OFFER_SERVICE); its_entry->set_service(_info->get_service()); its_entry->set_instance(_info->get_instance()); its_entry->set_major_version(_info->get_major()); its_entry->set_minor_version(_info->get_minor()); ttl_t its_ttl = _info->get_ttl(); if (its_ttl > 0) its_ttl = ttl_; its_entry->set_ttl(its_ttl); add_entry_data(_messages, its_data); } else { VSOMEIP_ERROR << __func__ << ": Failed to create service entry."; } } void service_discovery_impl::process_eventgroupentry( std::shared_ptr &_entry, const std::vector > &_options, std::shared_ptr &_acknowledgement, const boost::asio::ip::address &_sender, const boost::asio::ip::address &_destination, bool _is_stop_subscribe_subscribe, bool _force_initial_events, const sd_acceptance_state_t& _sd_ac_state) { std::set its_clients({0}); // maybe overridden for selectives auto its_sender = _acknowledgement->get_target_address(); auto its_session = _entry->get_owning_message()->get_session(); service_t its_service = _entry->get_service(); instance_t its_instance = _entry->get_instance(); eventgroup_t its_eventgroup = _entry->get_eventgroup(); entry_type_e its_type = _entry->get_type(); major_version_t its_major = _entry->get_major_version(); ttl_t its_ttl = _entry->get_ttl(); auto its_info = host_->find_eventgroup( its_service, its_instance, its_eventgroup); if (!its_info) { if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type) { // We received a subscription for a non-existing eventgroup. // --> Create dummy eventgroupinfo to send Nack. its_info = std::make_shared(its_service, its_instance, its_eventgroup, its_major, its_ttl); boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Received a SubscribeEventGroup entry for unknown eventgroup " << " from: " << its_sender.to_string(ec) << " for: [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." << std::hex << std::setw(4) << std::setfill('0') << its_instance << "." << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "] session: " << std::hex << std::setw(4) << std::setfill('0') << its_session << ", ttl: " << its_ttl; if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } } else { // We received a subscription [n]ack for an eventgroup that does not exist. // --> Remove subscription. unsubscribe(its_service, its_instance, its_eventgroup, VSOMEIP_ROUTING_CLIENT); } return; } if (_entry->get_owning_message()->get_return_code() != return_code) { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Invalid return code in SOMEIP/SD header " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } if(its_type == entry_type_e::SUBSCRIBE_EVENTGROUP) { if (_destination.is_multicast() ) { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Received a SubscribeEventGroup entry on multicast address " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } if (_entry->get_num_options(1) == 0 && _entry->get_num_options(2) == 0) { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Invalid number of options in SubscribeEventGroup entry " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; if (its_ttl > 0) { // increase number of required acks by one as number required acks // is calculated based on the number of referenced options insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } if (_entry->get_owning_message()->get_options_length() < 12) { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Invalid options length in SOMEIP/SD message " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } if (_options.size() // cast is needed in order to get unsigned type since int will be promoted // by the + operator on 16 bit or higher machines. < static_cast>::size_type>( (_entry->get_num_options(1)) + (_entry->get_num_options(2)))) { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << "Fewer options in SOMEIP/SD message than " "referenced in EventGroup entry or malformed option received " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; if (its_ttl > 0) { // set to 0 to ensure an answer containing at least this subscribe_nack is sent out insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } if (_entry->get_owning_message()->get_someip_length() < _entry->get_owning_message()->get_length() && its_ttl > 0) { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": SOME/IP length field in SubscribeEventGroup message header: [" << std::dec << _entry->get_owning_message()->get_someip_length() << "] bytes, is shorter than length of deserialized message: [" << (uint32_t) _entry->get_owning_message()->get_length() << "] bytes. " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; return; } } boost::asio::ip::address its_first_address; uint16_t its_first_port(ILLEGAL_PORT); bool is_first_reliable(false); boost::asio::ip::address its_second_address; uint16_t its_second_port(ILLEGAL_PORT); bool is_second_reliable(false); for (auto i : { 1, 2 }) { for (auto its_index : _entry->get_options(uint8_t(i))) { std::shared_ptr < option_impl > its_option; try { its_option = _options.at(its_index); } catch(const std::out_of_range& e) { #ifdef _WIN32 e; // silence MSVC warning C4101 #endif boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Fewer options in SD message than " "referenced in EventGroup entry for " "option run number: " << i << " " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type && its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } switch (its_option->get_type()) { case option_type_e::IP4_ENDPOINT: { if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type) { std::shared_ptr < ipv4_option_impl > its_ipv4_option = std::dynamic_pointer_cast < ipv4_option_impl > (its_option); boost::asio::ip::address_v4 its_ipv4_address( its_ipv4_option->get_address()); if (!check_layer_four_protocol(its_ipv4_option)) { if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } if (its_first_port == ILLEGAL_PORT) { its_first_address = its_ipv4_address; its_first_port = its_ipv4_option->get_port(); is_first_reliable = (its_ipv4_option->get_layer_four_protocol() == layer_four_protocol_e::TCP); // reject subscription referencing two conflicting options of same protocol type // ID: SIP_SD_1144 if (is_first_reliable == is_second_reliable && its_second_port != ILLEGAL_PORT) { if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Multiple IPv4 endpoint options of same kind referenced! " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session << " is_first_reliable: " << is_first_reliable; return; } if (!check_ipv4_address(its_first_address) || 0 == its_first_port) { if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Invalid port or IP address in first IPv4 endpoint option specified! " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; return; } } else if (its_second_port == ILLEGAL_PORT) { its_second_address = its_ipv4_address; its_second_port = its_ipv4_option->get_port(); is_second_reliable = (its_ipv4_option->get_layer_four_protocol() == layer_four_protocol_e::TCP); // reject subscription referencing two conflicting options of same protocol type // ID: SIP_SD_1144 if (is_second_reliable == is_first_reliable && its_first_port != ILLEGAL_PORT) { if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Multiple IPv4 endpoint options of same kind referenced! " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session << " is_second_reliable: " << is_second_reliable; return; } if (!check_ipv4_address(its_second_address) || 0 == its_second_port) { if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Invalid port or IP address in second IPv4 endpoint option specified! " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; return; } } else { // TODO: error message, too many endpoint options! } } else { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Invalid eventgroup option (IPv4 Endpoint)" << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; } break; } case option_type_e::IP6_ENDPOINT: { if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type) { std::shared_ptr < ipv6_option_impl > its_ipv6_option = std::dynamic_pointer_cast < ipv6_option_impl > (its_option); boost::asio::ip::address_v6 its_ipv6_address( its_ipv6_option->get_address()); if (!check_layer_four_protocol(its_ipv6_option)) { if(its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << "Invalid layer 4 protocol type in IPv6 endpoint option specified! " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; return; } if (its_first_port == ILLEGAL_PORT) { its_first_address = its_ipv6_address; its_first_port = its_ipv6_option->get_port(); is_first_reliable = (its_ipv6_option->get_layer_four_protocol() == layer_four_protocol_e::TCP); if (is_first_reliable == is_second_reliable && its_second_port != ILLEGAL_PORT) { if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Multiple IPv6 endpoint options of same kind referenced! " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session << " is_first_reliable: " << is_first_reliable; return; } } else if (its_second_port == ILLEGAL_PORT) { its_second_address = its_ipv6_address; its_second_port = its_ipv6_option->get_port(); is_second_reliable = (its_ipv6_option->get_layer_four_protocol() == layer_four_protocol_e::TCP); if (is_second_reliable == is_first_reliable && its_first_port != ILLEGAL_PORT) { if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Multiple IPv6 endpoint options of same kind referenced! " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session << " is_second_reliable: " << is_second_reliable; return; } } else { // TODO: error message, too many endpoint options! } } else { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Invalid eventgroup option (IPv6 Endpoint) " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; } break; } case option_type_e::IP4_MULTICAST: if (entry_type_e::SUBSCRIBE_EVENTGROUP_ACK == its_type) { std::shared_ptr < ipv4_option_impl > its_ipv4_option = std::dynamic_pointer_cast < ipv4_option_impl > (its_option); boost::asio::ip::address_v4 its_ipv4_address( its_ipv4_option->get_address()); if (its_first_port == ILLEGAL_PORT) { its_first_address = its_ipv4_address; its_first_port = its_ipv4_option->get_port(); } else if (its_second_port == ILLEGAL_PORT) { its_second_address = its_ipv4_address; its_second_port = its_ipv4_option->get_port(); } else { // TODO: error message, too many endpoint options! } // ID: SIP_SD_946, ID: SIP_SD_1144 if (its_first_port != ILLEGAL_PORT && its_second_port != ILLEGAL_PORT) { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Multiple IPv4 multicast options referenced! " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; return; } } else { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Invalid eventgroup option (IPv4 Multicast) " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; } break; case option_type_e::IP6_MULTICAST: if (entry_type_e::SUBSCRIBE_EVENTGROUP_ACK == its_type) { std::shared_ptr < ipv6_option_impl > its_ipv6_option = std::dynamic_pointer_cast < ipv6_option_impl > (its_option); boost::asio::ip::address_v6 its_ipv6_address( its_ipv6_option->get_address()); if (its_first_port == ILLEGAL_PORT) { its_first_address = its_ipv6_address; its_first_port = its_ipv6_option->get_port(); } else if (its_second_port == ILLEGAL_PORT) { its_second_address = its_ipv6_address; its_second_port = its_ipv6_option->get_port(); } else { // TODO: error message, too many endpoint options! } // ID: SIP_SD_946, ID: SIP_SD_1144 if (its_first_port != ILLEGAL_PORT && its_second_port != ILLEGAL_PORT) { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << "Multiple IPv6 multicast options referenced! " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; return; } } else { boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Invalid eventgroup option (IPv6 Multicast) " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; } break; case option_type_e::CONFIGURATION: { break; } case option_type_e::SELECTIVE: { auto its_selective_option = std::dynamic_pointer_cast(its_option); if (its_selective_option) { its_clients = its_selective_option->get_clients(); } break; } case option_type_e::UNKNOWN: default: boost::system::error_code ec; VSOMEIP_WARNING << __func__ << ": Unsupported eventgroup option [" << std::hex << (int)its_option->get_type() << "] " << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; if (its_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); return; } break; } } } if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type) { handle_eventgroup_subscription(its_service, its_instance, its_eventgroup, its_major, its_ttl, 0, 0, its_first_address, its_first_port, is_first_reliable, its_second_address, its_second_port, is_second_reliable, _acknowledgement, _is_stop_subscribe_subscribe, _force_initial_events, its_clients, _sd_ac_state, its_info); } else { if (entry_type_e::SUBSCRIBE_EVENTGROUP_ACK == its_type) { //this type is used for ACK and NACK messages if (its_ttl > 0) { handle_eventgroup_subscription_ack(its_service, its_instance, its_eventgroup, its_major, its_ttl, 0, its_clients, _sender, its_first_address, its_first_port); } else { handle_eventgroup_subscription_nack(its_service, its_instance, its_eventgroup, 0, its_clients); } } } } void service_discovery_impl::handle_eventgroup_subscription( service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, uint8_t _counter, uint16_t _reserved, const boost::asio::ip::address &_first_address, uint16_t _first_port, bool _is_first_reliable, const boost::asio::ip::address &_second_address, uint16_t _second_port, bool _is_second_reliable, std::shared_ptr &_acknowledgement, bool _is_stop_subscribe_subscribe, bool _force_initial_events, const std::set &_clients, const sd_acceptance_state_t& _sd_ac_state, const std::shared_ptr& _info) { (void)_counter; (void)_reserved; auto its_messages = _acknowledgement->get_messages(); #ifndef VSOMEIP_ENABLE_COMPAT bool reliablility_nack(false); if (_info) { const bool first_port_set(_first_port != ILLEGAL_PORT); const bool second_port_set(_second_port != ILLEGAL_PORT); switch (_info->get_reliability()) { case reliability_type_e::RT_UNRELIABLE: if (!(first_port_set && !_is_first_reliable) && !(second_port_set && !_is_second_reliable)) { reliablility_nack = true; } break; case reliability_type_e::RT_RELIABLE: if (!(first_port_set && _is_first_reliable) && !(second_port_set && _is_second_reliable)) { reliablility_nack = true; } break; case reliability_type_e::RT_BOTH: if (_first_port == ILLEGAL_PORT || _second_port == ILLEGAL_PORT) { reliablility_nack = true; } if (_is_first_reliable == _is_second_reliable) { reliablility_nack = true; } break; default: break; } } if (reliablility_nack && _ttl > 0) { insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients); boost::system::error_code ec; // TODO: Add sender and session id VSOMEIP_WARNING << __func__ << ": Subscription for [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" << " not valid: Event configuration (" << (std::uint32_t)_info->get_reliability() << ") does not match the provided endpoint options: " << _first_address.to_string(ec) << ":" << std::dec << _first_port << " " << _second_address.to_string(ec) << ":" << std::dec << _second_port; return; } #endif std::shared_ptr its_subscriber; std::shared_ptr its_reliable; std::shared_ptr its_unreliable; // wrong major version if (_major != _info->get_major()) { // Create a temporary info object with TTL=0 --> send NACK auto its_info = std::make_shared(_service, _instance, _eventgroup, _major, 0); boost::system::error_code ec; // TODO: Add session id VSOMEIP_ERROR << __func__ << ": Requested major version:[" << (uint32_t) _major << "] in subscription to service: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" << " does not match with services major version:[" << (uint32_t) _info->get_major() << "] subscriber: " << _first_address.to_string(ec) << ":" << std::dec << _first_port; if (_ttl > 0) { insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, _clients); } return; } else { boost::asio::ip::address its_first_address, its_second_address; if (ILLEGAL_PORT != _first_port) { uint16_t its_first_port(0); its_subscriber = endpoint_definition::get( _first_address, _first_port, _is_first_reliable, _service, _instance); if (!_is_first_reliable && _info->get_multicast(its_first_address, its_first_port) && _info->is_sending_multicast()) { // udp multicast its_unreliable = endpoint_definition::get( its_first_address, its_first_port, false, _service, _instance); } else if (_is_first_reliable) { // tcp unicast its_reliable = its_subscriber; // check if TCP connection is established by client if (_ttl > 0 && !is_tcp_connected(_service, _instance, its_reliable)) { insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients); boost::system::error_code ec; // TODO: Add sender and session id VSOMEIP_ERROR << "TCP connection to target1: [" << its_reliable->get_address().to_string() << ":" << its_reliable->get_port() << "] not established for subscription to: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] "; return; } } else { // udp unicast its_unreliable = its_subscriber; } } if (ILLEGAL_PORT != _second_port) { uint16_t its_second_port(0); its_subscriber = endpoint_definition::get( _second_address, _second_port, _is_second_reliable, _service, _instance); if (!_is_second_reliable && _info->get_multicast(its_second_address, its_second_port) && _info->is_sending_multicast()) { // udp multicast its_unreliable = endpoint_definition::get( its_second_address, its_second_port, false, _service, _instance); } else if (_is_second_reliable) { // tcp unicast its_reliable = its_subscriber; // check if TCP connection is established by client if (_ttl > 0 && !is_tcp_connected(_service, _instance, its_reliable)) { insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients); boost::system::error_code ec; // TODO: Add sender and session id VSOMEIP_ERROR << "TCP connection to target2 : [" << its_reliable->get_address().to_string() << ":" << its_reliable->get_port() << "] not established for subscription to: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] "; return; } } else { // udp unicast its_unreliable = its_subscriber; } } } // check if the subscription should be rejected because of sd_acceptance_handling if (_ttl > 0 && _sd_ac_state.sd_acceptance_required_) { bool insert_nack(false); if (_first_port != ILLEGAL_PORT && !_sd_ac_state.accept_entries_ && configuration_->is_protected_port(_first_address, _first_port, _is_first_reliable)) { insert_nack = true; } if (!insert_nack && _second_port != ILLEGAL_PORT && !_sd_ac_state.accept_entries_ && configuration_->is_protected_port(_second_address, _second_port, _is_second_reliable)) { insert_nack = true; } if (insert_nack) { insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients); return; } } // Create subscription object auto its_subscription = std::make_shared(); its_subscription->set_eventgroupinfo(_info); its_subscription->set_subscriber(its_subscriber); its_subscription->set_reliable(its_reliable); its_subscription->set_unreliable(its_unreliable); its_subscription->reset(_clients); if (_ttl == 0) { // --> unsubscribe its_subscription->set_ttl(0); if (!_is_stop_subscribe_subscribe) { { std::lock_guard its_lock(pending_remote_subscriptions_mutex_); pending_remote_subscriptions_[its_subscription] = _acknowledgement; _acknowledgement->add_subscription(its_subscription); } host_->on_remote_unsubscribe(its_subscription); } return; } if (_force_initial_events) { its_subscription->set_force_initial_events(true); } its_subscription->set_ttl(_ttl * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_)); { std::lock_guard its_lock(pending_remote_subscriptions_mutex_); pending_remote_subscriptions_[its_subscription] = _acknowledgement; _acknowledgement->add_subscription(its_subscription); } host_->on_remote_subscribe(its_subscription, std::bind(&service_discovery_impl::update_remote_subscription, shared_from_this(), std::placeholders::_1)); } void service_discovery_impl::handle_eventgroup_subscription_nack( service_t _service, instance_t _instance, eventgroup_t _eventgroup, uint8_t _counter, const std::set &_clients) { (void)_counter; std::lock_guard its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { auto found_eventgroup = found_instance->second.find(_eventgroup); if (found_eventgroup != found_instance->second.end()) { auto its_subscription = found_eventgroup->second; for (const auto& its_client : _clients) { host_->on_subscribe_nack(its_client, _service, _instance, _eventgroup, ANY_EVENT, PENDING_SUBSCRIPTION_ID); // TODO: This is a dummy call... } if (!its_subscription->is_selective()) { auto its_reliable = its_subscription->get_endpoint(true); if (its_reliable) its_reliable->restart(); } } } } } void service_discovery_impl::handle_eventgroup_subscription_ack( service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, uint8_t _counter, const std::set &_clients, const boost::asio::ip::address &_sender, const boost::asio::ip::address &_address, uint16_t _port) { (void)_major; (void)_ttl; (void)_counter; std::lock_guard its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { auto found_eventgroup = found_instance->second.find(_eventgroup); if (found_eventgroup != found_instance->second.end()) { for (const auto& its_client : _clients) { if (found_eventgroup->second->get_state(its_client) == subscription_state_e::ST_NOT_ACKNOWLEDGED) { found_eventgroup->second->set_state(its_client, subscription_state_e::ST_ACKNOWLEDGED); host_->on_subscribe_ack(its_client, _service, _instance, _eventgroup, ANY_EVENT, PENDING_SUBSCRIPTION_ID); } } if (_address.is_multicast()) { host_->on_subscribe_ack_with_multicast( _service, _instance, _sender, _address, _port); } } } } } bool service_discovery_impl::is_tcp_connected(service_t _service, instance_t _instance, const std::shared_ptr& its_endpoint) { bool is_connected = false; std::shared_ptr its_info = host_->get_offered_service(_service, _instance); if (its_info) { //get reliable server endpoint auto its_reliable_server_endpoint = std::dynamic_pointer_cast< tcp_server_endpoint_impl>(its_info->get_endpoint(true)); if (its_reliable_server_endpoint && its_reliable_server_endpoint->is_established(its_endpoint)) { is_connected = true; } } return is_connected; } bool service_discovery_impl::send( const std::vector > &_messages) { bool its_result(true); std::lock_guard its_lock(serialize_mutex_); for (const auto &m : _messages) { if (m->has_entry()) { std::pair its_session = get_session(unicast_); m->set_session(its_session.first); m->set_reboot_flag(its_session.second); if (host_->send(VSOMEIP_SD_CLIENT, m)) { increment_session(unicast_); } } else { its_result = false; } } return its_result; } bool service_discovery_impl::serialize_and_send( const std::vector > &_messages, const boost::asio::ip::address &_address) { bool its_result(true); if (!_address.is_unspecified()) { std::lock_guard its_lock(serialize_mutex_); for (const auto &m : _messages) { if (m->has_entry()) { std::pair its_session = get_session(_address); m->set_session(its_session.first); m->set_reboot_flag(its_session.second); if (serializer_->serialize(m.get())) { if (host_->send_via_sd(endpoint_definition::get(_address, port_, reliable_, m->get_service(), m->get_instance()), serializer_->get_data(), serializer_->get_size(), port_)) { increment_session(_address); } } else { VSOMEIP_ERROR << "service_discovery_impl::" << __func__ << ": Serialization failed!"; its_result = false; } serializer_->reset(); } else { its_result = false; } } } return its_result; } void service_discovery_impl::start_ttl_timer() { std::lock_guard its_lock(ttl_timer_mutex_); boost::system::error_code ec; ttl_timer_.expires_from_now(std::chrono::milliseconds(ttl_timer_runtime_), ec); ttl_timer_.async_wait( std::bind(&service_discovery_impl::check_ttl, shared_from_this(), std::placeholders::_1)); } void service_discovery_impl::stop_ttl_timer() { std::lock_guard its_lock(ttl_timer_mutex_); boost::system::error_code ec; ttl_timer_.cancel(ec); } void service_discovery_impl::check_ttl(const boost::system::error_code &_error) { if (!_error) { { std::lock_guard its_lock(check_ttl_mutex_); host_->update_routing_info(ttl_timer_runtime_); } start_ttl_timer(); } } bool service_discovery_impl::check_static_header_fields( const std::shared_ptr &_message) const { if(_message->get_protocol_version() != protocol_version) { VSOMEIP_ERROR << "Invalid protocol version in SD header"; return false; } if(_message->get_interface_version() != interface_version) { VSOMEIP_ERROR << "Invalid interface version in SD header"; return false; } if(_message->get_message_type() != message_type) { VSOMEIP_ERROR << "Invalid message type in SD header"; return false; } if(_message->get_return_code() > return_code_e::E_OK && _message->get_return_code()< return_code_e::E_UNKNOWN) { VSOMEIP_ERROR << "Invalid return code in SD header"; return false; } return true; } bool service_discovery_impl::check_layer_four_protocol( const std::shared_ptr& _ip_option) const { if (_ip_option->get_layer_four_protocol() == layer_four_protocol_e::UNKNOWN) { VSOMEIP_ERROR << "Invalid layer 4 protocol in IP endpoint option"; return false; } return true; } void service_discovery_impl::start_subscription_expiration_timer() { std::lock_guard its_lock(subscription_expiration_timer_mutex_); start_subscription_expiration_timer_unlocked(); } void service_discovery_impl::start_subscription_expiration_timer_unlocked() { subscription_expiration_timer_.expires_at(next_subscription_expiration_); subscription_expiration_timer_.async_wait( std::bind(&service_discovery_impl::expire_subscriptions, shared_from_this(), std::placeholders::_1)); } void service_discovery_impl::stop_subscription_expiration_timer() { std::lock_guard its_lock(subscription_expiration_timer_mutex_); stop_subscription_expiration_timer_unlocked(); } void service_discovery_impl::stop_subscription_expiration_timer_unlocked() { subscription_expiration_timer_.cancel(); } void service_discovery_impl::expire_subscriptions( const boost::system::error_code &_error) { if (!_error) { next_subscription_expiration_ = host_->expire_subscriptions(false); start_subscription_expiration_timer(); } } bool service_discovery_impl::check_ipv4_address( const boost::asio::ip::address& its_address) const { //Check unallowed ipv4 address bool is_valid = true; const boost::asio::ip::address_v4::bytes_type its_unicast_address = unicast_.to_v4().to_bytes(); const boost::asio::ip::address_v4::bytes_type endpoint_address = its_address.to_v4().to_bytes(); const boost::asio::ip::address_v4::bytes_type its_netmask = configuration_->get_netmask().to_v4().to_bytes(); //same address as unicast address of DUT not allowed if (its_unicast_address == endpoint_address) { VSOMEIP_ERROR << "Subscriber's IP address is same as host's address! : " << its_address; is_valid = false; } else { const std::uint32_t self = VSOMEIP_BYTES_TO_LONG(its_unicast_address[0], its_unicast_address[1], its_unicast_address[2], its_unicast_address[3]); const std::uint32_t remote = VSOMEIP_BYTES_TO_LONG(endpoint_address[0], endpoint_address[1], endpoint_address[2], endpoint_address[3]); const std::uint32_t netmask = VSOMEIP_BYTES_TO_LONG(its_netmask[0], its_netmask[1], its_netmask[2], its_netmask[3]); if ((self & netmask) != (remote & netmask)) { VSOMEIP_ERROR<< "Subscriber's IP isn't in the same subnet as host's IP: " << its_address; is_valid = false; } } return is_valid; } void service_discovery_impl::offer_service(const std::shared_ptr &_info) { service_t its_service = _info->get_service(); service_t its_instance = _info->get_instance(); std::lock_guard its_lock(collected_offers_mutex_); // check if offer is in map bool found(false); const auto its_service_it = collected_offers_.find(its_service); if (its_service_it != collected_offers_.end()) { const auto its_instance_it = its_service_it->second.find(its_instance); if (its_instance_it != its_service_it->second.end()) { found = true; } } if (!found) { collected_offers_[its_service][its_instance] = _info; } } void service_discovery_impl::start_offer_debounce_timer(bool _first_start) { std::lock_guard its_lock(offer_debounce_timer_mutex_); boost::system::error_code ec; if (_first_start) { offer_debounce_timer_.expires_from_now(initial_delay_, ec); } else { offer_debounce_timer_.expires_from_now(offer_debounce_time_, ec); } if (ec) { VSOMEIP_ERROR<< "service_discovery_impl::start_offer_debounce_timer " "setting expiry time of timer failed: " << ec.message(); } offer_debounce_timer_.async_wait( std::bind(&service_discovery_impl::on_offer_debounce_timer_expired, this, std::placeholders::_1)); } void service_discovery_impl::start_find_debounce_timer(bool _first_start) { std::lock_guard its_lock(find_debounce_timer_mutex_); boost::system::error_code ec; if (_first_start) { find_debounce_timer_.expires_from_now(initial_delay_, ec); } else { find_debounce_timer_.expires_from_now(find_debounce_time_, ec); } if (ec) { VSOMEIP_ERROR<< "service_discovery_impl::start_find_debounce_timer " "setting expiry time of timer failed: " << ec.message(); } find_debounce_timer_.async_wait( std::bind( &service_discovery_impl::on_find_debounce_timer_expired, this, std::placeholders::_1)); } // initial delay void service_discovery_impl::on_find_debounce_timer_expired( const boost::system::error_code &_error) { if(_error) { // timer was canceled return; } // Only copy the accumulated requests of the initial wait phase // if the sent counter for the request is zero. requests_t repetition_phase_finds; bool new_finds(false); { std::lock_guard its_lock(requested_mutex_); for (const auto& its_service : requested_) { for (const auto& its_instance : its_service.second) { if( its_instance.second->get_sent_counter() == 0) { repetition_phase_finds[its_service.first][its_instance.first] = its_instance.second; } } } if (repetition_phase_finds.size()) { new_finds = true; } } if (!new_finds) { start_find_debounce_timer(false); return; } // Sent out finds for the first time as initial wait phase ended std::vector> its_messages; std::shared_ptr its_message( std::make_shared()); its_messages.push_back(its_message); // Serialize and send FindService (increments sent counter in requested_ map) insert_find_entries(its_messages, repetition_phase_finds); send(its_messages); std::chrono::milliseconds its_delay(repetitions_base_delay_); std::uint8_t its_repetitions(1); std::shared_ptr its_timer = std::make_shared< boost::asio::steady_timer>(host_->get_io()); { std::lock_guard its_lock(find_repetition_phase_timers_mutex_); find_repetition_phase_timers_[its_timer] = repetition_phase_finds; } boost::system::error_code ec; its_timer->expires_from_now(its_delay, ec); if (ec) { VSOMEIP_ERROR<< "service_discovery_impl::on_find_debounce_timer_expired " "setting expiry time of timer failed: " << ec.message(); } its_timer->async_wait( std::bind( &service_discovery_impl::on_find_repetition_phase_timer_expired, this, std::placeholders::_1, its_timer, its_repetitions, its_delay.count())); start_find_debounce_timer(false); } void service_discovery_impl::on_offer_debounce_timer_expired( const boost::system::error_code &_error) { if(_error) { // timer was canceled return; } // Copy the accumulated offers of the initial wait phase services_t repetition_phase_offers; bool new_offers(false); { std::vector non_someip_services; std::lock_guard its_lock(collected_offers_mutex_); if (collected_offers_.size()) { if (is_diagnosis_) { for (services_t::iterator its_service = collected_offers_.begin(); its_service != collected_offers_.end(); its_service++) { for (const auto& its_instance : its_service->second) { if (!configuration_->is_someip( its_service->first, its_instance.first)) { non_someip_services.push_back(its_service); } } } for (auto its_service : non_someip_services) { repetition_phase_offers.insert(*its_service); collected_offers_.erase(its_service); } } else { repetition_phase_offers = collected_offers_; collected_offers_.clear(); } new_offers = true; } } if (!new_offers) { start_offer_debounce_timer(false); return; } // Sent out offers for the first time as initial wait phase ended std::vector> its_messages; std::shared_ptr its_message(std::make_shared()); its_messages.push_back(its_message); insert_offer_entries(its_messages, repetition_phase_offers, true); // Serialize and send send(its_messages); std::chrono::milliseconds its_delay(0); std::uint8_t its_repetitions(0); if (repetitions_max_) { // Start timer for repetition phase the first time // with 2^0 * repetitions_base_delay its_delay = repetitions_base_delay_; its_repetitions = 1; } else { // If repetitions_max is set to zero repetition phase is skipped, // therefore wait one cyclic offer delay before entering main phase its_delay = cyclic_offer_delay_; its_repetitions = 0; } std::shared_ptr its_timer = std::make_shared< boost::asio::steady_timer>(host_->get_io()); { std::lock_guard its_lock(repetition_phase_timers_mutex_); repetition_phase_timers_[its_timer] = repetition_phase_offers; } boost::system::error_code ec; its_timer->expires_from_now(its_delay, ec); if (ec) { VSOMEIP_ERROR<< "service_discovery_impl::on_offer_debounce_timer_expired " "setting expiry time of timer failed: " << ec.message(); } its_timer->async_wait( std::bind( &service_discovery_impl::on_repetition_phase_timer_expired, this, std::placeholders::_1, its_timer, its_repetitions, its_delay.count())); start_offer_debounce_timer(false); } void service_discovery_impl::on_repetition_phase_timer_expired( const boost::system::error_code &_error, const std::shared_ptr& _timer, std::uint8_t _repetition, std::uint32_t _last_delay) { if (_error) { return; } if (_repetition == 0) { std::lock_guard its_lock(repetition_phase_timers_mutex_); // We waited one cyclic offer delay, the offers can now be sent in the // main phase and the timer can be deleted move_offers_into_main_phase(_timer); } else { std::lock_guard its_lock(repetition_phase_timers_mutex_); auto its_timer_pair = repetition_phase_timers_.find(_timer); if (its_timer_pair != repetition_phase_timers_.end()) { std::chrono::milliseconds new_delay(0); std::uint8_t repetition(0); bool move_to_main(false); if (_repetition <= repetitions_max_) { // Sent offers, double time to wait and start timer again. new_delay = std::chrono::milliseconds(_last_delay * 2); repetition = ++_repetition; } else { // Repetition phase is now over we have to sleep one cyclic // offer delay before it's allowed to sent the offer again. // If the last offer was sent shorter than half the // configured cyclic_offer_delay_ago the offers are directly // moved into the mainphase to avoid potentially sleeping twice // the cyclic offer delay before moving the offers in to main // phase if (last_offer_shorter_half_offer_delay_ago()) { move_to_main = true; } else { new_delay = cyclic_offer_delay_; repetition = 0; } } std::vector> its_messages; std::shared_ptr its_message( std::make_shared()); its_messages.push_back(its_message); insert_offer_entries(its_messages, its_timer_pair->second, true); // Serialize and send send(its_messages); if (move_to_main) { move_offers_into_main_phase(_timer); return; } boost::system::error_code ec; its_timer_pair->first->expires_from_now(new_delay, ec); if (ec) { VSOMEIP_ERROR << "service_discovery_impl::on_repetition_phase_timer_expired " "setting expiry time of timer failed: " << ec.message(); } its_timer_pair->first->async_wait( std::bind( &service_discovery_impl::on_repetition_phase_timer_expired, this, std::placeholders::_1, its_timer_pair->first, repetition, new_delay.count())); } } } void service_discovery_impl::on_find_repetition_phase_timer_expired( const boost::system::error_code &_error, const std::shared_ptr& _timer, std::uint8_t _repetition, std::uint32_t _last_delay) { if (_error) { return; } std::lock_guard its_lock(find_repetition_phase_timers_mutex_); auto its_timer_pair = find_repetition_phase_timers_.find(_timer); if (its_timer_pair != find_repetition_phase_timers_.end()) { std::chrono::milliseconds new_delay(0); std::uint8_t repetition(0); if (_repetition <= repetitions_max_) { // Sent findService entries in one message, double time to wait and start timer again. std::vector> its_messages; std::shared_ptr its_message( std::make_shared()); its_messages.push_back(its_message); insert_find_entries(its_messages, its_timer_pair->second); send(its_messages); new_delay = std::chrono::milliseconds(_last_delay * 2); repetition = ++_repetition; } else { // Repetition phase is now over, erase the timer on next expiry time find_repetition_phase_timers_.erase(its_timer_pair); return; } boost::system::error_code ec; its_timer_pair->first->expires_from_now(new_delay, ec); if (ec) { VSOMEIP_ERROR << __func__ << "setting expiry time of timer failed: " << ec.message(); } its_timer_pair->first->async_wait( std::bind( &service_discovery_impl::on_find_repetition_phase_timer_expired, this, std::placeholders::_1, its_timer_pair->first, repetition, new_delay.count())); } } void service_discovery_impl::move_offers_into_main_phase( const std::shared_ptr &_timer) { // HINT: make sure to lock the repetition_phase_timers_mutex_ before calling // this function set flag on all serviceinfos bound to this timer that they // will be included in the cyclic offers from now on const auto its_timer = repetition_phase_timers_.find(_timer); if (its_timer != repetition_phase_timers_.end()) { for (const auto& its_service : its_timer->second) { for (const auto& instance : its_service.second) { instance.second->set_is_in_mainphase(true); } } repetition_phase_timers_.erase(_timer); } } void service_discovery_impl::stop_offer_service( const std::shared_ptr &_info) { std::lock_guard its_lock(offer_mutex_); _info->set_ttl(0); const service_t its_service = _info->get_service(); const instance_t its_instance = _info->get_instance(); bool stop_offer_required(false); // Delete from initial phase offers { std::lock_guard its_lock(collected_offers_mutex_); if (collected_offers_.size()) { auto its_service_it = collected_offers_.find(its_service); if (its_service_it != collected_offers_.end()) { auto its_instance_it = its_service_it->second.find(its_instance); if (its_instance_it != its_service_it->second.end()) { if (its_instance_it->second == _info) { its_service_it->second.erase(its_instance_it); if (!collected_offers_[its_service].size()) { collected_offers_.erase(its_service_it); } } } } } // No need to sent out a stop offer message here as all services // instances contained in the collected offers weren't broadcasted yet } // Delete from repetition phase offers { std::lock_guard its_lock(repetition_phase_timers_mutex_); for (auto rpt = repetition_phase_timers_.begin(); rpt != repetition_phase_timers_.end();) { auto its_service_it = rpt->second.find(its_service); if (its_service_it != rpt->second.end()) { auto its_instance_it = its_service_it->second.find(its_instance); if (its_instance_it != its_service_it->second.end()) { if (its_instance_it->second == _info) { its_service_it->second.erase(its_instance_it); stop_offer_required = true; if (!rpt->second[its_service].size()) { rpt->second.erase(its_service); } } } } if (!rpt->second.size()) { rpt = repetition_phase_timers_.erase(rpt); } else { ++rpt; } } } // Sent stop offer if(_info->is_in_mainphase() || stop_offer_required) { send_stop_offer(_info); } // sent out NACKs for all pending subscriptions // TODO: remote_subscription_not_acknowledge_all(its_service, its_instance); } bool service_discovery_impl::send_stop_offer(const std::shared_ptr &_info) { if (_info->get_endpoint(false) || _info->get_endpoint(true)) { std::vector > its_messages; std::shared_ptr its_current_message( std::make_shared()); its_messages.push_back(its_current_message); insert_offer_service(its_messages, _info); // Serialize and send return send(its_messages); } return false; } void service_discovery_impl::start_main_phase_timer() { std::lock_guard its_lock(main_phase_timer_mutex_); boost::system::error_code ec; main_phase_timer_.expires_from_now(cyclic_offer_delay_); if (ec) { VSOMEIP_ERROR<< "service_discovery_impl::start_main_phase_timer " "setting expiry time of timer failed: " << ec.message(); } main_phase_timer_.async_wait( std::bind(&service_discovery_impl::on_main_phase_timer_expired, this, std::placeholders::_1)); } void service_discovery_impl::on_main_phase_timer_expired( const boost::system::error_code &_error) { if (_error) { return; } send(true); start_main_phase_timer(); } void service_discovery_impl::send_uni_or_multicast_offerservice( const std::shared_ptr &_info, bool _unicast_flag) { if (_unicast_flag) { // SID_SD_826 if (last_offer_shorter_half_offer_delay_ago()) { // SIP_SD_89 send_unicast_offer_service(_info); } else { // SIP_SD_90 send_multicast_offer_service(_info); } } else { // SID_SD_826 send_unicast_offer_service(_info); } } bool service_discovery_impl::last_offer_shorter_half_offer_delay_ago() { // Get remaining time to next offer since last offer std::chrono::milliseconds remaining(0); { std::lock_guard its_lock(main_phase_timer_mutex_); remaining = std::chrono::duration_cast( main_phase_timer_.expires_from_now()); } if (std::chrono::milliseconds(0) > remaining) { remaining = cyclic_offer_delay_; } const std::chrono::milliseconds half_cyclic_offer_delay = cyclic_offer_delay_ / 2; return remaining > half_cyclic_offer_delay; } bool service_discovery_impl::check_source_address( const boost::asio::ip::address &its_source_address) const { bool is_valid = true; // Check if source address is same as nodes unicast address if (unicast_ == its_source_address) { VSOMEIP_ERROR << "Source address of message is same as DUT's unicast address! : " << its_source_address.to_string(); is_valid = false; } return is_valid; } void service_discovery_impl::set_diagnosis_mode(const bool _activate) { is_diagnosis_ = _activate; } bool service_discovery_impl::get_diagnosis_mode() { return is_diagnosis_; } void service_discovery_impl::update_remote_subscription( const std::shared_ptr &_subscription) { if (!_subscription->is_pending() || 0 == _subscription->get_answers()) { std::shared_ptr its_ack; { std::lock_guard its_lock(pending_remote_subscriptions_mutex_); auto found_ack = pending_remote_subscriptions_.find(_subscription); if (found_ack != pending_remote_subscriptions_.end()) { its_ack = found_ack->second; } } if (its_ack) { std::unique_lock its_lock(its_ack->get_lock()); update_acknowledgement(its_ack); } } } void service_discovery_impl::update_acknowledgement( const std::shared_ptr &_acknowledgement) { if (_acknowledgement->is_complete() && !_acknowledgement->is_pending() && !_acknowledgement->is_done()) { send_subscription_ack(_acknowledgement); std::lock_guard its_lock(pending_remote_subscriptions_mutex_); for (const auto &its_subscription : _acknowledgement->get_subscriptions()) pending_remote_subscriptions_.erase(its_subscription); } } void service_discovery_impl::update_subscription_expiration_timer( const std::vector > &_messages) { std::lock_guard its_lock(subscription_expiration_timer_mutex_); const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); stop_subscription_expiration_timer_unlocked(); for (const auto &m : _messages) { for (const auto &e : m->get_entries()) { if (e && e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK && e->get_ttl()) { const std::chrono::steady_clock::time_point its_expiration = now + std::chrono::seconds(e->get_ttl() * get_ttl_factor( e->get_service(), e->get_instance(), ttl_factor_subscriptions_)); if (its_expiration < next_subscription_expiration_) { next_subscription_expiration_ = its_expiration; } } } } start_subscription_expiration_timer_unlocked(); } bool service_discovery_impl::check_stop_subscribe_subscribe( message_impl::entries_t::const_iterator _iter, message_impl::entries_t::const_iterator _end, const message_impl::options_t& _options) const { const message_impl::entries_t::const_iterator its_next = std::next(_iter); if ((*_iter)->get_ttl() > 0 || (*_iter)->get_type() != entry_type_e::STOP_SUBSCRIBE_EVENTGROUP || its_next == _end || (*its_next)->get_type() != entry_type_e::SUBSCRIBE_EVENTGROUP) { return false; } return (*static_cast(_iter->get())).matches( *(static_cast(its_next->get())), _options); } bool service_discovery_impl::has_opposite( message_impl::entries_t::const_iterator _iter, message_impl::entries_t::const_iterator _end, const message_impl::options_t &_options) const { const auto its_entry = std::dynamic_pointer_cast(*_iter); auto its_other = std::next(_iter); for (; its_other != _end; its_other++) { if ((*its_other)->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP) { const auto its_other_entry = std::dynamic_pointer_cast(*its_other); if ((its_entry->get_ttl() == 0 && its_other_entry->get_ttl() > 0) || (its_entry->get_ttl() > 0 && its_other_entry->get_ttl() == 0)) { if (its_entry->matches(*(its_other_entry.get()), _options)) return true; } } } return false; } bool service_discovery_impl::has_same( message_impl::entries_t::const_iterator _iter, message_impl::entries_t::const_iterator _end, const message_impl::options_t &_options) const { const auto its_entry = std::dynamic_pointer_cast(*_iter); auto its_other = std::next(_iter); for (; its_other != _end; its_other++) { if (its_entry->get_type() == (*its_other)->get_type()) { const auto its_other_entry = std::dynamic_pointer_cast(*its_other); if (its_entry->get_ttl() == its_other_entry->get_ttl() && its_entry->matches(*(its_other_entry.get()), _options)) { return true; } } } return false; } bool service_discovery_impl::is_subscribed( const std::shared_ptr &_entry, const message_impl::options_t &_options) const { const auto its_service = _entry->get_service(); const auto its_instance = _entry->get_instance(); auto its_info = host_->find_eventgroup( its_service, its_instance, _entry->get_eventgroup()); if (its_info) { std::shared_ptr its_reliable, its_unreliable; for (const auto& o : _options) { if (o->get_type() == option_type_e::IP4_ENDPOINT) { const auto its_endpoint_option = std::dynamic_pointer_cast(o); if (its_endpoint_option) { if (its_endpoint_option->get_layer_four_protocol() == layer_four_protocol_e::TCP) { its_reliable = endpoint_definition::get( boost::asio::ip::address_v4( its_endpoint_option->get_address()), its_endpoint_option->get_port(), true, its_service, its_instance); } else if (its_endpoint_option->get_layer_four_protocol() == layer_four_protocol_e::UDP) { its_unreliable = endpoint_definition::get( boost::asio::ip::address_v4( its_endpoint_option->get_address()), its_endpoint_option->get_port(), false, its_service, its_instance); } } } else if (o->get_type() == option_type_e::IP6_ENDPOINT) { const auto its_endpoint_option = std::dynamic_pointer_cast(o); if (its_endpoint_option->get_layer_four_protocol() == layer_four_protocol_e::TCP) { its_reliable = endpoint_definition::get( boost::asio::ip::address_v6( its_endpoint_option->get_address()), its_endpoint_option->get_port(), true, its_service, its_instance); } else if (its_endpoint_option->get_layer_four_protocol() == layer_four_protocol_e::UDP) { its_unreliable = endpoint_definition::get( boost::asio::ip::address_v6( its_endpoint_option->get_address()), its_endpoint_option->get_port(), false, its_service, its_instance); } } } if (its_reliable || its_unreliable) { for (const auto& its_subscription : its_info->get_remote_subscriptions()) { if ((!its_reliable || its_subscription->get_reliable() == its_reliable) && (!its_unreliable || its_subscription->get_unreliable() == its_unreliable)) { return true; } } } } return false; } configuration::ttl_factor_t service_discovery_impl::get_ttl_factor( service_t _service, instance_t _instance, const configuration::ttl_map_t& _ttl_map) const { configuration::ttl_factor_t its_ttl_factor(1); auto found_service = _ttl_map.find(_service); if (found_service != _ttl_map.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { its_ttl_factor = found_instance->second; } } return its_ttl_factor; } void service_discovery_impl::on_last_msg_received_timer_expired( const boost::system::error_code &_error) { if (!_error) { // We didn't receive a multicast message within 110% of the cyclic_offer_delay_ VSOMEIP_WARNING << "Didn't receive a multicast SD message for " << std::dec << last_msg_received_timer_timeout_.count() << "ms."; // Rejoin multicast group if (endpoint_ && !reliable_) { dynamic_cast( endpoint_.get())->join(sd_multicast_); } { boost::system::error_code ec; std::lock_guard its_lock(last_msg_received_timer_mutex_); last_msg_received_timer_.expires_from_now(last_msg_received_timer_timeout_, ec); last_msg_received_timer_.async_wait( std::bind( &service_discovery_impl::on_last_msg_received_timer_expired, shared_from_this(), std::placeholders::_1)); } } } void service_discovery_impl::stop_last_msg_received_timer() { std::lock_guard its_lock(last_msg_received_timer_mutex_); boost::system::error_code ec; last_msg_received_timer_.cancel(ec); } service_discovery_impl::remote_offer_type_e service_discovery_impl::get_remote_offer_type( service_t _service, instance_t _instance) const { std::lock_guard its_lock(remote_offer_types_mutex_); auto found_si = remote_offer_types_.find(std::make_pair(_service, _instance)); if (found_si != remote_offer_types_.end()) { return found_si->second; } return remote_offer_type_e::UNKNOWN; } service_discovery_impl::remote_offer_type_e service_discovery_impl::get_remote_offer_type( const std::shared_ptr &_subscription) const { bool has_reliable = (_subscription->get_endpoint(true) != nullptr); bool has_unreliable = (_subscription->get_endpoint(false) != nullptr); return (has_reliable ? (has_unreliable ? remote_offer_type_e::RELIABLE_UNRELIABLE : remote_offer_type_e::RELIABLE) : (has_unreliable ? remote_offer_type_e::UNRELIABLE : remote_offer_type_e::UNKNOWN)); } bool service_discovery_impl::update_remote_offer_type( service_t _service, instance_t _instance, remote_offer_type_e _offer_type, const boost::asio::ip::address &_reliable_address, std::uint16_t _reliable_port, const boost::asio::ip::address &_unreliable_address, std::uint16_t _unreliable_port) { bool ret(false); std::lock_guard its_lock(remote_offer_types_mutex_); const std::pair its_si_pair = std::make_pair(_service, _instance); auto found_si = remote_offer_types_.find(its_si_pair); if (found_si != remote_offer_types_.end()) { if (found_si->second != _offer_type ) { found_si->second = _offer_type; ret = true; } } else { remote_offer_types_[its_si_pair] = _offer_type; } switch (_offer_type) { case remote_offer_type_e::UNRELIABLE: remote_offers_by_ip_[_unreliable_address][std::make_pair(false, _unreliable_port)].insert(its_si_pair); break; case remote_offer_type_e::RELIABLE: remote_offers_by_ip_[_reliable_address][std::make_pair(true, _reliable_port)].insert(its_si_pair); break; case remote_offer_type_e::RELIABLE_UNRELIABLE: remote_offers_by_ip_[_unreliable_address][std::make_pair(false, _unreliable_port)].insert(its_si_pair); remote_offers_by_ip_[_unreliable_address][std::make_pair(true, _reliable_port)].insert(its_si_pair); break; case remote_offer_type_e::UNKNOWN: default: VSOMEIP_WARNING << __func__ << ": unknown offer type [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "]" << _offer_type; break; } return ret; } void service_discovery_impl::remove_remote_offer_type( service_t _service, instance_t _instance, const boost::asio::ip::address &_reliable_address, std::uint16_t _reliable_port, const boost::asio::ip::address &_unreliable_address, std::uint16_t _unreliable_port) { std::lock_guard its_lock(remote_offer_types_mutex_); const std::pair its_si_pair = std::make_pair(_service, _instance); remote_offer_types_.erase(its_si_pair); auto delete_from_remote_offers_by_ip = [&]( const boost::asio::ip::address& _address, std::uint16_t _port, bool _reliable) { const auto found_address = remote_offers_by_ip_.find(_address); if (found_address != remote_offers_by_ip_.end()) { auto found_port = found_address->second.find( std::make_pair(_reliable, _port)); if (found_port != found_address->second.end()) { if (found_port->second.erase(std::make_pair(_service, _instance))) { if (found_port->second.empty()) { found_address->second.erase(found_port); if (found_address->second.empty()) { remote_offers_by_ip_.erase(found_address); } } } } } }; if (_reliable_port != ILLEGAL_PORT) { delete_from_remote_offers_by_ip(_reliable_address, _reliable_port, true); } if (_unreliable_port != ILLEGAL_PORT) { delete_from_remote_offers_by_ip(_unreliable_address, _unreliable_port, false); } } void service_discovery_impl::remove_remote_offer_type_by_ip( const boost::asio::ip::address &_address) { remove_remote_offer_type_by_ip(_address, ANY_PORT, false); } void service_discovery_impl::remove_remote_offer_type_by_ip( const boost::asio::ip::address &_address, std::uint16_t _port, bool _reliable) { std::lock_guard its_lock(remote_offer_types_mutex_); const auto found_address = remote_offers_by_ip_.find(_address); if (found_address != remote_offers_by_ip_.end()) { if (_port == ANY_PORT) { for (const auto& port : found_address->second) { for (const auto& si : port.second) { remote_offer_types_.erase(si); } } remote_offers_by_ip_.erase(_address); } else { const auto its_port_reliability = std::make_pair(_reliable, _port); const auto found_port = found_address->second.find(its_port_reliability); if (found_port != found_address->second.end()) { for (const auto& si : found_port->second) { remote_offer_types_.erase(si); } found_address->second.erase(found_port); if (found_address->second.empty()) { remote_offers_by_ip_.erase(found_address); } } } } } std::shared_ptr service_discovery_impl::create_subscription( major_version_t _major, ttl_t _ttl, const std::shared_ptr &_reliable, const std::shared_ptr &_unreliable, const std::shared_ptr &_info) { auto its_subscription = std::make_shared(); its_subscription->set_major(_major); its_subscription->set_ttl(_ttl); if (_reliable) { its_subscription->set_endpoint(_reliable, true); its_subscription->set_tcp_connection_established(_reliable->is_established()); } if (_unreliable) { its_subscription->set_endpoint(_unreliable, false); its_subscription->set_udp_connection_established(_unreliable->is_established()); } // check whether the eventgroup is selective its_subscription->set_selective(_info->is_selective()); its_subscription->set_eventgroupinfo(_info); return its_subscription; } void service_discovery_impl::send_subscription_ack( const std::shared_ptr &_acknowledgement) { if (_acknowledgement->is_done()) return; _acknowledgement->done(); std::uint32_t its_max_answers(1); // Must be 1 as "_acknowledgement" not // necessarily contains subscriptions bool do_not_answer(false); std::shared_ptr its_parent; // Find highest number of necessary answers for (const auto& its_subscription : _acknowledgement->get_subscriptions()) { auto its_answers = its_subscription->get_answers(); if (its_answers > its_max_answers) { its_max_answers = its_answers; } else if (its_answers == 0) { do_not_answer = true; its_parent = its_subscription->get_parent(); } } if (do_not_answer) { if (its_parent) { std::lock_guard its_lock(pending_remote_subscriptions_mutex_); auto its_parent_ack = pending_remote_subscriptions_[its_parent]; if (its_parent_ack) { for (const auto &its_subscription : its_parent_ack->get_subscriptions()) { if (its_subscription != its_parent) its_subscription->set_answers(its_subscription->get_answers() + 1); } } } return; } // send messages for (std::uint32_t i = 0; i < its_max_answers; i++) { for (const auto &its_subscription : _acknowledgement->get_subscriptions()) { if (i < its_subscription->get_answers()) { if (its_subscription->get_ttl() > 0) { auto its_info = its_subscription->get_eventgroupinfo(); if (its_info) { std::set its_acked; std::set its_nacked; for (const auto& its_client : its_subscription->get_clients()) { if (its_subscription->get_client_state(its_client) == remote_subscription_state_e::SUBSCRIPTION_ACKED) { its_acked.insert(its_client); } else { its_nacked.insert(its_client); } } if (0 < its_acked.size()) { insert_subscription_ack(_acknowledgement, its_info, its_subscription->get_ttl(), its_subscription->get_subscriber(), its_acked); } if (0 < its_nacked.size()) { insert_subscription_ack(_acknowledgement, its_info, 0, its_subscription->get_subscriber(), its_nacked); } } } } } auto its_messages = _acknowledgement->get_messages(); serialize_and_send(its_messages, _acknowledgement->get_target_address()); update_subscription_expiration_timer(its_messages); } std::this_thread::yield(); // We might need to send initial events for (const auto &its_subscription : _acknowledgement->get_subscriptions()) { // Assumption: We do _NOT_ need to check whether this is a child // subscription, as this only applies to selective events, which // are owned by exclusive event groups. if (its_subscription->get_ttl() > 0 && its_subscription->is_initial()) { its_subscription->set_initial(false); auto its_info = its_subscription->get_eventgroupinfo(); if (its_info) { its_info->send_initial_events( its_subscription->get_reliable(), its_subscription->get_unreliable()); } } } } void service_discovery_impl::add_entry_data( std::vector > &_messages, const entry_data_t &_data) { auto its_current_message = _messages.back(); const auto is_fitting = its_current_message->add_entry_data( _data.entry_, _data.options_, _data.other_); if (!is_fitting) { its_current_message = std::make_shared(); (void)its_current_message->add_entry_data( _data.entry_, _data.options_, _data.other_); _messages.push_back(its_current_message); } } void service_discovery_impl::add_entry_data_to_remote_subscription_ack_msg( const std::shared_ptr& _acknowledgement, const entry_data_t &_data) { auto its_current_message = _acknowledgement->get_current_message(); const auto is_fitting = its_current_message->add_entry_data( _data.entry_, _data.options_, _data.other_); if (!is_fitting) { its_current_message = _acknowledgement->add_message(); (void)its_current_message->add_entry_data( _data.entry_, _data.options_, _data.other_); } } void service_discovery_impl::register_sd_acceptance_handler( sd_acceptance_handler_t _handler) { sd_acceptance_handler_ = _handler; } void service_discovery_impl::register_reboot_notification_handler( reboot_notification_handler_t _handler) { reboot_notification_handler_ = _handler; } reliability_type_e service_discovery_impl::get_eventgroup_reliability( service_t _service, instance_t _instance, eventgroup_t _eventgroup, const std::shared_ptr& _subscription) { reliability_type_e its_reliability = reliability_type_e::RT_UNKNOWN; auto its_info = _subscription->get_eventgroupinfo().lock(); if (its_info) { its_reliability = its_info->get_reliability(); if (its_reliability == reliability_type_e::RT_UNKNOWN && its_info->is_reliability_auto_mode()) { // fallback: determine how service is offered // and update reliability type of eventgroup switch (get_remote_offer_type(_service, _instance)) { case remote_offer_type_e::RELIABLE: its_reliability = reliability_type_e::RT_RELIABLE; break; case remote_offer_type_e::UNRELIABLE: its_reliability = reliability_type_e::RT_UNRELIABLE; break; case remote_offer_type_e::RELIABLE_UNRELIABLE: its_reliability = reliability_type_e::RT_BOTH; break; default: ; } VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't determine eventgroup reliability type for [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" << " using reliability type: " << std::hex << std::setw(4) << std::setfill('0') << (uint16_t) its_reliability; its_info->set_reliability(its_reliability); } } else { VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't lock eventgroupinfo [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] "; auto its_eg_info = host_->find_eventgroup(_service, _instance, _eventgroup); if (its_eg_info) { _subscription->set_eventgroupinfo(its_eg_info); its_reliability = its_eg_info->get_reliability(); } } if (its_reliability == reliability_type_e::RT_UNKNOWN) { VSOMEIP_WARNING << "sd::" << __func__ << ": eventgroup reliability type is unknown [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; } return its_reliability; } } // namespace sd } // namespace vsomeip_v3