diff options
Diffstat (limited to 'implementation/service_discovery/src/service_discovery_impl.cpp')
-rw-r--r-- | implementation/service_discovery/src/service_discovery_impl.cpp | 676 |
1 files changed, 477 insertions, 199 deletions
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index e5c4a32..7ae5395 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -35,6 +35,7 @@ #include "../../endpoints/include/udp_server_endpoint_impl.hpp" #include "../../message/include/serializer.hpp" #include "../../plugin/include/plugin_manager_impl.hpp" +#include "../../routing/include/event.hpp" #include "../../routing/include/eventgroupinfo.hpp" #include "../../routing/include/serviceinfo.hpp" #include "../../utility/include/byteorder.hpp" @@ -73,7 +74,7 @@ service_discovery_impl::service_discovery_impl( last_msg_received_timer_(_host->get_io()), last_msg_received_timer_timeout_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY + (VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY / 10)) { - // TODO: cleanup start condition! + next_subscription_expiration_ = std::chrono::steady_clock::now() + std::chrono::hours(24); } @@ -234,31 +235,45 @@ void service_discovery_impl::subscribe( service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, - ttl_t _ttl, client_t _client) { - { - std::lock_guard<std::mutex> its_lock(subscribed_mutex_); - auto found_service = subscribed_.find(_service); - if (found_service != subscribed_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_eventgroup = found_instance->second.find(_eventgroup); - if (found_eventgroup != found_instance->second.end()) { - auto its_subscription = found_eventgroup->second; - if (its_subscription->get_major() != _major) { - VSOMEIP_ERROR - << "Subscriptions to different versions of the same " - "service instance are not supported!"; - } else if (its_subscription->is_selective()) { - if (its_subscription->add_client(_client)) { - its_subscription->set_state(_client, - subscription_state_e::ST_NOT_ACKNOWLEDGED); - send_subscription(its_subscription, - _service, _instance, _eventgroup, - _client); + ttl_t _ttl, client_t _client, + const std::shared_ptr<eventgroupinfo> &_info) { +#ifdef VSOMEIP_ENABLE_COMPAT + bool is_selective(_info ? _info->is_selective() : false); +#endif // VSOMEIP_ENABLE_COMPAT + + std::lock_guard<std::mutex> its_lock(subscribed_mutex_); + auto found_service = subscribed_.find(_service); + if (found_service != subscribed_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + auto found_eventgroup = found_instance->second.find(_eventgroup); + if (found_eventgroup != found_instance->second.end()) { + auto its_subscription = found_eventgroup->second; +#ifdef VSOMEIP_ENABLE_COMPAT + if (!its_subscription->is_selective() && is_selective) { + its_subscription->set_selective(true); + its_subscription->remove_client(VSOMEIP_ROUTING_CLIENT); + for (const auto e : _info->get_events()) { + for (const auto c : e->get_subscribers(_eventgroup)) { + its_subscription->add_client(c); } } - return; } +#endif // VSOMEIP_ENABLE_COMPAT + if (its_subscription->get_major() != _major) { + VSOMEIP_ERROR + << "Subscriptions to different versions of the same " + "service instance are not supported!"; + } else if (its_subscription->is_selective()) { + if (its_subscription->add_client(_client)) { + its_subscription->set_state(_client, + subscription_state_e::ST_NOT_ACKNOWLEDGED); + send_subscription(its_subscription, + _service, _instance, _eventgroup, + _client); + } + } + return; } } } @@ -267,31 +282,26 @@ service_discovery_impl::subscribe( get_subscription_endpoints(_service, _instance, its_reliable, its_unreliable); - { - std::lock_guard<std::mutex> its_lock(subscribed_mutex_); + // New subscription + std::shared_ptr<subscription> its_subscription + = create_subscription( + _major, _ttl, its_reliable, its_unreliable, _info); - // New subscription - std::shared_ptr<subscription> its_subscription - = create_subscription( - _service, _instance, _eventgroup, _major, - _ttl, its_reliable, its_unreliable); - - if (!its_subscription) { - VSOMEIP_ERROR << __func__ - << ": creating subscription failed!"; - return; - } + if (!its_subscription) { + VSOMEIP_ERROR << __func__ + << ": creating subscription failed!"; + return; + } - subscribed_[_service][_instance][_eventgroup] = its_subscription; + subscribed_[_service][_instance][_eventgroup] = its_subscription; - its_subscription->add_client(_client); - its_subscription->set_state(_client, - subscription_state_e::ST_NOT_ACKNOWLEDGED); + its_subscription->add_client(_client); + its_subscription->set_state(_client, + subscription_state_e::ST_NOT_ACKNOWLEDGED); - send_subscription(its_subscription, - _service, _instance, _eventgroup, - _client); - } + send_subscription(its_subscription, + _service, _instance, _eventgroup, + _client); } void @@ -300,38 +310,36 @@ service_discovery_impl::send_subscription( const service_t _service, const instance_t _instance, const eventgroup_t _eventgroup, const client_t _client) { + (void)_client; + auto its_reliable = _subscription->get_endpoint(true); auto its_unreliable = _subscription->get_endpoint(false); boost::asio::ip::address its_address; get_subscription_address(its_reliable, its_unreliable, its_address); - if (!its_address.is_unspecified()) { entry_data_t its_data; - - const remote_offer_type_e its_offer_type - = get_remote_offer_type(_service, _instance); - if (its_offer_type == remote_offer_type_e::UNRELIABLE && - !its_reliable && its_unreliable) { + const reliability_type_e its_reliability_type = + get_eventgroup_reliability(_service, _instance, _eventgroup, _subscription); + if (its_reliability_type == reliability_type_e::RT_UNRELIABLE && its_unreliable) { if (its_unreliable->is_established()) { its_data = create_eventgroup_entry(_service, _instance, - _eventgroup, _subscription, its_offer_type); + _eventgroup, _subscription, its_reliability_type); } else { _subscription->set_udp_connection_established(false); } - } else if (its_offer_type == remote_offer_type_e::RELIABLE && - its_reliable && !its_unreliable) { + } else if (its_reliability_type == reliability_type_e::RT_RELIABLE && its_reliable) { if (its_reliable->is_established()) { its_data = create_eventgroup_entry(_service, _instance, - _eventgroup, _subscription, its_offer_type); + _eventgroup, _subscription, its_reliability_type); } else { _subscription->set_tcp_connection_established(false); } - } else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE && + } else if (its_reliability_type == reliability_type_e::RT_BOTH && its_reliable && its_unreliable) { if (its_reliable->is_established() && its_unreliable->is_established()) { its_data = create_eventgroup_entry(_service, _instance, - _eventgroup, _subscription, its_offer_type); + _eventgroup, _subscription, its_reliability_type); } else { if (!its_reliable->is_established()) { _subscription->set_tcp_connection_established(false); @@ -340,16 +348,14 @@ service_discovery_impl::send_subscription( _subscription->set_udp_connection_established(false); } } + } else if (its_reliability_type == reliability_type_e::RT_UNKNOWN) { + VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't determine reliability type for subscription to [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] "; } if (its_data.entry_) { - if (_subscription->is_selective()) { - auto its_selective_option = std::make_shared<selective_option_impl>(); - (void)its_selective_option->add_client(_client); - - its_data.options_.push_back(its_selective_option); - } - // TODO: Implement a simple path, that sends a single message auto its_current_message = std::make_shared<message_impl>(); std::vector<std::shared_ptr<message_impl> > its_messages; @@ -368,9 +374,9 @@ service_discovery_impl::get_subscription_endpoints( std::shared_ptr<endpoint> &_reliable, std::shared_ptr<endpoint> &_unreliable) const { _unreliable = host_->find_or_create_remote_client( - _service, _instance, false, VSOMEIP_ROUTING_CLIENT); + _service, _instance, false); _reliable = host_->find_or_create_remote_client( - _service, _instance, true, VSOMEIP_ROUTING_CLIENT); + _service, _instance, true); } void @@ -423,28 +429,37 @@ service_discovery_impl::unsubscribe(service_t _service, if (!its_subscription->has_client()) { its_subscription->set_ttl(0); } else if (its_subscription->is_selective()) { - auto its_major = its_subscription->get_major(); - // create a dummy subscription object to unsubscribe // the single client. + auto its_major = its_subscription->get_major(); + its_subscription = std::make_shared<subscription>(); its_subscription->set_major(its_major); its_subscription->set_ttl(0); its_subscription->set_selective(true); - its_subscription->add_client(_client); its_subscription->set_endpoint(its_reliable, true); its_subscription->set_endpoint(its_unreliable, false); } } - const remote_offer_type_e its_offer_type - = get_remote_offer_type(its_subscription); + // For selective subscriptions, the client must be added again + // to generate the selective option + if (its_subscription->is_selective()) + its_subscription->add_client(_client); + const reliability_type_e its_reliability_type = + get_eventgroup_reliability(_service, _instance, _eventgroup, its_subscription); auto its_data = create_eventgroup_entry(_service, _instance, - _eventgroup, its_subscription, its_offer_type); - if (its_data.entry_) { + _eventgroup, its_subscription, its_reliability_type); + if (its_data.entry_) its_current_message->add_entry_data(its_data.entry_, its_data.options_); - } + + // Remove it again before updating (only impacts last unsubscribe) + if (its_subscription->is_selective()) + (void)its_subscription->remove_client(_client); + + // Ensure to update the "real" subscription + its_subscription = found_eventgroup->second; // Finally update the subscriptions if (!its_subscription->has_client()) { @@ -470,8 +485,6 @@ service_discovery_impl::unsubscribe_all( auto its_current_message = std::make_shared<message_impl>();; boost::asio::ip::address its_address; - const remote_offer_type_e its_offer_type - = get_remote_offer_type(_service, _instance); { std::lock_guard<std::mutex> its_lock(subscribed_mutex_); @@ -482,8 +495,13 @@ service_discovery_impl::unsubscribe_all( for (auto &its_eventgroup : found_instance->second) { auto its_subscription = its_eventgroup.second; its_subscription->set_ttl(0); + + const reliability_type_e its_reliability = + get_eventgroup_reliability(_service, _instance, + its_eventgroup.first, its_subscription); + auto its_data = create_eventgroup_entry(_service, _instance, - its_eventgroup.first, its_subscription, its_offer_type); + its_eventgroup.first, its_subscription, its_reliability); auto its_reliable = its_subscription->get_endpoint(true); auto its_unreliable = its_subscription->get_endpoint(false); get_subscription_address( @@ -672,7 +690,7 @@ entry_data_t service_discovery_impl::create_eventgroup_entry( service_t _service, instance_t _instance, eventgroup_t _eventgroup, const std::shared_ptr<subscription> &_subscription, - remote_offer_type_e _offer_type) { + reliability_type_e _reliability_type) { entry_data_t its_data; its_data.entry_ = nullptr; @@ -683,18 +701,18 @@ service_discovery_impl::create_eventgroup_entry( bool insert_reliable(false); bool insert_unreliable(false); - switch (_offer_type) { - case remote_offer_type_e::RELIABLE: + switch (_reliability_type) { + case reliability_type_e::RT_RELIABLE: if (its_reliable_endpoint) { insert_reliable = true; } break; - case remote_offer_type_e::UNRELIABLE: + case reliability_type_e::RT_UNRELIABLE: if (its_unreliable_endpoint) { insert_unreliable = true; } break; - case remote_offer_type_e::RELIABLE_UNRELIABLE: + case reliability_type_e::RT_BOTH: if (its_reliable_endpoint && its_unreliable_endpoint) { insert_reliable = true; insert_unreliable = true; @@ -705,13 +723,13 @@ service_discovery_impl::create_eventgroup_entry( } if (!insert_reliable && !insert_unreliable - && _offer_type != remote_offer_type_e::UNKNOWN) { + && _reliability_type != reliability_type_e::RT_UNKNOWN) { VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " - "subscription doesn't match offer type: [" + "subscription doesn't match reliability type: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] " - << _offer_type; + << (uint16_t) _reliability_type; return its_data; } std::shared_ptr<eventgroupentry_impl> its_entry, its_other; @@ -753,11 +771,13 @@ service_discovery_impl::create_eventgroup_entry( auto its_option = create_ip_option(unicast_, its_port, true); its_data.options_.push_back(its_option); } else { - VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + VSOMEIP_WARNING << __func__ << ": Cannot create subscription as " "local reliable port is zero: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + its_data.entry_ = nullptr; + its_data.other_ = nullptr; return its_data; } } @@ -805,21 +825,20 @@ service_discovery_impl::create_eventgroup_entry( auto its_option = create_ip_option(unicast_, its_port, false); its_data.options_.push_back(its_option); } else { - VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + VSOMEIP_WARNING << __func__ << ": Cannot create subscription as " " local unreliable port is zero: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + its_data.entry_ = nullptr; + its_data.other_ = nullptr; return its_data; } } - if (its_entry - &&_subscription->is_selective()) { + if (its_entry &&_subscription->is_selective()) { auto its_selective_option = std::make_shared<selective_option_impl>(); - for (const auto &its_client : _subscription->get_clients()) - (void)its_selective_option->add_client(its_client); - + its_selective_option->set_clients(_subscription->get_clients()); its_data.options_.push_back(its_selective_option); } @@ -836,7 +855,7 @@ service_discovery_impl::insert_subscription_ack( const std::shared_ptr<remote_subscription_ack>& _acknowledgement, const std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, const std::shared_ptr<endpoint_definition> &_target, - const client_t _client) { + const std::set<client_t> &_clients) { std::unique_lock<std::recursive_mutex> its_lock(_acknowledgement->get_lock()); auto its_message = _acknowledgement->get_current_message(); @@ -871,11 +890,10 @@ service_discovery_impl::insert_subscription_ack( } } - if (_client != VSOMEIP_ROUTING_CLIENT) { + if (_clients.size() > 1 || (*(_clients.begin())) != 0) { auto its_selective_option = its_eventgroup_entry->get_selective_option(); - if (its_selective_option) { - its_selective_option->add_client(_client); - } + if (its_selective_option) + its_selective_option->set_clients(_clients); } return; @@ -911,10 +929,11 @@ service_discovery_impl::insert_subscription_ack( its_data.options_.push_back(its_option); } } + // Selective - if (_client != VSOMEIP_ROUTING_CLIENT) { + if (_clients.size() > 1 || (*(_clients.begin())) != 0) { auto its_selective_option = std::make_shared<selective_option_impl>(); - (void)its_selective_option->add_client(_client); + (void)its_selective_option->set_clients(_clients); its_data.options_.push_back(its_selective_option); } @@ -1031,43 +1050,30 @@ service_discovery_impl::on_message( bool force_initial_events(false); bool sd_acceptance_queried(false); - bool accept_offers(false); - bool expired_services(false); + expired_ports_t expired_ports; + sd_acceptance_state_t accept_state(expired_ports); for (auto iter = its_entries.begin(); iter != its_end; iter++) { if (!sd_acceptance_queried) { + sd_acceptance_queried = true; if (sd_acceptance_handler_) { + accept_state.sd_acceptance_required_ + = configuration_->is_protected_device(_sender); remote_info_t remote; - remote.port_ = ANY_PORT; remote.first_ = ANY_PORT; remote.last_ = ANY_PORT; remote.is_range_ = false; - if (configuration_->sd_acceptance_required(_sender, ANY_PORT)) { - if (_sender.is_v4()) { - remote.ip_.address_.v4_ = _sender.to_v4().to_bytes(); - remote.ip_.is_v4_ = true; - } else { - remote.ip_.address_.v6_ = _sender.to_v6().to_bytes(); - remote.ip_.is_v4_ = false; - } - accept_offers = sd_acceptance_handler_(remote); - if (!accept_offers && !expired_services) { - VSOMEIP_WARNING << "service_discovery_impl::" << __func__ - << ": Do not accept offer / subscribe from " - << std::hex << std::setw(4) << std::setfill('0') - << _sender.to_string(); - remove_remote_offer_type_by_ip(_sender); - host_->expire_subscriptions(_sender); - host_->expire_services(_sender); - expired_services = true; - } + if (_sender.is_v4()) { + remote.ip_.address_.v4_ = _sender.to_v4().to_bytes(); + remote.ip_.is_v4_ = true; } else { - accept_offers = true; + remote.ip_.address_.v6_ = _sender.to_v6().to_bytes(); + remote.ip_.is_v4_ = false; } + accept_state.accept_entries_ = sd_acceptance_handler_(remote); } else { - accept_offers = true; + accept_state.accept_entries_ = true; } - sd_acceptance_queried = true; } if ((*iter)->is_service_entry()) { std::shared_ptr<serviceentry_impl> its_service_entry @@ -1075,8 +1081,8 @@ service_discovery_impl::on_message( bool its_unicast_flag = its_message->get_unicast_flag(); process_serviceentry(its_service_entry, its_options, its_unicast_flag, its_resubscribes, - received_via_mcast, accept_offers); - } else if (accept_offers) { + received_via_mcast, accept_state); + } else { std::shared_ptr<eventgroupentry_impl> its_eventgroup_entry = std::dynamic_pointer_cast<eventgroupentry_impl>(*iter); @@ -1095,7 +1101,8 @@ service_discovery_impl::on_message( check_stop_subscribe_subscribe(iter, its_end, its_options); process_eventgroupentry(its_eventgroup_entry, its_options, its_acknowledgement, _destination, - is_stop_subscribe_subscribe, force_initial_events); + is_stop_subscribe_subscribe, force_initial_events, + accept_state); } } @@ -1139,7 +1146,8 @@ service_discovery_impl::process_serviceentry( const std::vector<std::shared_ptr<option_impl> > &_options, bool _unicast_flag, std::vector<std::shared_ptr<message_impl> > &_resubscribes, - bool _received_via_mcast, bool _accept_offers) { + bool _received_via_mcast, + const sd_acceptance_state_t& _sd_ac_state) { // Read service info from entry entry_type_e its_type = _entry->get_type(); @@ -1221,20 +1229,17 @@ service_discovery_impl::process_serviceentry( its_major, its_minor, _unicast_flag); break; case entry_type_e::OFFER_SERVICE: - if (_accept_offers) { process_offerservice_serviceentry(its_service, its_instance, its_major, its_minor, its_ttl, its_reliable_address, its_reliable_port, its_unreliable_address, its_unreliable_port, _resubscribes, - _received_via_mcast); - } + _received_via_mcast, _sd_ac_state); break; case entry_type_e::UNKNOWN: default: VSOMEIP_ERROR << __func__ << ": Unsupported service entry type"; } - - } else if (_accept_offers) { + } else if (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_) { std::shared_ptr<request> its_request = find_request(its_service, its_instance); if (its_request) { std::lock_guard<std::mutex> its_lock(requested_mutex_); @@ -1242,8 +1247,8 @@ service_discovery_impl::process_serviceentry( its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); } remove_remote_offer_type(its_service, its_instance, - (its_reliable_port != ILLEGAL_PORT ? - its_reliable_address : its_unreliable_address)); + its_reliable_address, its_reliable_port, + its_unreliable_address, its_unreliable_port); unsubscribe_all(its_service, its_instance); if (!is_diagnosis_ && !is_suspended_) { host_->del_routing_info(its_service, its_instance, @@ -1262,7 +1267,7 @@ service_discovery_impl::process_offerservice_serviceentry( const boost::asio::ip::address &_unreliable_address, uint16_t _unreliable_port, std::vector<std::shared_ptr<message_impl> > &_resubscribes, - bool _received_via_mcast) { + bool _received_via_mcast, const sd_acceptance_state_t& _sd_ac_state) { std::shared_ptr < runtime > its_runtime = runtime_.lock(); if (!its_runtime) return; @@ -1290,11 +1295,106 @@ service_discovery_impl::process_offerservice_serviceentry( << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; } - if (update_remote_offer_type(_service,_instance, offer_type, - _reliable_address, _unreliable_address)) { + if (_sd_ac_state.sd_acceptance_required_) { + + auto expire_subscriptions_and_services = + [this, &_sd_ac_state](const boost::asio::ip::address& _address, + std::uint16_t _port, bool _reliable) { + const auto its_port_pair = std::make_pair(_reliable, _port); + if (_sd_ac_state.expired_ports_.find(its_port_pair) == + _sd_ac_state.expired_ports_.end()) { + VSOMEIP_WARNING << "service_discovery_impl::" << __func__ + << ": Do not accept offer from " + << _address.to_string() << ":" + << std::dec << _port << " reliable=" << _reliable; + remove_remote_offer_type_by_ip(_address, _port, _reliable); + host_->expire_subscriptions(_address, _port, _reliable); + host_->expire_services(_address, _port, _reliable); + _sd_ac_state.expired_ports_.insert(its_port_pair); + } + }; + + // return if the registered sd_acceptance handler returned false + // and for the provided port sd_acceptance is necessary + switch (offer_type) { + case remote_offer_type_e::UNRELIABLE: + if (!_sd_ac_state.accept_entries_ + && configuration_->is_protected_port( + _unreliable_address, _unreliable_port, false)) { + expire_subscriptions_and_services(_unreliable_address, + _unreliable_port, false); + return; + } + break; + case remote_offer_type_e::RELIABLE: + if (!_sd_ac_state.accept_entries_ + && configuration_->is_protected_port( + _reliable_address, _reliable_port, true)) { + expire_subscriptions_and_services(_reliable_address, + _reliable_port, true); + return; + } + break; + case remote_offer_type_e::RELIABLE_UNRELIABLE: + if (!_sd_ac_state.accept_entries_ + && (configuration_->is_protected_port( + _unreliable_address, _unreliable_port, false) + || configuration_->is_protected_port( + _reliable_address, _reliable_port, true))) { + expire_subscriptions_and_services(_unreliable_address, + _unreliable_port, false); + expire_subscriptions_and_services(_reliable_address, + _reliable_port, true); + return; + } + break; + case remote_offer_type_e::UNKNOWN: + default: + break; + } + } + + if (update_remote_offer_type(_service, _instance, offer_type, + _reliable_address, _reliable_port, + _unreliable_address, _unreliable_port)) { VSOMEIP_WARNING << __func__ << ": Remote offer type changed [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; + + // Only update eventgroup reliability type if it was initially unknown + auto its_eventgroups = host_->get_subscribed_eventgroups(_service, _instance); + for (auto eg : its_eventgroups) { + auto its_info = host_->find_eventgroup( + _service, _instance, eg); + if (its_info) { + if (its_info->is_reliability_auto_mode()) { + reliability_type_e its_reliability(reliability_type_e::RT_UNKNOWN); + switch (offer_type) { + case remote_offer_type_e::RELIABLE: + its_reliability = reliability_type_e::RT_RELIABLE; + break; + case remote_offer_type_e::UNRELIABLE: + its_reliability = reliability_type_e::RT_UNRELIABLE; + break; + case remote_offer_type_e::RELIABLE_UNRELIABLE: + its_reliability = reliability_type_e::RT_BOTH; + break; + default: + ; + } + if (its_reliability != reliability_type_e::RT_UNKNOWN + && its_reliability != its_info->get_reliability()) { + VSOMEIP_WARNING << "sd::" << __func__ << ": eventgroup reliability type changed [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << eg << "]" + << " using reliability type: " + << std::hex << std::setw(4) << std::setfill('0') << (uint16_t) its_reliability; + its_info->set_reliability(its_reliability); + } + } + } + } } host_->add_routing_info(_service, _instance, @@ -1314,8 +1414,6 @@ service_discovery_impl::process_offerservice_serviceentry( auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { if (0 < found_instance->second.size()) { - const remote_offer_type_e its_offer_type = - get_remote_offer_type(_service, _instance); for (const auto& its_eventgroup : found_instance->second) { auto its_subscription = its_eventgroup.second; std::shared_ptr<endpoint> its_reliable, its_unreliable; @@ -1333,8 +1431,12 @@ service_discovery_impl::process_offerservice_serviceentry( subscription_state_e::ST_RESUBSCRIBING_NOT_ACKNOWLEDGED); } } + const reliability_type_e its_reliability = + get_eventgroup_reliability(_service, _instance, + its_eventgroup.first, its_subscription); + auto its_data = create_eventgroup_entry(_service, _instance, - its_eventgroup.first, its_subscription, its_offer_type); + its_eventgroup.first, its_subscription, its_reliability); if (its_data.entry_) { add_entry_data(_resubscribes, its_data); } @@ -1443,8 +1545,6 @@ service_discovery_impl::on_endpoint_connected( auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { if (0 < found_instance->second.size()) { - const remote_offer_type_e its_offer_type = - get_remote_offer_type(_service, _instance); for (const auto &its_eventgroup : found_instance->second) { std::shared_ptr<subscription> its_subscription(its_eventgroup.second); if (its_subscription) { @@ -1487,8 +1587,10 @@ service_discovery_impl::on_endpoint_connected( its_subscription->set_state(its_client, subscription_state_e::ST_NOT_ACKNOWLEDGED); + const reliability_type_e its_reliability_type = + get_eventgroup_reliability(_service, _instance, its_eventgroup.first, its_subscription); auto its_data = create_eventgroup_entry(_service, _instance, - its_eventgroup.first, its_subscription, its_offer_type); + its_eventgroup.first, its_subscription, its_reliability_type); if (its_data.entry_) { add_entry_data(its_messages, its_data); @@ -1571,7 +1673,10 @@ service_discovery_impl::process_eventgroupentry( const std::vector<std::shared_ptr<option_impl> > &_options, std::shared_ptr<remote_subscription_ack> &_acknowledgement, const boost::asio::ip::address &_destination, - bool _is_stop_subscribe_subscribe, bool _force_initial_events) { + bool _is_stop_subscribe_subscribe, bool _force_initial_events, + const sd_acceptance_state_t& _sd_ac_state) { + + std::set<client_t> its_clients({0}); // maybe overridden for selectives auto its_sender = _acknowledgement->get_target_address(); auto its_session = _entry->get_owning_message()->get_session(); @@ -1601,7 +1706,7 @@ service_discovery_impl::process_eventgroupentry( << "] session: " << std::hex << std::setw(4) << std::setfill('0') << its_session << ", ttl: " << its_ttl; if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } } else { // We received a subscription [n]ack for an eventgroup that does not exist. @@ -1617,7 +1722,7 @@ service_discovery_impl::process_eventgroupentry( << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } @@ -1630,7 +1735,7 @@ service_discovery_impl::process_eventgroupentry( << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } @@ -1644,7 +1749,7 @@ service_discovery_impl::process_eventgroupentry( if (its_ttl > 0) { // increase number of required acks by one as number required acks // is calculated based on the number of referenced options - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } @@ -1655,7 +1760,7 @@ service_discovery_impl::process_eventgroupentry( << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } @@ -1672,7 +1777,7 @@ service_discovery_impl::process_eventgroupentry( << std::hex << std::setw(4) << std::setfill('0') << its_session; if (its_ttl > 0) { // set to 0 to ensure an answer containing at least this subscribe_nack is sent out - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } @@ -1697,7 +1802,6 @@ service_discovery_impl::process_eventgroupentry( boost::asio::ip::address its_second_address; uint16_t its_second_port(ILLEGAL_PORT); bool is_second_reliable(false); - std::set<client_t> its_clients({0}); // maybe overridden for selectives for (auto i : { 1, 2 }) { for (auto its_index : _entry->get_options(uint8_t(i))) { @@ -1718,7 +1822,7 @@ service_discovery_impl::process_eventgroupentry( << std::hex << std::setw(4) << std::setfill('0') << its_session; if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type && its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } @@ -1733,7 +1837,7 @@ service_discovery_impl::process_eventgroupentry( its_ipv4_option->get_address()); if (!check_layer_four_protocol(its_ipv4_option)) { if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } return; } @@ -1749,7 +1853,7 @@ service_discovery_impl::process_eventgroupentry( if (is_first_reliable == is_second_reliable && its_second_port != ILLEGAL_PORT) { if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ @@ -1763,7 +1867,7 @@ service_discovery_impl::process_eventgroupentry( if (!check_ipv4_address(its_first_address) || 0 == its_first_port) { if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ @@ -1784,7 +1888,7 @@ service_discovery_impl::process_eventgroupentry( if (is_second_reliable == is_first_reliable && its_first_port != ILLEGAL_PORT) { if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ @@ -1798,7 +1902,7 @@ service_discovery_impl::process_eventgroupentry( if (!check_ipv4_address(its_second_address) || 0 == its_second_port) { if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ @@ -1829,7 +1933,7 @@ service_discovery_impl::process_eventgroupentry( its_ipv6_option->get_address()); if (!check_layer_four_protocol(its_ipv6_option)) { if(its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << "Invalid layer 4 protocol type in IPv6 endpoint option specified! " @@ -1847,7 +1951,7 @@ service_discovery_impl::process_eventgroupentry( if (is_first_reliable == is_second_reliable && its_second_port != ILLEGAL_PORT) { if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ @@ -1867,7 +1971,7 @@ service_discovery_impl::process_eventgroupentry( if (is_second_reliable == is_first_reliable && its_first_port != ILLEGAL_PORT) { if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); } boost::system::error_code ec; VSOMEIP_ERROR << __func__ @@ -1983,7 +2087,7 @@ service_discovery_impl::process_eventgroupentry( << its_sender.to_string(ec) << " session: " << std::hex << std::setw(4) << std::setfill('0') << its_session; if (its_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients); return; } break; @@ -1997,7 +2101,7 @@ service_discovery_impl::process_eventgroupentry( its_first_address, its_first_port, is_first_reliable, its_second_address, its_second_port, is_second_reliable, _acknowledgement, _is_stop_subscribe_subscribe, - _force_initial_events, its_clients, its_info); + _force_initial_events, its_clients, _sd_ac_state, its_info); } else { if (entry_type_e::SUBSCRIBE_EVENTGROUP_ACK == its_type) { //this type is used for ACK and NACK messages if (its_ttl > 0) { @@ -2025,6 +2129,7 @@ service_discovery_impl::handle_eventgroup_subscription( std::shared_ptr<remote_subscription_ack> &_acknowledgement, bool _is_stop_subscribe_subscribe, bool _force_initial_events, const std::set<client_t> &_clients, + const sd_acceptance_state_t& _sd_ac_state, const std::shared_ptr<eventgroupinfo>& _info) { (void)_counter; (void)_reserved; @@ -2062,7 +2167,7 @@ service_discovery_impl::handle_eventgroup_subscription( } } if (reliablility_nack && _ttl > 0) { - insert_subscription_ack(_acknowledgement, _info, 0); + insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients); boost::system::error_code ec; // TODO: Add sender and session id VSOMEIP_WARNING << __func__ @@ -2103,13 +2208,13 @@ service_discovery_impl::handle_eventgroup_subscription( << (uint32_t) _info->get_major() << "] subscriber: " << _first_address.to_string(ec) << ":" << std::dec << _first_port; if (_ttl > 0) { - insert_subscription_ack(_acknowledgement, its_info, 0); + insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, _clients); } return; } else { boost::asio::ip::address its_first_address, its_second_address; - uint16_t its_first_port, its_second_port; if (ILLEGAL_PORT != _first_port) { + uint16_t its_first_port(0); its_subscriber = endpoint_definition::get( _first_address, _first_port, _is_first_reliable, _service, _instance); if (!_is_first_reliable && @@ -2121,7 +2226,7 @@ service_discovery_impl::handle_eventgroup_subscription( its_reliable = its_subscriber; // check if TCP connection is established by client if (_ttl > 0 && !is_tcp_connected(_service, _instance, its_reliable)) { - insert_subscription_ack(_acknowledgement, _info, 0); + insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients); boost::system::error_code ec; // TODO: Add sender and session id VSOMEIP_ERROR << "TCP connection to target1: [" @@ -2139,6 +2244,7 @@ service_discovery_impl::handle_eventgroup_subscription( } if (ILLEGAL_PORT != _second_port) { + uint16_t its_second_port(0); its_subscriber = endpoint_definition::get( _second_address, _second_port, _is_second_reliable, _service, _instance); if (!_is_second_reliable && @@ -2150,7 +2256,7 @@ service_discovery_impl::handle_eventgroup_subscription( its_reliable = its_subscriber; // check if TCP connection is established by client if (_ttl > 0 && !is_tcp_connected(_service, _instance, its_reliable)) { - insert_subscription_ack(_acknowledgement, _info, 0); + insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients); boost::system::error_code ec; // TODO: Add sender and session id VSOMEIP_ERROR << "TCP connection to target2 : [" @@ -2168,6 +2274,26 @@ service_discovery_impl::handle_eventgroup_subscription( } } + // check if the subscription should be rejected because of sd_acceptance_handling + if (_ttl > 0 && _sd_ac_state.sd_acceptance_required_) { + bool insert_nack(false); + if (_first_port != ILLEGAL_PORT && !_sd_ac_state.accept_entries_ + && configuration_->is_protected_port(_first_address, + _first_port, _is_first_reliable)) { + insert_nack = true; + } + if (!insert_nack && _second_port != ILLEGAL_PORT + && !_sd_ac_state.accept_entries_ + && configuration_->is_protected_port(_second_address, + _second_port, _is_second_reliable)) { + insert_nack = true; + } + if (insert_nack) { + insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients); + return; + } + } + // Create subscription object auto its_subscription = std::make_shared<remote_subscription>(); its_subscription->set_eventgroupinfo(_info); @@ -2953,9 +3079,10 @@ service_discovery_impl::last_offer_shorter_half_offer_delay_ago() { bool service_discovery_impl::check_source_address( const boost::asio::ip::address &its_source_address) const { + bool is_valid = true; // Check if source address is same as nodes unicast address - if(unicast_ == its_source_address) { + if (unicast_ == its_source_address) { VSOMEIP_ERROR << "Source address of message is same as DUT's unicast address! : " << its_source_address.to_string(); is_valid = false; @@ -2965,18 +3092,21 @@ service_discovery_impl::check_source_address( void service_discovery_impl::set_diagnosis_mode(const bool _activate) { + is_diagnosis_ = _activate; } bool service_discovery_impl::get_diagnosis_mode() { + return is_diagnosis_; } void service_discovery_impl::update_remote_subscription( const std::shared_ptr<remote_subscription> &_subscription) { - if (!_subscription->is_pending()) { + + if (!_subscription->is_pending() || 0 == _subscription->get_answers()) { std::shared_ptr<remote_subscription_ack> its_ack; { std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); @@ -2995,13 +3125,15 @@ service_discovery_impl::update_remote_subscription( void service_discovery_impl::update_acknowledgement( const std::shared_ptr<remote_subscription_ack> &_acknowledgement) { + if (_acknowledgement->is_complete() && !_acknowledgement->is_pending() && !_acknowledgement->is_done()) { + send_subscription_ack(_acknowledgement); std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); - for (const auto& its_subscription : _acknowledgement->get_subscriptions()) + for (const auto &its_subscription : _acknowledgement->get_subscriptions()) pending_remote_subscriptions_.erase(its_subscription); } } @@ -3241,7 +3373,9 @@ service_discovery_impl::update_remote_offer_type( service_t _service, instance_t _instance, remote_offer_type_e _offer_type, const boost::asio::ip::address &_reliable_address, - const boost::asio::ip::address &_unreliable_address) { + std::uint16_t _reliable_port, + const boost::asio::ip::address &_unreliable_address, + std::uint16_t _unreliable_port) { bool ret(false); std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_); const std::pair<service_t, instance_t> its_si_pair = std::make_pair(_service, _instance); @@ -3256,13 +3390,18 @@ service_discovery_impl::update_remote_offer_type( } switch (_offer_type) { case remote_offer_type_e::UNRELIABLE: - remote_offers_by_ip_[_unreliable_address].insert(its_si_pair); + remote_offers_by_ip_[_unreliable_address][std::make_pair(false, + _unreliable_port)].insert(its_si_pair); break; case remote_offer_type_e::RELIABLE: - remote_offers_by_ip_[_reliable_address].insert(its_si_pair); + remote_offers_by_ip_[_reliable_address][std::make_pair(true, + _reliable_port)].insert(its_si_pair); break; case remote_offer_type_e::RELIABLE_UNRELIABLE: - remote_offers_by_ip_[_unreliable_address].insert(its_si_pair); + remote_offers_by_ip_[_unreliable_address][std::make_pair(false, + _unreliable_port)].insert(its_si_pair); + remote_offers_by_ip_[_unreliable_address][std::make_pair(true, + _reliable_port)].insert(its_si_pair); break; case remote_offer_type_e::UNKNOWN: default: @@ -3278,35 +3417,83 @@ service_discovery_impl::update_remote_offer_type( void service_discovery_impl::remove_remote_offer_type( service_t _service, instance_t _instance, - const boost::asio::ip::address &_address) { + const boost::asio::ip::address &_reliable_address, + std::uint16_t _reliable_port, + const boost::asio::ip::address &_unreliable_address, + std::uint16_t _unreliable_port) { std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_); const std::pair<service_t, instance_t> its_si_pair = std::make_pair(_service, _instance); remote_offer_types_.erase(its_si_pair); - auto found_services = remote_offers_by_ip_.find(_address); - if (found_services != remote_offers_by_ip_.end()) { - found_services->second.erase(its_si_pair); + + auto delete_from_remote_offers_by_ip = [&]( + const boost::asio::ip::address& _address, std::uint16_t _port, + bool _reliable) { + const auto found_address = remote_offers_by_ip_.find(_address); + if (found_address != remote_offers_by_ip_.end()) { + auto found_port = found_address->second.find( + std::make_pair(_reliable, _port)); + if (found_port != found_address->second.end()) { + if (found_port->second.erase(std::make_pair(_service, _instance))) { + if (found_port->second.empty()) { + found_address->second.erase(found_port); + if (found_address->second.empty()) { + remote_offers_by_ip_.erase(found_address); + } + } + } + } + } + }; + if (_reliable_port != ILLEGAL_PORT) { + delete_from_remote_offers_by_ip(_reliable_address, _reliable_port, + true); + } + if (_unreliable_port != ILLEGAL_PORT) { + delete_from_remote_offers_by_ip(_unreliable_address, _unreliable_port, + false); } } void service_discovery_impl::remove_remote_offer_type_by_ip( const boost::asio::ip::address &_address) { + remove_remote_offer_type_by_ip(_address, ANY_PORT, false); +} + +void service_discovery_impl::remove_remote_offer_type_by_ip( + const boost::asio::ip::address &_address, std::uint16_t _port, bool _reliable) { std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_); - auto found_services = remote_offers_by_ip_.find(_address); - if (found_services != remote_offers_by_ip_.end()) { - for (const auto& si : found_services->second) { - remote_offer_types_.erase(si); + const auto found_address = remote_offers_by_ip_.find(_address); + if (found_address != remote_offers_by_ip_.end()) { + if (_port == ANY_PORT) { + for (const auto& port : found_address->second) { + for (const auto& si : port.second) { + remote_offer_types_.erase(si); + } + } + remote_offers_by_ip_.erase(_address); + } else { + const auto its_port_reliability = std::make_pair(_reliable, _port); + const auto found_port = found_address->second.find(its_port_reliability); + if (found_port != found_address->second.end()) { + for (const auto& si : found_port->second) { + remote_offer_types_.erase(si); + } + found_address->second.erase(found_port); + if (found_address->second.empty()) { + remote_offers_by_ip_.erase(found_address); + } + } } } - remote_offers_by_ip_.erase(_address); } std::shared_ptr<subscription> service_discovery_impl::create_subscription( - service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, const std::shared_ptr<endpoint> &_reliable, - const std::shared_ptr<endpoint> &_unreliable) { + const std::shared_ptr<endpoint> &_unreliable, + const std::shared_ptr<eventgroupinfo> &_info) { auto its_subscription = std::make_shared<subscription>(); its_subscription->set_major(_major); its_subscription->set_ttl(_ttl); @@ -3322,10 +3509,9 @@ service_discovery_impl::create_subscription( } // check whether the eventgroup is selective - auto its_eventgroup = host_->find_eventgroup(_service, _instance, _eventgroup); - if (its_eventgroup) { - its_subscription->set_selective(its_eventgroup->is_selective()); - } + its_subscription->set_selective(_info->is_selective()); + + its_subscription->set_eventgroupinfo(_info); return its_subscription; } @@ -3333,33 +3519,70 @@ service_discovery_impl::create_subscription( void service_discovery_impl::send_subscription_ack( const std::shared_ptr<remote_subscription_ack> &_acknowledgement) { + if (_acknowledgement->is_done()) return; _acknowledgement->done(); - std::uint32_t its_answers(1); + std::uint32_t its_max_answers(1); // Must be 1 as "_acknowledgement" not + // necessarily contains subscriptions + bool do_not_answer(false); + std::shared_ptr<remote_subscription> its_parent; // Find highest number of necessary answers for (const auto& its_subscription : _acknowledgement->get_subscriptions()) { - if (its_subscription->get_answers() > its_answers) - its_answers = its_subscription->get_answers(); + auto its_answers = its_subscription->get_answers(); + if (its_answers > its_max_answers) { + its_max_answers = its_answers; + } else if (its_answers == 0) { + do_not_answer = true; + its_parent = its_subscription->get_parent(); + } + } + + if (do_not_answer) { + if (its_parent) { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + auto its_parent_ack = pending_remote_subscriptions_[its_parent]; + if (its_parent_ack) { + for (const auto &its_subscription : its_parent_ack->get_subscriptions()) { + if (its_subscription != its_parent) + its_subscription->set_answers(its_subscription->get_answers() + 1); + } + } + } + return; } // send messages - for (std::uint32_t i = 0; i < its_answers; i++) { - for (const auto& its_subscription : _acknowledgement->get_subscriptions()) { + for (std::uint32_t i = 0; i < its_max_answers; i++) { + for (const auto &its_subscription : _acknowledgement->get_subscriptions()) { if (i < its_subscription->get_answers()) { if (its_subscription->get_ttl() > 0) { auto its_info = its_subscription->get_eventgroupinfo(); if (its_info) { + std::set<client_t> its_acked; + std::set<client_t> its_nacked; for (const auto& its_client : its_subscription->get_clients()) { - auto its_ttl = (its_subscription->get_client_state(its_client) - == remote_subscription_state_e::SUBSCRIPTION_ACKED ? - its_subscription->get_ttl() : 0); + if (its_subscription->get_client_state(its_client) + == remote_subscription_state_e::SUBSCRIPTION_ACKED) { + its_acked.insert(its_client); + } else { + its_nacked.insert(its_client); + } + } - insert_subscription_ack(_acknowledgement, its_info, its_ttl, - its_subscription->get_subscriber(), its_client); + if (0 < its_acked.size()) { + insert_subscription_ack(_acknowledgement, its_info, + its_subscription->get_ttl(), + its_subscription->get_subscriber(), its_acked); + } + + if (0 < its_nacked.size()) { + insert_subscription_ack(_acknowledgement, its_info, + 0, + its_subscription->get_subscriber(), its_nacked); } } } @@ -3371,6 +3594,8 @@ service_discovery_impl::send_subscription_ack( update_subscription_expiration_timer(its_messages); } + std::this_thread::yield(); + // We might need to send initial events for (const auto &its_subscription : _acknowledgement->get_subscriptions()) { // Assumption: We do _NOT_ need to check whether this is a child @@ -3430,5 +3655,58 @@ service_discovery_impl::register_reboot_notification_handler( reboot_notification_handler_ = _handler; } +reliability_type_e service_discovery_impl::get_eventgroup_reliability( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<subscription>& _subscription) { + reliability_type_e its_reliability = reliability_type_e::RT_UNKNOWN; + auto its_info = _subscription->get_eventgroupinfo().lock(); + if (its_info) { + its_reliability = its_info->get_reliability(); + if (its_reliability == reliability_type_e::RT_UNKNOWN + && its_info->is_reliability_auto_mode()) { + // fallback: determine how service is offered + // and update reliability type of eventgroup + switch (get_remote_offer_type(_service, _instance)) { + case remote_offer_type_e::RELIABLE: + its_reliability = reliability_type_e::RT_RELIABLE; + break; + case remote_offer_type_e::UNRELIABLE: + its_reliability = reliability_type_e::RT_UNRELIABLE; + break; + case remote_offer_type_e::RELIABLE_UNRELIABLE: + its_reliability = reliability_type_e::RT_BOTH; + break; + default: + ; + } + VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't determine eventgroup reliability type for [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" + << " using reliability type: " + << std::hex << std::setw(4) << std::setfill('0') << (uint16_t) its_reliability; + its_info->set_reliability(its_reliability); + } + } else { + VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't lock eventgroupinfo [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] "; + auto its_eg_info = host_->find_eventgroup(_service, _instance, _eventgroup); + if (its_eg_info) { + _subscription->set_eventgroupinfo(its_eg_info); + its_reliability = its_eg_info->get_reliability(); + } + } + + if (its_reliability == reliability_type_e::RT_UNKNOWN) { + VSOMEIP_WARNING << "sd::" << __func__ << ": eventgroup reliability type is unknown [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + } + return its_reliability; +} + } // namespace sd } // namespace vsomeip_v3 |