diff options
Diffstat (limited to 'implementation/service_discovery/src/service_discovery_impl.cpp')
-rw-r--r-- | implementation/service_discovery/src/service_discovery_impl.cpp | 269 |
1 files changed, 200 insertions, 69 deletions
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index 94a03b4..e33027a 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -25,6 +25,7 @@ #include "../../configuration/include/configuration.hpp" #include "../../configuration/include/internal.hpp" #include "../../endpoints/include/endpoint.hpp" +#include "../../endpoints/include/client_endpoint.hpp" #include "../../endpoints/include/endpoint_definition.hpp" #include "../../endpoints/include/tcp_server_endpoint_impl.hpp" #include "../../endpoints/include/udp_server_endpoint_impl.hpp" @@ -41,8 +42,12 @@ service_discovery_impl::service_discovery_impl(service_discovery_host *_host) host_(_host), port_(VSOMEIP_SD_DEFAULT_PORT), reliable_(false), - serializer_(std::make_shared<serializer>()), - deserializer_(std::make_shared<deserializer>()), + serializer_( + std::make_shared<serializer>( + host_->get_configuration()->get_buffer_shrink_threshold())), + deserializer_( + std::make_shared<deserializer>( + host_->get_configuration()->get_buffer_shrink_threshold())), ttl_timer_(_host->get_io()), smallest_ttl_(DEFAULT_TTL), ttl_(VSOMEIP_SD_DEFAULT_TTL), @@ -55,7 +60,7 @@ service_discovery_impl::service_discovery_impl(service_discovery_host *_host) cyclic_offer_delay_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY), offer_debounce_timer_(_host->get_io()), main_phase_timer_(_host->get_io()), - is_suspended_(false) { + is_suspended_(true) { std::chrono::seconds smallest_ttl(DEFAULT_TTL); smallest_ttl_ = std::chrono::duration_cast<std::chrono::milliseconds>(smallest_ttl); @@ -81,6 +86,7 @@ void service_discovery_impl::init() { host_->get_configuration(); if (its_configuration) { unicast_ = its_configuration->get_unicast_address(); + sd_multicast_ = its_configuration->get_sd_multicast(); port_ = its_configuration->get_sd_port(); reliable_ = (its_configuration->get_sd_protocol() @@ -88,14 +94,6 @@ void service_discovery_impl::init() { max_message_size_ = (reliable_ ? VSOMEIP_MAX_TCP_SD_PAYLOAD : VSOMEIP_MAX_UDP_SD_PAYLOAD); - serializer_->create_data( - reliable_ ? - VSOMEIP_MAX_TCP_MESSAGE_SIZE : - VSOMEIP_MAX_UDP_MESSAGE_SIZE); - - endpoint_ = host_->create_service_discovery_endpoint( - its_configuration->get_sd_multicast(), port_, reliable_); - ttl_ = its_configuration->get_sd_ttl(); // generate random initial delay based on initial delay min and max @@ -135,6 +133,26 @@ void service_discovery_impl::init() { } void service_discovery_impl::start() { + if (!endpoint_) { + endpoint_ = host_->create_service_discovery_endpoint( + sd_multicast_, port_, reliable_); + if (!endpoint_) { + VSOMEIP_ERROR << "Couldn't start service discovery"; + return; + } + // Send out pending find services messages if have any + bool send_find(false); + { + std::lock_guard<std::mutex> its_lock(requested_mutex_); + if (requested_.size()) { + send_find = true; + } + } + if (send_find) { + send(false, true); + } + } + is_suspended_ = false; start_main_phase_timer(); start_offer_debounce_timer(true); @@ -174,13 +192,14 @@ void service_discovery_impl::request_service(service_t _service, > (_major, _minor, _ttl); } } - if (is_new_request) { + if (is_new_request && !is_suspended_) { send(false, true); } } void service_discovery_impl::release_service(service_t _service, instance_t _instance) { + std::lock_guard<std::mutex> its_lock(requested_mutex_); auto find_service = requested_.find(_service); if (find_service != requested_.end()) { find_service->second.erase(_instance); @@ -189,6 +208,7 @@ void service_discovery_impl::release_service(service_t _service, std::shared_ptr<request> service_discovery_impl::find_request(service_t _service, instance_t _instance) { + std::lock_guard<std::mutex> its_lock(requested_mutex_); auto find_service = requested_.find(_service); if (find_service != requested_.end()) { auto find_instance = find_service->second.find(_instance); @@ -309,23 +329,44 @@ void service_discovery_impl::get_subscription_endpoints( _unreliable = host_->find_or_create_remote_client(_service, _instance, false, _client); if (_unreliable) { - *_has_address = _unreliable->get_remote_address(*_address); + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>(_unreliable); + if (its_client_endpoint) { + *_has_address = its_client_endpoint->get_remote_address( + *_address); + } } if (_reliable) { - *_has_address = *_has_address - || _reliable->get_remote_address(*_address); + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>(_reliable); + if (its_client_endpoint) { + *_has_address = *_has_address + || its_client_endpoint->get_remote_address( + *_address); + } } break; case subscription_type_e::SU_PREFER_UNRELIABLE: _unreliable = host_->find_or_create_remote_client(_service, _instance, false, _client); if (_unreliable) { - *_has_address = _unreliable->get_remote_address(*_address); + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>(_unreliable); + if (its_client_endpoint) { + *_has_address = its_client_endpoint->get_remote_address( + *_address); + } } else { _reliable = host_->find_or_create_remote_client(_service, _instance, true, _client); if (_reliable) { - *_has_address = _reliable->get_remote_address(*_address); + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>( + _reliable); + if (its_client_endpoint) { + *_has_address = its_client_endpoint->get_remote_address( + *_address); + } } } break; @@ -333,12 +374,23 @@ void service_discovery_impl::get_subscription_endpoints( _reliable = host_->find_or_create_remote_client(_service, _instance, true, _client); if (_reliable) { - *_has_address = _reliable->get_remote_address(*_address); + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>(_reliable); + if (its_client_endpoint) { + *_has_address = its_client_endpoint->get_remote_address( + *_address); + } } else { - _unreliable = host_->find_or_create_remote_client(_service, - _instance, false, _client); + _unreliable = host_->find_or_create_remote_client(_service, + _instance, false, _client); if (_unreliable) { - *_has_address = _unreliable->get_remote_address(*_address); + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>( + _unreliable); + if (its_client_endpoint) { + *_has_address = its_client_endpoint->get_remote_address( + *_address); + } } } break; @@ -347,14 +399,24 @@ void service_discovery_impl::get_subscription_endpoints( _instance, false, _client); if (_unreliable) { - *_has_address = _unreliable->get_remote_address(*_address); + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>(_unreliable); + if (its_client_endpoint) { + *_has_address = its_client_endpoint->get_remote_address( + *_address); + } } break; case subscription_type_e::SU_RELIABLE: _reliable = host_->find_or_create_remote_client(_service, _instance, - true, _client); + true, _client); if (_reliable) { - *_has_address = _reliable->get_remote_address(*_address); + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>(_reliable); + if (its_client_endpoint) { + *_has_address = its_client_endpoint->get_remote_address( + *_address); + } } } } @@ -384,11 +446,25 @@ void service_discovery_impl::unsubscribe(service_t _service, found_eventgroup->second.erase(_client); auto endpoint = its_subscription->get_endpoint(false); if (endpoint) { - has_address = endpoint->get_remote_address(its_address); + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>( + endpoint); + if (its_client_endpoint) { + has_address = + its_client_endpoint->get_remote_address( + its_address); + } } else { endpoint = its_subscription->get_endpoint(true); if (endpoint) { - has_address = endpoint->get_remote_address(its_address); + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast< + client_endpoint>(endpoint); + if (its_client_endpoint) { + has_address = + its_client_endpoint->get_remote_address( + its_address); + } } else { return; } @@ -422,6 +498,70 @@ void service_discovery_impl::unsubscribe_all(service_t _service, instance_t _ins } } +void service_discovery_impl::unsubscribe_client(service_t _service, + instance_t _instance, + client_t _client) { + std::shared_ptr<runtime> its_runtime = runtime_.lock(); + if (!its_runtime) { + return; + } + std::shared_ptr < message_impl > its_message = its_runtime->create_message(); + boost::asio::ip::address its_address; + bool has_address(false); + { + std::lock_guard<std::mutex> its_lock(subscribed_mutex_); + std::shared_ptr < subscription > its_subscription; + auto found_service = subscribed_.find(_service); + if (found_service != subscribed_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + for (auto &found_eventgroup : found_instance->second) { + auto found_client = found_eventgroup.second.find(_client); + if (found_client != found_eventgroup.second.end()) { + its_subscription = found_client->second; + its_subscription->set_ttl(0); + found_eventgroup.second.erase(_client); + if (!has_address) { + auto endpoint = its_subscription->get_endpoint( + false); + if (endpoint) { + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast< + client_endpoint>(endpoint); + if (its_client_endpoint) { + has_address = + its_client_endpoint->get_remote_address( + its_address); + } + } else { + endpoint = its_subscription->get_endpoint(true); + if (endpoint) { + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast< + client_endpoint>(endpoint); + if (its_client_endpoint) { + has_address = + its_client_endpoint->get_remote_address( + its_address); + } + } else { + return; + } + } + } + insert_subscription(its_message, _service, _instance, + found_eventgroup.first, its_subscription, true, + true); + } + } + } + } + } + if (has_address && 0 < its_message->get_entries().size()) { + serialize_and_send(its_message, its_address); + } +} + std::pair<session_t, bool> service_discovery_impl::get_session( const boost::asio::ip::address &_address) { std::pair<session_t, bool> its_session; @@ -628,14 +768,14 @@ void service_discovery_impl::insert_option( } void service_discovery_impl::insert_find_entries( - std::shared_ptr<message_impl> &_message, requests_t &_requests, + std::shared_ptr<message_impl> &_message, uint32_t _start, uint32_t &_size, bool &_done) { std::lock_guard<std::mutex> its_lock(requested_mutex_); uint32_t its_size(0); uint32_t i = 0; _done = true; - for (auto its_service : _requests) { + for (auto its_service : requested_) { for (auto its_instance : its_service.second) { auto its_request = its_instance.second; uint8_t its_sent_counter = its_request->get_sent_counter(); @@ -838,7 +978,7 @@ bool service_discovery_impl::send(bool _is_announcing, bool _is_find) { its_message = its_runtime->create_message(); its_messages.push_back(its_message); - insert_find_entries(its_message, requested_, its_start, its_size, is_done); + insert_find_entries(its_message, its_start, its_size, is_done); its_start += its_size / VSOMEIP_SOMEIP_SD_ENTRY_SIZE; }; its_remaining -= its_size; @@ -868,14 +1008,16 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, msg << "sdi::on_message: "; for (length_t i = 0; i < _length; ++i) msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; - VSOMEIP_DEBUG << msg.str(); + VSOMEIP_INFO << msg.str(); #endif if(is_suspended_) { return; } + current_remote_address_ = _sender; deserializer_->set_data(_data, _length); std::shared_ptr < message_impl > its_message(deserializer_->deserialize_sd_message()); + deserializer_->reset(); if (its_message) { // ignore all SD messages with source address equal to node's unicast address if (!check_source_address(_sender)) { @@ -1195,13 +1337,12 @@ void service_discovery_impl::send_unicast_offer_service( uint32_t its_size(max_message_size_); insert_offer_service(its_message, _service, _instance, _info, its_size); - const boost::asio::ip::address its_address(get_current_remote_address()); - if (its_address.is_unspecified()) { + if (current_remote_address_.is_unspecified()) { VSOMEIP_ERROR << "service_discovery_impl::" "send_unicast_offer_service current remote address " "is unspecified, won't send offer."; } else { - serialize_and_send(its_message, its_address); + serialize_and_send(its_message, current_remote_address_); } } } @@ -1891,14 +2032,13 @@ bool service_discovery_impl::is_tcp_connected(service_t _service, std::shared_ptr<serviceinfo> its_info = found_instance->second; if(its_info) { //get reliable server endpoint - auto its_reliable_endpoint = its_info->get_endpoint(true); - if(its_reliable_endpoint) { - std::shared_ptr<tcp_server_endpoint_impl> its_ptr(std::static_pointer_cast<tcp_server_endpoint_impl>(its_reliable_endpoint)); - if( !its_ptr->is_established(its_endpoint)) { - } - else { - is_connected = true; - } + auto its_reliable_server_endpoint = + std::dynamic_pointer_cast<tcp_server_endpoint_impl>( + its_info->get_endpoint(true)); + if (its_reliable_server_endpoint + && its_reliable_server_endpoint->is_established( + its_endpoint)) { + is_connected = true; } } } @@ -1947,24 +2087,6 @@ void service_discovery_impl::check_ttl(const boost::system::error_code &_error) } } -boost::asio::ip::address service_discovery_impl::get_current_remote_address() const { - boost::asio::ip::address its_address; - if (reliable_) { - auto endpoint = std::dynamic_pointer_cast<tcp_server_endpoint_impl>(endpoint_); - if (endpoint && !endpoint->get_remote_address(its_address)) { - VSOMEIP_ERROR << "service_discovery_impl::get_current_remote_address: " - "couldn't determine remote address (reliable)"; - } - } else { - auto endpoint = std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint_); - if (endpoint && !endpoint->get_remote_address(its_address)) { - VSOMEIP_ERROR << "service_discovery_impl::get_current_remote_address: " - "couldn't determine remote address (unreliable)"; - } - } - return its_address; -} - bool service_discovery_impl::check_static_header_fields( const std::shared_ptr<const message> &_message) const { if(_message->get_protocol_version() != protocol_version) { @@ -2031,7 +2153,14 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ found_client->second->set_endpoint(its_unreliable, false); } if (endpoint) { - endpoint->get_remote_address(its_address); + if (!has_address) { + VSOMEIP_WARNING << "service_discovery_impl::" + "send_subscriptions couldn't determine " + "address for service.instance: " + << std::hex << std::setw(4) << std::setfill('0') + << _service << "." << _instance; + continue; + } std::shared_ptr<message_impl> its_message = its_runtime->create_message(); @@ -2119,7 +2248,7 @@ bool service_discovery_impl::check_ipv4_address( } else { #if 0 - VSOMEIP_DEBUG << "First 3 triples of subscribers endpoint IP address are valid!"; + VSOMEIP_INFO << "First 3 triples of subscribers endpoint IP address are valid!"; #endif } } @@ -2418,16 +2547,18 @@ bool service_discovery_impl::send_stop_offer( std::shared_ptr<serviceinfo> _info) { std::shared_ptr < runtime > its_runtime = runtime_.lock(); if (its_runtime) { - std::vector<std::shared_ptr<message_impl>> its_messages; - std::shared_ptr<message_impl> its_message; - its_message = its_runtime->create_message(); - its_messages.push_back(its_message); + if (_info->get_endpoint(false) || _info->get_endpoint(true)) { + std::vector<std::shared_ptr<message_impl>> its_messages; + std::shared_ptr<message_impl> its_message; + its_message = its_runtime->create_message(); + its_messages.push_back(its_message); - uint32_t its_size(max_message_size_); - insert_offer_service(its_message, _service, _instance, _info, its_size); + uint32_t its_size(max_message_size_); + insert_offer_service(its_message, _service, _instance, _info, its_size); - // Serialize and send - return serialize_and_send_messages(its_messages); + // Serialize and send + return serialize_and_send_messages(its_messages); + } } return false; } |