diff options
Diffstat (limited to 'implementation/service_discovery/src/service_discovery_impl.cpp')
-rw-r--r-- | implementation/service_discovery/src/service_discovery_impl.cpp | 258 |
1 files changed, 170 insertions, 88 deletions
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index ca4e131..74e509b 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -165,9 +165,10 @@ service_discovery_impl::start() { } } if (endpoint_ && !reliable_) { - // rejoin multicast group - dynamic_cast<udp_server_endpoint_impl*>( - endpoint_.get())->join(sd_multicast_); + auto its_endpoint = std::dynamic_pointer_cast< + udp_server_endpoint_impl>(endpoint_); + if (its_endpoint) + its_endpoint->join(sd_multicast_); } } is_suspended_ = false; @@ -213,17 +214,17 @@ service_discovery_impl::release_service( } } -std::shared_ptr<request> -service_discovery_impl::find_request(service_t _service, instance_t _instance) { +void +service_discovery_impl::update_request(service_t _service, instance_t _instance) { std::lock_guard<std::mutex> its_lock(requested_mutex_); auto find_service = requested_.find(_service); if (find_service != requested_.end()) { auto find_instance = find_service->second.find(_instance); if (find_instance != find_service->second.end()) { - return find_instance->second; + find_instance->second->set_sent_counter( + std::uint8_t(repetitions_max_ + 1)); } } - return nullptr; } void @@ -232,6 +233,13 @@ service_discovery_impl::subscribe( eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, client_t _client, const std::shared_ptr<eventgroupinfo> &_info) { + + if (is_suspended_) { + VSOMEIP_WARNING << "service_discovery::" << __func__ + << ": Ignoring subscription as we are suspended."; + return; + } + #ifdef VSOMEIP_ENABLE_COMPAT bool is_selective(_info ? _info->is_selective() : false); #endif // VSOMEIP_ENABLE_COMPAT @@ -248,8 +256,8 @@ service_discovery_impl::subscribe( if (!its_subscription->is_selective() && is_selective) { its_subscription->set_selective(true); its_subscription->remove_client(VSOMEIP_ROUTING_CLIENT); - for (const auto& e : _info->get_events()) { - for (const auto& c : e->get_subscribers(_eventgroup)) { + for (const auto &e : _info->get_events()) { + for (const auto &c : e->get_subscribers(_eventgroup)) { its_subscription->add_client(c); } } @@ -517,23 +525,65 @@ service_discovery_impl::unsubscribe_all( } void -service_discovery_impl::reset_subscriptions( +service_discovery_impl::unsubscribe_all_on_suspend() { + + std::map<boost::asio::ip::address, + std::vector<std::shared_ptr<message_impl> > > its_stopsubscribes; + + { + std::lock_guard<std::mutex> its_lock(subscribed_mutex_); + for (auto its_service : subscribed_) { + for (auto its_instance : its_service.second) { + for (auto &its_eventgroup : its_instance.second) { + boost::asio::ip::address its_address; + auto its_current_message = std::make_shared<message_impl>(); + auto its_subscription = its_eventgroup.second; + its_subscription->set_ttl(0); + const reliability_type_e its_reliability = + get_eventgroup_reliability(its_service.first, its_instance.first, + its_eventgroup.first, its_subscription); + auto its_data = create_eventgroup_entry(its_service.first, its_instance.first, + its_eventgroup.first, its_subscription, its_reliability); + auto its_reliable = its_subscription->get_endpoint(true); + auto its_unreliable = its_subscription->get_endpoint(false); + get_subscription_address( + its_reliable, its_unreliable, its_address); + if (its_data.entry_ + && its_current_message->add_entry_data(its_data.entry_, its_data.options_)) { + its_stopsubscribes[its_address].push_back(its_current_message); + } else { + VSOMEIP_WARNING << __func__ << ": Failed to create StopSubscribe entry for: " + << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance.first << "." + << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup.first + << " address: " << its_address.to_string(); + } + } + its_instance.second.clear(); + } + its_service.second.clear(); + } + subscribed_.clear(); + } + + for (auto its_address : its_stopsubscribes) { + if (!serialize_and_send(its_address.second, its_address.first)) { + VSOMEIP_WARNING << __func__ << ": Failed to send StopSubscribe to address: " + << its_address.first.to_string(); + } + } +} + +void +service_discovery_impl::remove_subscriptions( service_t _service, instance_t _instance) { std::lock_guard<std::mutex> its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - for (auto &its_eventgroup : found_instance->second) { - auto its_subscription = its_eventgroup.second; - for (auto its_client : its_subscription->get_clients()) { - its_subscription->set_state(its_client, - subscription_state_e::ST_UNKNOWN); - } - its_subscription->set_endpoint(nullptr, true); - its_subscription->set_endpoint(nullptr, false); - } + found_service->second.erase(_instance); + if (found_service->second.empty()) { + subscribed_.erase(found_service); } } } @@ -641,16 +691,16 @@ void service_discovery_impl::insert_find_entries( std::vector<std::shared_ptr<message_impl> > &_messages, const requests_t &_requests) { - std::lock_guard<std::mutex> its_lock(requested_mutex_); entry_data_t its_data; its_data.entry_ = its_data.other_ = nullptr; for (const auto& its_service : _requests) { for (const auto& its_instance : its_service.second) { + std::lock_guard<std::mutex> its_lock(requested_mutex_); auto its_request = its_instance.second; - // check if release_service was called + // check if release_service was called / offer was received auto the_service = requested_.find(its_service.first); if ( the_service != requested_.end() ) { auto the_instance = the_service->second.find(its_instance.first); @@ -722,17 +772,37 @@ service_discovery_impl::create_eventgroup_entry( case reliability_type_e::RT_RELIABLE: if (its_reliable_endpoint) { insert_reliable = true; + } else { + VSOMEIP_WARNING << __func__ << ": Cannot create subscription as " + "reliable endpoint is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; } break; case reliability_type_e::RT_UNRELIABLE: if (its_unreliable_endpoint) { insert_unreliable = true; + } else { + VSOMEIP_WARNING << __func__ << ": Cannot create subscription as " + "unreliable endpoint is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; } break; case reliability_type_e::RT_BOTH: if (its_reliable_endpoint && its_unreliable_endpoint) { insert_reliable = true; insert_unreliable = true; + } else { + VSOMEIP_WARNING << __func__ << ": Cannot create subscription as " + "endpoint is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" + << " reliable: " << !!its_reliable_endpoint + << " unreliable: " << !!its_unreliable_endpoint; } break; default: @@ -817,7 +887,6 @@ service_discovery_impl::create_eventgroup_entry( its_entry->set_counter(0); its_entry->set_major_version(_subscription->get_major()); its_entry->set_ttl(_subscription->get_ttl()); - its_data.entry_ = its_entry; } @@ -1008,14 +1077,18 @@ service_discovery_impl::on_message( } const bool received_via_mcast = (_destination == sd_multicast_address_); if (received_via_mcast) { - std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_); + static bool must_start_last_msg_received_timer(true); boost::system::error_code ec; - last_msg_received_timer_.cancel(ec); - last_msg_received_timer_.expires_from_now( - last_msg_received_timer_timeout_, ec); - last_msg_received_timer_.async_wait( - std::bind(&service_discovery_impl::on_last_msg_received_timer_expired, - shared_from_this(), std::placeholders::_1)); + + std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_); + if (0 < last_msg_received_timer_.cancel(ec) || must_start_last_msg_received_timer) { + must_start_last_msg_received_timer = false; + last_msg_received_timer_.expires_from_now( + last_msg_received_timer_timeout_, ec); + last_msg_received_timer_.async_wait( + std::bind(&service_discovery_impl::on_last_msg_received_timer_expired, + shared_from_this(), std::placeholders::_1)); + } } current_remote_address_ = _sender; @@ -1256,17 +1329,15 @@ service_discovery_impl::process_serviceentry( default: VSOMEIP_ERROR << __func__ << ": Unsupported service entry type"; } - } else if (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_) { - std::shared_ptr<request> its_request = find_request(its_service, its_instance); - if (its_request) { - std::lock_guard<std::mutex> its_lock(requested_mutex_); - // ID: SIP_SD_830 - its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); - } + } else if (its_type != entry_type_e::FIND_SERVICE + && (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_)) { + // stop sending find service in repetition phase + update_request(its_service, its_instance); + remove_remote_offer_type(its_service, its_instance, its_reliable_address, its_reliable_port, its_unreliable_address, its_unreliable_port); - reset_subscriptions(its_service, its_instance); + remove_subscriptions(its_service, its_instance); if (!is_diagnosis_ && !is_suspended_) { host_->del_routing_info(its_service, its_instance, (its_reliable_port != ILLEGAL_PORT), @@ -1304,11 +1375,9 @@ service_discovery_impl::process_offerservice_serviceentry( return; } - std::shared_ptr<request> its_request = find_request(_service, _instance); - if (its_request) { - std::lock_guard<std::mutex> its_lock(requested_mutex_); - its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); - } + // stop sending find service in repetition phase + update_request(_service, _instance); + remote_offer_type_e offer_type(remote_offer_type_e::UNKNOWN); if (_reliable_port != ILLEGAL_PORT && _unreliable_port != ILLEGAL_PORT @@ -1430,17 +1499,9 @@ service_discovery_impl::process_offerservice_serviceentry( } } - host_->add_routing_info(_service, _instance, - _major, _minor, - _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_), - _reliable_address, _reliable_port, - _unreliable_address, _unreliable_port); // No need to resubscribe for unicast offers if (_received_via_mcast) { - std::int32_t its_remaining = VSOMEIP_MAX_UDP_MESSAGE_SIZE; - its_remaining -= _resubscribes.back()->get_size(); - std::lock_guard<std::mutex> its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { @@ -1482,6 +1543,12 @@ service_discovery_impl::process_offerservice_serviceentry( } } } + + host_->add_routing_info(_service, _instance, + _major, _minor, + _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_), + _reliable_address, _reliable_port, + _unreliable_address, _unreliable_port); } void @@ -1616,7 +1683,7 @@ service_discovery_impl::on_endpoint_connected( its_subscription->set_endpoint(its_reliable, true); its_subscription->set_endpoint(its_unreliable, false); - for (const auto& its_client : its_subscription->get_clients()) + for (const auto its_client : its_subscription->get_clients()) its_subscription->set_state(its_client, subscription_state_e::ST_NOT_ACKNOWLEDGED); @@ -1726,7 +1793,7 @@ service_discovery_impl::process_eventgroupentry( // We received a subscription for a non-existing eventgroup. // --> Create dummy eventgroupinfo to send Nack. its_info = std::make_shared<eventgroupinfo>(its_service, its_instance, - its_eventgroup, its_major, its_ttl); + its_eventgroup, its_major, its_ttl, VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS); boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Received a SubscribeEventGroup entry for unknown eventgroup " @@ -1743,6 +1810,16 @@ service_discovery_impl::process_eventgroupentry( // We received a subscription [n]ack for an eventgroup that does not exist. // --> Remove subscription. unsubscribe(its_service, its_instance, its_eventgroup, VSOMEIP_ROUTING_CLIENT); + + boost::system::error_code ec; + VSOMEIP_WARNING << __func__ + << ": Received a SubscribeEventGroup[N]Ack entry for unknown eventgroup " + << " from: " << its_sender.to_string(ec) << " for: [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup + << "] session: " << std::hex << std::setw(4) << std::setfill('0') + << its_session << ", ttl: " << its_ttl; } return; } @@ -2226,7 +2303,7 @@ service_discovery_impl::handle_eventgroup_subscription( if (_major != _info->get_major()) { // Create a temporary info object with TTL=0 --> send NACK auto its_info = std::make_shared<eventgroupinfo>(_service, _instance, - _eventgroup, _major, 0); + _eventgroup, _major, 0, VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS); boost::system::error_code ec; // TODO: Add session id VSOMEIP_ERROR << __func__ @@ -2325,42 +2402,44 @@ service_discovery_impl::handle_eventgroup_subscription( } } - // Create subscription object - auto its_subscription = std::make_shared<remote_subscription>(); - its_subscription->set_eventgroupinfo(_info); - its_subscription->set_subscriber(its_subscriber); - its_subscription->set_reliable(its_reliable); - its_subscription->set_unreliable(its_unreliable); - its_subscription->reset(_clients); + if (its_subscriber) { + // Create subscription object + auto its_subscription = std::make_shared<remote_subscription>(); + its_subscription->set_eventgroupinfo(_info); + its_subscription->set_subscriber(its_subscriber); + its_subscription->set_reliable(its_reliable); + its_subscription->set_unreliable(its_unreliable); + its_subscription->reset(_clients); - if (_ttl == 0) { // --> unsubscribe - its_subscription->set_ttl(0); - if (!_is_stop_subscribe_subscribe) { - { - std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); - pending_remote_subscriptions_[its_subscription] = _acknowledgement; - _acknowledgement->add_subscription(its_subscription); + if (_ttl == 0) { // --> unsubscribe + its_subscription->set_ttl(0); + if (!_is_stop_subscribe_subscribe) { + { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + pending_remote_subscriptions_[its_subscription] = _acknowledgement; + _acknowledgement->add_subscription(its_subscription); + } + host_->on_remote_unsubscribe(its_subscription); } - host_->on_remote_unsubscribe(its_subscription); + return; } - return; - } - if (_force_initial_events) { - its_subscription->set_force_initial_events(true); - } - its_subscription->set_ttl(_ttl - * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_)); + if (_force_initial_events) { + its_subscription->set_force_initial_events(true); + } + its_subscription->set_ttl(_ttl + * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_)); - { - std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); - pending_remote_subscriptions_[its_subscription] = _acknowledgement; - _acknowledgement->add_subscription(its_subscription); - } + { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + pending_remote_subscriptions_[its_subscription] = _acknowledgement; + _acknowledgement->add_subscription(its_subscription); + } - host_->on_remote_subscribe(its_subscription, - std::bind(&service_discovery_impl::update_remote_subscription, - shared_from_this(), std::placeholders::_1)); + host_->on_remote_subscribe(its_subscription, + std::bind(&service_discovery_impl::update_remote_subscription, + shared_from_this(), std::placeholders::_1)); + } } void @@ -2380,7 +2459,7 @@ service_discovery_impl::handle_eventgroup_subscription_nack( for (const auto& its_client : _clients) { host_->on_subscribe_nack(its_client, _service, _instance, _eventgroup, ANY_EVENT, - PENDING_SUBSCRIPTION_ID); // TODO: This is a dummy call... + PENDING_SUBSCRIPTION_ID, false); // TODO: This is a dummy call... } @@ -3339,6 +3418,7 @@ service_discovery_impl::get_ttl_factor( void service_discovery_impl::on_last_msg_received_timer_expired( const boost::system::error_code &_error) { + if (!_error) { // We didn't receive a multicast message within 110% of the cyclic_offer_delay_ VSOMEIP_WARNING << "Didn't receive a multicast SD message for " << @@ -3346,8 +3426,10 @@ service_discovery_impl::on_last_msg_received_timer_expired( // Rejoin multicast group if (endpoint_ && !reliable_) { - dynamic_cast<udp_server_endpoint_impl*>( - endpoint_.get())->join(sd_multicast_); + auto its_endpoint = std::dynamic_pointer_cast< + udp_server_endpoint_impl>(endpoint_); + if (its_endpoint) + its_endpoint->join(sd_multicast_); } { boost::system::error_code ec; |