diff options
Diffstat (limited to 'implementation/service_discovery/src/service_discovery_impl.cpp')
-rw-r--r-- | implementation/service_discovery/src/service_discovery_impl.cpp | 400 |
1 files changed, 297 insertions, 103 deletions
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index 0cc2d6f..9fc955d 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -59,6 +59,8 @@ service_discovery_impl::service_discovery_impl(service_discovery_host *_host) repetitions_max_(VSOMEIP_SD_DEFAULT_REPETITIONS_MAX), cyclic_offer_delay_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY), offer_debounce_timer_(_host->get_io()), + find_debounce_time_(VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME), + find_debounce_timer_(_host->get_io()), main_phase_timer_(_host->get_io()), is_suspended_(true) { std::chrono::seconds smallest_ttl(DEFAULT_TTL); @@ -140,22 +142,12 @@ void service_discovery_impl::start() { VSOMEIP_ERROR << "Couldn't start service discovery"; return; } - // Send out pending find services messages if have any - bool send_find(false); - { - std::lock_guard<std::mutex> its_lock(requested_mutex_); - if (requested_.size()) { - send_find = true; - } - } - if (send_find) { - send(false, true); - } } is_suspended_ = false; start_main_phase_timer(); start_offer_debounce_timer(true); + start_find_debounce_timer(true); } void service_discovery_impl::stop() { @@ -170,36 +162,38 @@ void service_discovery_impl::stop() { offer_debounce_timer_.cancel(ec); } { + std::lock_guard<std::mutex> its_lock(find_debounce_timer_mutex_); + find_debounce_timer_.cancel(ec); + } + { std::lock_guard<std::mutex> its_lock(repetition_phase_timers_mutex_); for(const auto &t : repetition_phase_timers_) { t.first->cancel(ec); } } + { + std::lock_guard<std::mutex> its_lock(find_repetition_phase_timers_mutex_); + for(const auto &t : find_repetition_phase_timers_) { + t.first->cancel(ec); + } + } + } void service_discovery_impl::request_service(service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl) { - bool is_new_request(true); - { - std::lock_guard<std::mutex> its_lock(requested_mutex_); - auto find_service = requested_.find(_service); - if (find_service != requested_.end()) { - auto find_instance = find_service->second.find(_instance); - if (find_instance != find_service->second.end()) { - is_new_request = false; - // TODO: check version and report errors - } else { - find_service->second[_instance] = std::make_shared < request - > (_major, _minor, _ttl); - } - } else { - requested_[_service][_instance] = std::make_shared < request + std::lock_guard<std::mutex> its_lock(requested_mutex_); + auto find_service = requested_.find(_service); + if (find_service != requested_.end()) { + auto find_instance = find_service->second.find(_instance); + if (find_instance == find_service->second.end()) { + find_service->second[_instance] = std::make_shared < request > (_major, _minor, _ttl); } - } - if (is_new_request && !is_suspended_) { - send(false, true); + } else { + requested_[_service][_instance] = std::make_shared < request + > (_major, _minor, _ttl); } } @@ -239,11 +233,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, if (found_eventgroup != found_instance->second.end()) { auto found_client = found_eventgroup->second.find(_client); if (found_client != found_eventgroup->second.end()) { - if (found_client->second->get_major() == _major) { - found_client->second->set_ttl(_ttl); - found_client->second->set_expiration(std::chrono::steady_clock::now() - + std::chrono::seconds(_ttl)); - } else { + if (found_client->second->get_major() != _major) { VSOMEIP_ERROR << "Subscriptions to different versions of the same " "service instance are not supported!"; @@ -281,8 +271,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, // New subscription std::shared_ptr < subscription > its_subscription = std::make_shared < subscription > (_major, _ttl, its_reliable, its_unreliable, - _subscription_type, subscribe_count, - std::chrono::steady_clock::time_point() + std::chrono::seconds(_ttl)); + _subscription_type, subscribe_count); subscribed_[_service][_instance][_eventgroup][_client] = its_subscription; if (has_address) { @@ -774,44 +763,53 @@ void service_discovery_impl::insert_option( } void service_discovery_impl::insert_find_entries( - std::shared_ptr<message_impl> &_message, + std::shared_ptr<message_impl> &_message, const requests_t &_requests, uint32_t _start, uint32_t &_size, bool &_done) { std::lock_guard<std::mutex> its_lock(requested_mutex_); uint32_t its_size(0); uint32_t i = 0; _done = true; - for (auto its_service : requested_) { + for (auto its_service : _requests) { for (auto its_instance : its_service.second) { auto its_request = its_instance.second; - uint8_t its_sent_counter = its_request->get_sent_counter(); - if (its_sent_counter != repetitions_max_ + 1) { - if (i >= _start) { - if (its_size + VSOMEIP_SOMEIP_SD_ENTRY_SIZE <= max_message_size_) { - std::shared_ptr < serviceentry_impl > its_entry = - _message->create_service_entry(); - if (its_entry) { - its_entry->set_type(entry_type_e::FIND_SERVICE); - its_entry->set_service(its_service.first); - its_entry->set_instance(its_instance.first); - its_entry->set_major_version(its_request->get_major()); - its_entry->set_minor_version(its_request->get_minor()); - its_entry->set_ttl(its_request->get_ttl()); - its_size += VSOMEIP_SOMEIP_SD_ENTRY_SIZE; - its_sent_counter++; - - its_request->set_sent_counter(its_sent_counter); - } else { - VSOMEIP_ERROR << "Failed to create service entry!"; + + // check if release_service was called + auto the_service = requested_.find(its_service.first); + if ( the_service != requested_.end() ) { + auto the_instance = the_service->second.find(its_instance.first); + if(the_instance != the_service->second.end() ) { + uint8_t its_sent_counter = its_request->get_sent_counter(); + if (its_sent_counter != repetitions_max_ + 1) { + if (i >= _start) { + if (its_size + VSOMEIP_SOMEIP_SD_ENTRY_SIZE <= max_message_size_) { + std::shared_ptr < serviceentry_impl > its_entry = + _message->create_service_entry(); + if (its_entry) { + its_entry->set_type(entry_type_e::FIND_SERVICE); + its_entry->set_service(its_service.first); + its_entry->set_instance(its_instance.first); + its_entry->set_major_version(its_request->get_major()); + its_entry->set_minor_version(its_request->get_minor()); + its_entry->set_ttl(its_request->get_ttl()); + its_size += VSOMEIP_SOMEIP_SD_ENTRY_SIZE; + its_sent_counter++; + + its_request->set_sent_counter(its_sent_counter); + } else { + VSOMEIP_ERROR << "Failed to create service entry!"; + } + } else { + _done = false; + _size = its_size; + return; + } } - } else { - _done = false; - _size = its_size; - return; } + i++; } } - i++; + } } _size = its_size; @@ -912,7 +910,7 @@ void service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared std::shared_ptr < endpoint > its_endpoint; its_endpoint = _subscription->get_endpoint(true); - if (its_endpoint) { + if (its_endpoint && its_endpoint->is_connected()) { insert_option(_message, its_stop_entry, unicast_, its_endpoint->get_local_port(), true); insert_option(_message, its_entry, unicast_, @@ -931,6 +929,25 @@ void service_discovery_impl::insert_subscription_ack( std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, uint8_t _counter, major_version_t _major, uint16_t _reserved) { + + for (auto its_entry : _message->get_entries()) { + if (its_entry->is_eventgroup_entry()) { + std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry = + std::dynamic_pointer_cast < eventgroupentry_impl + > (its_entry); + if(its_eventgroup_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK + && its_eventgroup_entry->get_service() == _service + && its_eventgroup_entry->get_instance() == _instance + && its_eventgroup_entry->get_eventgroup() == _eventgroup + && its_eventgroup_entry->get_major_version() == _major + && its_eventgroup_entry->get_reserved() == _reserved + && its_eventgroup_entry->get_counter() == _counter + && its_eventgroup_entry->get_ttl() == _ttl) { + return; + } + } + } + std::shared_ptr < eventgroupentry_impl > its_entry = _message->create_eventgroup_entry(); its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP_ACK); @@ -968,36 +985,23 @@ void service_discovery_impl::insert_subscription_nack( its_entry->set_ttl(0x0); } -bool service_discovery_impl::send(bool _is_announcing, bool _is_find) { +bool service_discovery_impl::send(bool _is_announcing) { std::shared_ptr < runtime > its_runtime = runtime_.lock(); if (its_runtime) { std::vector< std::shared_ptr< message_impl > > its_messages; std::shared_ptr < message_impl > its_message; - if (_is_find || !_is_announcing) { - uint32_t its_start(0); - uint32_t its_size(0); - bool is_done(false); - while (!is_done) { - its_message = its_runtime->create_message(); - its_messages.push_back(its_message); - - insert_find_entries(its_message, its_start, its_size, is_done); - its_start += its_size / VSOMEIP_SOMEIP_SD_ENTRY_SIZE; - }; - } else { + if(_is_announcing) { its_message = its_runtime->create_message(); its_messages.push_back(its_message); - } - if (!_is_find) { services_t its_offers = host_->get_offered_services(); fill_message_with_offer_entries(its_runtime, its_message, its_messages, its_offers, false); - } - // Serialize and send - return serialize_and_send_messages(its_messages); + // Serialize and send + return serialize_and_send_messages(its_messages); + } } return false; } @@ -1063,7 +1067,7 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry = std::dynamic_pointer_cast < eventgroupentry_impl > (its_entry); - process_eventgroupentry( its_eventgroup_entry, its_options, its_message_response, accepted_subscribers); + process_eventgroupentry( its_eventgroup_entry, its_options, its_message_response, accepted_subscribers, _destination); } } @@ -1182,6 +1186,8 @@ void service_discovery_impl::process_serviceentry( } else { std::shared_ptr<request> its_request = find_request(its_service, its_instance); if (its_request) { + std::lock_guard<std::mutex> its_lock(requested_mutex_); + // ID: SIP_SD_830 its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); } unsubscribe_all(its_service, its_instance); @@ -1203,8 +1209,10 @@ void service_discovery_impl::process_offerservice_serviceentry( return; std::shared_ptr<request> its_request = find_request(_service, _instance); - if (its_request) + if (its_request) { + std::lock_guard<std::mutex> its_lock(requested_mutex_); its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); + } smallest_ttl_ = host_->add_routing_info(_service, _instance, _major, _minor, _ttl, @@ -1547,7 +1555,8 @@ void service_discovery_impl::process_eventgroupentry( std::shared_ptr<eventgroupentry_impl> &_entry, const std::vector<std::shared_ptr<option_impl> > &_options, std::shared_ptr < message_impl > &its_message_response, - std::vector <accepted_subscriber_t> &accepted_subscribers) { + std::vector <accepted_subscriber_t> &accepted_subscribers, + const boost::asio::ip::address &_destination) { service_t its_service = _entry->get_service(); instance_t its_instance = _entry->get_instance(); eventgroup_t its_eventgroup = _entry->get_eventgroup(); @@ -1567,6 +1576,14 @@ void service_discovery_impl::process_eventgroupentry( } if(its_type == entry_type_e::SUBSCRIBE_EVENTGROUP) { + if( _destination.is_multicast() ) { + VSOMEIP_ERROR << "Received a SubscribeEventGroup entry on multicast address"; + if(its_ttl > 0) { + insert_subscription_nack(its_message_response, its_service, its_instance, + its_eventgroup, its_counter, its_major, its_reserved); + } + return; + } if (_entry->get_num_options(1) == 0 && _entry->get_num_options(2) == 0) { VSOMEIP_ERROR << "Invalid number of options in SubscribeEventGroup entry"; @@ -1785,6 +1802,7 @@ void service_discovery_impl::process_eventgroupentry( if(its_ttl > 0) { insert_subscription_nack(its_message_response, its_service, its_instance, its_eventgroup, its_counter, its_major, its_reserved); + return; } break; } @@ -1982,12 +2000,27 @@ void service_discovery_impl::handle_eventgroup_subscription_nack(service_t _serv if (client.second->get_counter() == _counter) { // Deliver nack nackedClient = client.first; - host_->on_subscribe_nack(client.first, _service, _instance, _eventgroup); + host_->on_subscribe_nack(client.first, _service, + _instance, _eventgroup, ANY_EVENT); break; } } - // Remove nacked subscription - found_eventgroup->second.erase(nackedClient); + + // Restart TCP connection only for non selective subscriptions + for (auto client : found_eventgroup->second) { + if( !client.second->is_acknowledged() + && client.first == VSOMEIP_ROUTING_CLIENT ) { + auto endpoint = client.second->get_endpoint(true); + if(endpoint) { + endpoint->restart(); + } + } + } + + // Remove nacked subscription only for selective events + if(nackedClient != VSOMEIP_ROUTING_CLIENT) { + found_eventgroup->second.erase(nackedClient); + } } } } @@ -2010,7 +2043,7 @@ void service_discovery_impl::handle_eventgroup_subscription_ack( if (its_client.second->get_counter() == _counter) { its_client.second->set_acknowledged(true); host_->on_subscribe_ack(its_client.first, _service, - _instance, _eventgroup); + _instance, _eventgroup, ANY_EVENT); } if (_address.is_multicast()) { host_->on_subscribe_ack(_service, _instance, _address, @@ -2294,6 +2327,92 @@ void service_discovery_impl::start_offer_debounce_timer(bool _first_start) { this, std::placeholders::_1)); } + + +void service_discovery_impl::start_find_debounce_timer(bool _first_start) { + std::lock_guard<std::mutex> its_lock(find_debounce_timer_mutex_); + boost::system::error_code ec; + if (_first_start) { + find_debounce_timer_.expires_from_now(initial_delay_, ec); + } else { + find_debounce_timer_.expires_from_now(find_debounce_time_, ec); + } + if (ec) { + VSOMEIP_ERROR<< "service_discovery_impl::start_find_debounce_timer " + "setting expiry time of timer failed: " << ec.message(); + } + find_debounce_timer_.async_wait( + std::bind( + &service_discovery_impl::on_find_debounce_timer_expired, + this, std::placeholders::_1)); +} + +//initial delay +void service_discovery_impl::on_find_debounce_timer_expired( + const boost::system::error_code &_error) { + if(_error) { // timer was canceled + return; + } + // only copy the accumulated requests of the initial wait phase + // if the sent counter for the request is zero. + requests_t repetition_phase_finds; + bool new_finds(false); + { + std::lock_guard<std::mutex> its_lock(requested_mutex_); + for (const auto its_service : requested_) { + for (const auto its_instance : its_service.second) { + if( its_instance.second->get_sent_counter() == 0) { + repetition_phase_finds[its_service.first][its_instance.first] = its_instance.second; + } + } + } + if (repetition_phase_finds.size()) { + new_finds = true; + } + } + + if (!new_finds) { + start_find_debounce_timer(false); + return; + } + + // Sent out finds for the first time as initial wait phase ended + std::shared_ptr<runtime> its_runtime = runtime_.lock(); + if (its_runtime) { + std::vector<std::shared_ptr<message_impl>> its_messages; + std::shared_ptr<message_impl> its_message = + its_runtime->create_message(); + its_messages.push_back(its_message); + // Serialize and send FindService (increments sent counter in requested_ map) + fill_message_with_find_entries(its_runtime, its_message, its_messages, + repetition_phase_finds); + serialize_and_send_messages(its_messages); + } + + std::chrono::milliseconds its_delay(repetitions_base_delay_); + std::uint8_t its_repetitions(1); + + std::shared_ptr<boost::asio::steady_timer> its_timer = std::make_shared< + boost::asio::steady_timer>(host_->get_io()); + { + std::lock_guard<std::mutex> its_lock(find_repetition_phase_timers_mutex_); + find_repetition_phase_timers_[its_timer] = repetition_phase_finds; + } + + boost::system::error_code ec; + its_timer->expires_from_now(its_delay, ec); + if (ec) { + VSOMEIP_ERROR<< "service_discovery_impl::on_find_debounce_timer_expired " + "setting expiry time of timer failed: " << ec.message(); + } + its_timer->async_wait( + std::bind( + &service_discovery_impl::on_find_repetition_phase_timer_expired, + this, std::placeholders::_1, its_timer, its_repetitions, + its_delay.count())); + start_find_debounce_timer(false); +} + void service_discovery_impl::on_offer_debounce_timer_expired( const boost::system::error_code &_error) { if(_error) { // timer was canceled @@ -2385,20 +2504,10 @@ void service_discovery_impl::on_repetition_phase_timer_expired( if (its_timer_pair != repetition_phase_timers_.end()) { std::chrono::milliseconds new_delay(0); std::uint8_t repetition(0); + bool move_to_main(false); if (_repetition <= repetitions_max_) { // sent offers, double time to wait and start timer again. - std::shared_ptr<runtime> its_runtime = runtime_.lock(); - if (its_runtime) { - std::vector<std::shared_ptr<message_impl>> its_messages; - std::shared_ptr<message_impl> its_message = - its_runtime->create_message(); - its_messages.push_back(its_message); - fill_message_with_offer_entries(its_runtime, its_message, - its_messages, its_timer_pair->second, true); - - // Serialize and send - serialize_and_send_messages(its_messages); - } + new_delay = std::chrono::milliseconds(_last_delay * 2); repetition = ++_repetition; } else { @@ -2410,13 +2519,28 @@ void service_discovery_impl::on_repetition_phase_timer_expired( // the cyclic offer delay before moving the offers in to main // phase if (last_offer_shorter_half_offer_delay_ago()) { - move_offers_into_main_phase(_timer); - return; + move_to_main = true; } else { new_delay = cyclic_offer_delay_; repetition = 0; } } + std::shared_ptr<runtime> its_runtime = runtime_.lock(); + if (its_runtime) { + std::vector<std::shared_ptr<message_impl>> its_messages; + std::shared_ptr<message_impl> its_message = + its_runtime->create_message(); + its_messages.push_back(its_message); + fill_message_with_offer_entries(its_runtime, its_message, + its_messages, its_timer_pair->second, true); + + // Serialize and send + serialize_and_send_messages(its_messages); + } + if (move_to_main) { + move_offers_into_main_phase(_timer); + return; + } boost::system::error_code ec; its_timer_pair->first->expires_from_now(new_delay, ec); if (ec) { @@ -2433,6 +2557,55 @@ void service_discovery_impl::on_repetition_phase_timer_expired( } } + +void service_discovery_impl::on_find_repetition_phase_timer_expired( + const boost::system::error_code &_error, + std::shared_ptr<boost::asio::steady_timer> _timer, + std::uint8_t _repetition, std::uint32_t _last_delay) { + if (_error) { + return; + } + + std::lock_guard<std::mutex> its_lock(find_repetition_phase_timers_mutex_); + auto its_timer_pair = find_repetition_phase_timers_.find(_timer); + if (its_timer_pair != find_repetition_phase_timers_.end()) { + std::chrono::milliseconds new_delay(0); + std::uint8_t repetition(0); + if (_repetition <= repetitions_max_) { + // sent findService entries in one message, double time to wait and start timer again. + std::shared_ptr<runtime> its_runtime = runtime_.lock(); + if (its_runtime) { + std::vector<std::shared_ptr<message_impl>> its_messages; + std::shared_ptr<message_impl> its_message = + its_runtime->create_message(); + its_messages.push_back(its_message); + fill_message_with_find_entries(its_runtime, its_message, + its_messages, its_timer_pair->second); + serialize_and_send_messages(its_messages); + } + new_delay = std::chrono::milliseconds(_last_delay * 2); + repetition = ++_repetition; + } else { + // repetition phase is now over, erase the timer on next expiry time + find_repetition_phase_timers_.erase(its_timer_pair); + return; + } + boost::system::error_code ec; + its_timer_pair->first->expires_from_now(new_delay, ec); + if (ec) { + VSOMEIP_ERROR << + "service_discovery_impl::on_find_repetition_phase_timer_expired " + "setting expiry time of timer failed: " << ec.message(); + } + its_timer_pair->first->async_wait( + std::bind( + &service_discovery_impl::on_find_repetition_phase_timer_expired, + this, std::placeholders::_1, its_timer_pair->first, + repetition, new_delay.count())); + } +} + + void service_discovery_impl::move_offers_into_main_phase( const std::shared_ptr<boost::asio::steady_timer> &_timer) { // HINT: make sure to lock the repetition_phase_timers_mutex_ before calling @@ -2469,6 +2642,27 @@ void service_discovery_impl::fill_message_with_offer_entries( } } +void service_discovery_impl::fill_message_with_find_entries( + std::shared_ptr<runtime> _runtime, + std::shared_ptr<message_impl> _message, + std::vector<std::shared_ptr<message_impl>> &_messages, + const requests_t &_requests) { + uint32_t its_start(0); + uint32_t its_size(0); + bool is_done(false); + while (!is_done) { + insert_find_entries(_message, _requests, its_start, its_size, + is_done); + its_start += its_size / VSOMEIP_SOMEIP_SD_ENTRY_SIZE; + if (!is_done) { + its_start = 0; + _message = _runtime->create_message(); + _messages.push_back(_message); + } + }; +} + + bool service_discovery_impl::serialize_and_send_messages( const std::vector<std::shared_ptr<message_impl>> &_messages) { bool has_sent(false); @@ -2585,7 +2779,7 @@ void service_discovery_impl::on_main_phase_timer_expired( if (_error) { return; } - send(true, false); + send(true); start_main_phase_timer(); } |