diff options
Diffstat (limited to 'implementation/service_discovery')
13 files changed, 299 insertions, 219 deletions
diff --git a/implementation/service_discovery/include/entry_impl.hpp b/implementation/service_discovery/include/entry_impl.hpp index db12a1e..b41a94c 100755 --- a/implementation/service_discovery/include/entry_impl.hpp +++ b/implementation/service_discovery/include/entry_impl.hpp @@ -40,7 +40,7 @@ public: void set_instance(instance_t _instance);
major_version_t get_major_version() const;
- void set_major_version(major_version_t _version);
+ void set_major_version(major_version_t _major_version);
ttl_t get_ttl() const;
void set_ttl(ttl_t _ttl);
@@ -72,7 +72,7 @@ protected: std::uint8_t index2_;
entry_impl();
- entry_impl(const entry_impl &entry_);
+ entry_impl(const entry_impl &_entry);
};
} // namespace sd
diff --git a/implementation/service_discovery/include/ip_option_impl.hpp b/implementation/service_discovery/include/ip_option_impl.hpp index 1345835..a2a7660 100644 --- a/implementation/service_discovery/include/ip_option_impl.hpp +++ b/implementation/service_discovery/include/ip_option_impl.hpp @@ -17,7 +17,7 @@ class ip_option_impl: public option_impl { public: ip_option_impl(); virtual ~ip_option_impl(); - virtual bool operator ==(const ip_option_impl &_option) const; + virtual bool operator ==(const ip_option_impl &_other) const; uint16_t get_port() const; void set_port(uint16_t _port); diff --git a/implementation/service_discovery/include/message_impl.hpp b/implementation/service_discovery/include/message_impl.hpp index baca328..83d3a64 100755 --- a/implementation/service_discovery/include/message_impl.hpp +++ b/implementation/service_discovery/include/message_impl.hpp @@ -98,7 +98,7 @@ public: void forced_initial_events_add(forced_initial_events_t _entry);
const std::vector<forced_initial_events_t> forced_initial_events_get();
- void set_initial_events_required(bool _initial_events);
+ void set_initial_events_required(bool _initial_events_required);
bool initial_events_required() const;
private:
diff --git a/implementation/service_discovery/include/runtime.hpp b/implementation/service_discovery/include/runtime.hpp index f99fec8..670a7bb 100644 --- a/implementation/service_discovery/include/runtime.hpp +++ b/implementation/service_discovery/include/runtime.hpp @@ -10,6 +10,8 @@ namespace vsomeip {
+class configuration;
+
namespace sd {
class message_impl;
@@ -22,7 +24,8 @@ public: }
virtual std::shared_ptr<service_discovery> create_service_discovery(
- service_discovery_host *_host) const = 0;
+ service_discovery_host *_host,
+ std::shared_ptr<vsomeip::configuration> _configuration) const = 0;
virtual std::shared_ptr<message_impl> create_message() const = 0;
};
diff --git a/implementation/service_discovery/include/runtime_impl.hpp b/implementation/service_discovery/include/runtime_impl.hpp index dcdb7d7..0a9baaf 100644 --- a/implementation/service_discovery/include/runtime_impl.hpp +++ b/implementation/service_discovery/include/runtime_impl.hpp @@ -20,7 +20,8 @@ public: virtual ~runtime_impl(); std::shared_ptr<service_discovery> create_service_discovery( - service_discovery_host *_host) const; + service_discovery_host *_host, + std::shared_ptr<configuration> _configuration) const; std::shared_ptr<message_impl> create_message() const; }; diff --git a/implementation/service_discovery/include/service_discovery.hpp b/implementation/service_discovery/include/service_discovery.hpp index 92d80d3..e1c3dc4 100644 --- a/implementation/service_discovery/include/service_discovery.hpp +++ b/implementation/service_discovery/include/service_discovery.hpp @@ -11,6 +11,7 @@ #include <vsomeip/primitive_types.hpp> #include <vsomeip/enumeration_types.hpp> +#include <vsomeip/handler.hpp> #include "../../routing/include/serviceinfo.hpp" #include "../../endpoints/include/endpoint.hpp" #include "../include/service_discovery_host.hpp" @@ -26,7 +27,6 @@ public: virtual ~service_discovery() { } - virtual std::shared_ptr<configuration> get_configuration() const = 0; virtual boost::asio::io_service & get_io() = 0; virtual void init() = 0; @@ -72,6 +72,11 @@ public: service_t _service, instance_t _instance, eventgroup_t _eventgroup, client_t _client, bool _accepted, const std::shared_ptr<sd_message_identifier_t> &_sd_message_id) = 0; + + virtual void register_offer_acceptance_handler( + vsomeip::offer_acceptance_handler_t _handler) = 0; + virtual void register_reboot_notification_handler( + reboot_notification_handler_t _handler) = 0; }; } // namespace sd diff --git a/implementation/service_discovery/include/service_discovery_host.hpp b/implementation/service_discovery/include/service_discovery_host.hpp index f9e8f29..9c346fd 100644 --- a/implementation/service_discovery/include/service_discovery_host.hpp +++ b/implementation/service_discovery/include/service_discovery_host.hpp @@ -31,7 +31,6 @@ public: } virtual boost::asio::io_service & get_io() = 0; - virtual const std::shared_ptr<configuration> get_configuration() const = 0; virtual std::shared_ptr<endpoint> create_service_discovery_endpoint( const std::string &_address, uint16_t _port, bool _reliable) = 0; @@ -90,7 +89,7 @@ public: virtual bool has_identified(client_t _client, service_t _service, instance_t _instance, bool _reliable) = 0; - virtual std::chrono::steady_clock::time_point expire_subscriptions() = 0; + virtual std::chrono::steady_clock::time_point expire_subscriptions(bool _force) = 0; virtual std::shared_ptr<serviceinfo> get_offered_service( service_t _service, instance_t _instance) const = 0; diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp index 995cfbc..77dbae9 100644 --- a/implementation/service_discovery/include/service_discovery_impl.hpp +++ b/implementation/service_discovery/include/service_discovery_impl.hpp @@ -56,10 +56,10 @@ struct subscriber_t { class service_discovery_impl: public service_discovery, public std::enable_shared_from_this<service_discovery_impl> { public: - service_discovery_impl(service_discovery_host *_host); + service_discovery_impl(service_discovery_host *_host, + std::shared_ptr<configuration> _configuration); virtual ~service_discovery_impl(); - std::shared_ptr<configuration> get_configuration() const; boost::asio::io_service & get_io(); void init(); @@ -102,6 +102,10 @@ public: service_t _service, instance_t _instance, eventgroup_t _eventgroup, client_t _client, bool _acknowledged, const std::shared_ptr<sd_message_identifier_t> &_sd_message_id); + + void register_offer_acceptance_handler(offer_acceptance_handler_t _handler); + void register_reboot_notification_handler( + reboot_notification_handler_t _handler); private: std::pair<session_t, bool> get_session(const boost::asio::ip::address &_address); void increment_session(const boost::asio::ip::address &_address); @@ -149,7 +153,9 @@ private: void process_serviceentry(std::shared_ptr<serviceentry_impl> &_entry, const std::vector<std::shared_ptr<option_impl> > &_options, - bool _unicast_flag, std::vector<std::pair<std::uint16_t, std::shared_ptr<message_impl>>>* _resubscribes); + bool _unicast_flag, + std::vector<std::pair<std::uint16_t, std::shared_ptr<message_impl>>>* _resubscribes, + bool _accept_offers); void process_offerservice_serviceentry( service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl, @@ -210,7 +216,7 @@ private: void stop_subscription_expiration_timer_unlocked(); void expire_subscriptions(const boost::system::error_code &_error); - bool check_ipv4_address(boost::asio::ip::address its_address); + bool check_ipv4_address(const boost::asio::ip::address& its_address) const; bool check_static_header_fields( const std::shared_ptr<const message> &_message) const; @@ -346,6 +352,7 @@ private: private: boost::asio::io_service &io_; service_discovery_host *host_; + std::shared_ptr<configuration> configuration_; boost::asio::ip::address unicast_; uint16_t port_; @@ -443,6 +450,12 @@ private: std::mutex remote_offer_types_mutex_; std::map<std::pair<service_t, instance_t>, remote_offer_type_e> remote_offer_types_; std::map<boost::asio::ip::address, std::set<std::pair<service_t, instance_t>>> remote_offers_by_ip_; + + reboot_notification_handler_t reboot_notification_handler_; + offer_acceptance_handler_t offer_acceptance_handler_; + + std::mutex offer_mutex_; + std::mutex check_ttl_mutex_; }; } // namespace sd diff --git a/implementation/service_discovery/src/configuration_option_impl.cpp b/implementation/service_discovery/src/configuration_option_impl.cpp index e9cf058..e09c7c9 100755 --- a/implementation/service_discovery/src/configuration_option_impl.cpp +++ b/implementation/service_discovery/src/configuration_option_impl.cpp @@ -100,7 +100,7 @@ bool configuration_option_impl::deserialize(vsomeip::deserializer *_from) { is_successful = is_successful && _from->deserialize(l_itemLength);
if (l_itemLength > 0) {
is_successful = is_successful
- && _from->deserialize((uint8_t*) &l_item[0], l_itemLength);
+ && _from->deserialize(l_item, static_cast<std::size_t>(l_itemLength));
if (is_successful) {
size_t l_eqPos = l_item.find('='); //SWS_SD_00292
diff --git a/implementation/service_discovery/src/eventgroupentry_impl.cpp b/implementation/service_discovery/src/eventgroupentry_impl.cpp index f5394be..17fd63b 100755 --- a/implementation/service_discovery/src/eventgroupentry_impl.cpp +++ b/implementation/service_discovery/src/eventgroupentry_impl.cpp @@ -150,37 +150,42 @@ bool eventgroupentry_impl::is_matching_subscribe( // read out ip options of current and _other
std::vector<std::shared_ptr<ip_option_impl>> its_options_current;
std::vector<std::shared_ptr<ip_option_impl>> its_options_other;
+ const std::size_t its_options_size = _options.size();
for (const auto option_run : {0,1}) {
for (const auto option_index : options_[option_run]) {
- switch (_options[option_index]->get_type()) {
- case option_type_e::IP4_ENDPOINT:
- its_options_current.push_back(
- std::static_pointer_cast<ipv4_option_impl>(
- _options[option_index]));
- break;
- case option_type_e::IP6_ENDPOINT:
- its_options_current.push_back(
- std::static_pointer_cast<ipv6_option_impl>(
- _options[option_index]));
- break;
- default:
- break;
+ if (its_options_size > option_index) {
+ switch (_options[option_index]->get_type()) {
+ case option_type_e::IP4_ENDPOINT:
+ its_options_current.push_back(
+ std::static_pointer_cast<ipv4_option_impl>(
+ _options[option_index]));
+ break;
+ case option_type_e::IP6_ENDPOINT:
+ its_options_current.push_back(
+ std::static_pointer_cast<ipv6_option_impl>(
+ _options[option_index]));
+ break;
+ default:
+ break;
+ }
}
}
for (const auto option_index : _other.options_[option_run]) {
- switch (_options[option_index]->get_type()) {
- case option_type_e::IP4_ENDPOINT:
- its_options_other.push_back(
- std::static_pointer_cast<ipv4_option_impl>(
- _options[option_index]));
- break;
- case option_type_e::IP6_ENDPOINT:
- its_options_other.push_back(
- std::static_pointer_cast<ipv6_option_impl>(
- _options[option_index]));
- break;
- default:
- break;
+ if (its_options_size > option_index) {
+ switch (_options[option_index]->get_type()) {
+ case option_type_e::IP4_ENDPOINT:
+ its_options_other.push_back(
+ std::static_pointer_cast<ipv4_option_impl>(
+ _options[option_index]));
+ break;
+ case option_type_e::IP6_ENDPOINT:
+ its_options_other.push_back(
+ std::static_pointer_cast<ipv6_option_impl>(
+ _options[option_index]));
+ break;
+ default:
+ break;
+ }
}
}
}
diff --git a/implementation/service_discovery/src/message_impl.cpp b/implementation/service_discovery/src/message_impl.cpp index 9d5c1ac..e3b54ec 100755 --- a/implementation/service_discovery/src/message_impl.cpp +++ b/implementation/service_discovery/src/message_impl.cpp @@ -196,17 +196,17 @@ bool message_impl::serialize(vsomeip::serializer *_to) const { uint32_t entries_length = uint32_t(entries_.size() * VSOMEIP_SOMEIP_SD_ENTRY_SIZE);
is_successful = is_successful && _to->serialize(entries_length);
- for (auto it = entries_.begin(); it != entries_.end(); ++it)
- is_successful = is_successful && (*it)->serialize(_to);
+ for (const auto& its_entry : entries_)
+ is_successful = is_successful && its_entry && its_entry->serialize(_to);
uint32_t options_length = 0;
- for (auto its_option : options_)
- options_length += its_option->get_length()
- + VSOMEIP_SOMEIP_SD_OPTION_HEADER_SIZE;
+ for (const auto& its_option : options_)
+ options_length += its_option ? its_option->get_length()
+ + VSOMEIP_SOMEIP_SD_OPTION_HEADER_SIZE : 0;
is_successful = is_successful && _to->serialize(options_length);
- for (auto its_option : options_)
- is_successful = is_successful && its_option->serialize(_to);
+ for (const auto& its_option : options_)
+ is_successful = is_successful && its_option && its_option->serialize(_to);
return is_successful;
}
@@ -231,6 +231,14 @@ bool message_impl::deserialize(vsomeip::deserializer *_from) { // backup the current remaining length
uint32_t save_remaining = uint32_t(_from->get_remaining());
+ if (!is_successful) {
+ // couldn't deserialize entries length
+ return is_successful;
+ } else if (entries_length > save_remaining) {
+ // not enough data available to deserialize entries array
+ is_successful = false;
+ return is_successful;
+ }
// set remaining bytes to length of entries array
_from->set_remaining(entries_length);
diff --git a/implementation/service_discovery/src/runtime_impl.cpp b/implementation/service_discovery/src/runtime_impl.cpp index 90743b8..d72d034 100644 --- a/implementation/service_discovery/src/runtime_impl.cpp +++ b/implementation/service_discovery/src/runtime_impl.cpp @@ -25,8 +25,9 @@ runtime_impl::~runtime_impl() { } std::shared_ptr<service_discovery> runtime_impl::create_service_discovery( - service_discovery_host *_host) const { - return std::make_shared < service_discovery_impl > (_host); + service_discovery_host *_host, + std::shared_ptr<configuration> _configuration) const { + return std::make_shared < service_discovery_impl > (_host, _configuration); } std::shared_ptr<message_impl> runtime_impl::create_message() const { diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index 1f93559..de5f190 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -34,21 +34,22 @@ #include "../../routing/include/eventgroupinfo.hpp" #include "../../routing/include/serviceinfo.hpp" #include "../../plugin/include/plugin_manager.hpp" +#include "../../utility/include/byteorder.hpp" namespace vsomeip { namespace sd { -service_discovery_impl::service_discovery_impl(service_discovery_host *_host) +service_discovery_impl::service_discovery_impl(service_discovery_host *_host, + std::shared_ptr<configuration> _configuration) : io_(_host->get_io()), host_(_host), + configuration_(_configuration), port_(VSOMEIP_SD_DEFAULT_PORT), reliable_(false), - serializer_( - std::make_shared<serializer>( - host_->get_configuration()->get_buffer_shrink_threshold())), - deserializer_( - std::make_shared<deserializer>( - host_->get_configuration()->get_buffer_shrink_threshold())), + serializer_(std::make_shared<serializer>( + configuration_->get_buffer_shrink_threshold())), + deserializer_(std::make_shared<deserializer>( + configuration_->get_buffer_shrink_threshold())), ttl_timer_(_host->get_io()), ttl_timer_runtime_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY / 2), ttl_(VSOMEIP_SD_DEFAULT_TTL), @@ -75,10 +76,6 @@ service_discovery_impl::service_discovery_impl(service_discovery_host *_host) service_discovery_impl::~service_discovery_impl() { } -std::shared_ptr<configuration> service_discovery_impl::get_configuration() const { - return host_->get_configuration(); -} - boost::asio::io_service & service_discovery_impl::get_io() { return io_; } @@ -86,62 +83,55 @@ boost::asio::io_service & service_discovery_impl::get_io() { void service_discovery_impl::init() { runtime_ = std::dynamic_pointer_cast<sd::runtime>(plugin_manager::get()->get_plugin(plugin_type_e::SD_RUNTIME_PLUGIN, VSOMEIP_SD_LIBRARY)); - std::shared_ptr < configuration > its_configuration = - host_->get_configuration(); - if (its_configuration) { - unicast_ = its_configuration->get_unicast_address(); - sd_multicast_ = its_configuration->get_sd_multicast(); - boost::system::error_code ec; - sd_multicast_address_ = boost::asio::ip::address::from_string(sd_multicast_, ec); - - port_ = its_configuration->get_sd_port(); - reliable_ = (its_configuration->get_sd_protocol() - == "tcp"); - max_message_size_ = (reliable_ ? VSOMEIP_MAX_TCP_SD_PAYLOAD : - VSOMEIP_MAX_UDP_SD_PAYLOAD); - - ttl_ = its_configuration->get_sd_ttl(); - - // generate random initial delay based on initial delay min and max - std::int32_t initial_delay_min = - its_configuration->get_sd_initial_delay_min(); - if (initial_delay_min < 0) { - initial_delay_min = VSOMEIP_SD_DEFAULT_INITIAL_DELAY_MIN; - } - std::int32_t initial_delay_max = - its_configuration->get_sd_initial_delay_max(); - if (initial_delay_max < 0) { - initial_delay_max = VSOMEIP_SD_DEFAULT_INITIAL_DELAY_MAX; - } - if (initial_delay_min > initial_delay_max) { - const std::uint32_t tmp(initial_delay_min); - initial_delay_min = initial_delay_max; - initial_delay_max = tmp; - } - - std::random_device r; - std::mt19937 e(r()); - std::uniform_int_distribution<std::uint32_t> distribution( - initial_delay_min, initial_delay_max); - initial_delay_ = std::chrono::milliseconds(distribution(e)); - - - repetitions_base_delay_ = std::chrono::milliseconds( - its_configuration->get_sd_repetitions_base_delay()); - repetitions_max_ = its_configuration->get_sd_repetitions_max(); - cyclic_offer_delay_ = std::chrono::milliseconds( - its_configuration->get_sd_cyclic_offer_delay()); - offer_debounce_time_ = std::chrono::milliseconds( - its_configuration->get_sd_offer_debounce_time()); - ttl_timer_runtime_ = cyclic_offer_delay_ / 2; - - ttl_factor_offers_ = its_configuration->get_ttl_factor_offers(); - ttl_factor_subscriptions_ = its_configuration->get_ttl_factor_subscribes(); - last_msg_received_timer_timeout_ = cyclic_offer_delay_ - + (cyclic_offer_delay_ / 10); - } else { - VSOMEIP_ERROR << "SD: no configuration found!"; + unicast_ = configuration_->get_unicast_address(); + sd_multicast_ = configuration_->get_sd_multicast(); + boost::system::error_code ec; + sd_multicast_address_ = boost::asio::ip::address::from_string(sd_multicast_, ec); + + port_ = configuration_->get_sd_port(); + reliable_ = (configuration_->get_sd_protocol() == "tcp"); + max_message_size_ = (reliable_ ? VSOMEIP_MAX_TCP_SD_PAYLOAD : + VSOMEIP_MAX_UDP_SD_PAYLOAD); + + ttl_ = configuration_->get_sd_ttl(); + + // generate random initial delay based on initial delay min and max + std::int32_t initial_delay_min = + configuration_->get_sd_initial_delay_min(); + if (initial_delay_min < 0) { + initial_delay_min = VSOMEIP_SD_DEFAULT_INITIAL_DELAY_MIN; + } + std::int32_t initial_delay_max = + configuration_->get_sd_initial_delay_max(); + if (initial_delay_max < 0) { + initial_delay_max = VSOMEIP_SD_DEFAULT_INITIAL_DELAY_MAX; } + if (initial_delay_min > initial_delay_max) { + const std::uint32_t tmp(initial_delay_min); + initial_delay_min = initial_delay_max; + initial_delay_max = tmp; + } + + std::random_device r; + std::mt19937 e(r()); + std::uniform_int_distribution<std::uint32_t> distribution( + initial_delay_min, initial_delay_max); + initial_delay_ = std::chrono::milliseconds(distribution(e)); + + + repetitions_base_delay_ = std::chrono::milliseconds( + configuration_->get_sd_repetitions_base_delay()); + repetitions_max_ = configuration_->get_sd_repetitions_max(); + cyclic_offer_delay_ = std::chrono::milliseconds( + configuration_->get_sd_cyclic_offer_delay()); + offer_debounce_time_ = std::chrono::milliseconds( + configuration_->get_sd_offer_debounce_time()); + ttl_timer_runtime_ = cyclic_offer_delay_ / 2; + + ttl_factor_offers_ = configuration_->get_ttl_factor_offers(); + ttl_factor_subscriptions_ = configuration_->get_ttl_factor_subscribes(); + last_msg_received_timer_timeout_ = cyclic_offer_delay_ + + (cyclic_offer_delay_ / 10); } void service_discovery_impl::start() { @@ -300,7 +290,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, if (its_offer_type == remote_offer_type_e::UNRELIABLE && !its_subscription->get_endpoint(true) && its_subscription->get_endpoint(false)) { - if (its_subscription->get_endpoint(false)->is_connected()) { + if (its_subscription->get_endpoint(false)->is_established()) { insert_subscription(its_message, _service, _instance, _eventgroup, @@ -311,7 +301,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, } else if (its_offer_type == remote_offer_type_e::RELIABLE && its_subscription->get_endpoint(true) && !its_subscription->get_endpoint(false)) { - if (its_subscription->get_endpoint(true)->is_connected()) { + if (its_subscription->get_endpoint(true)->is_established()) { insert_subscription(its_message, _service, _instance, _eventgroup, @@ -322,17 +312,17 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, } else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE && its_subscription->get_endpoint(true) && its_subscription->get_endpoint(false)) { - if (its_subscription->get_endpoint(true)->is_connected() && - its_subscription->get_endpoint(false)->is_connected()) { + if (its_subscription->get_endpoint(true)->is_established() && + its_subscription->get_endpoint(false)->is_established()) { insert_subscription(its_message, _service, _instance, _eventgroup, its_subscription, its_offer_type); } else { - if (!its_subscription->get_endpoint(true)->is_connected()) { + if (!its_subscription->get_endpoint(true)->is_established()) { its_subscription->set_tcp_connection_established(false); } - if (!its_subscription->get_endpoint(false)->is_connected()) { + if (!its_subscription->get_endpoint(false)->is_established()) { its_subscription->set_udp_connection_established(false); } } @@ -446,13 +436,7 @@ void service_discovery_impl::unsubscribe_all(service_t _service, instance_t _ins if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { - for (auto &its_eventgroup : found_instance->second) { - for (auto its_client : its_eventgroup.second) { - its_client.second->set_acknowledged(true); - its_client.second->set_endpoint(nullptr, true); - its_client.second->set_endpoint(nullptr, false); - } - } + found_instance->second.clear(); } } } @@ -813,11 +797,15 @@ void service_discovery_impl::insert_offer_entries( for (const auto its_service : _services) { for (const auto its_instance : its_service.second) { if ((!is_suspended_) - && ((!is_diagnosis_) || (is_diagnosis_ && !host_->get_configuration()->is_someip(its_service.first, its_instance.first)))) { + && ((!is_diagnosis_) + || (is_diagnosis_ + && !configuration_->is_someip(its_service.first, + its_instance.first)))) { // Only insert services with configured endpoint(s) if ((_ignore_phase || its_instance.second->is_in_mainphase()) && (its_instance.second->get_endpoint(false) - || its_instance.second->get_endpoint(true))) { + || its_instance.second->get_endpoint(true)) + && its_instance.second->get_ttl() > 0) { if (i >= _start) { if (!insert_offer_service(_message, its_service.first, its_instance.first, its_instance.second, its_size)) { @@ -935,8 +923,7 @@ bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared // Two entries: Stop subscribe & subscribe within one SD-Message // One option: Both entries reference it - const std::function<std::shared_ptr<eventgroupentry_impl>(ttl_t)> insert_entry - = [&](ttl_t _ttl) { + auto insert_entry = [&](ttl_t _ttl) { std::shared_ptr<eventgroupentry_impl> its_entry = _message->create_eventgroup_entry(); // SUBSCRIBE_EVENTGROUP and STOP_SUBSCRIBE_EVENTGROUP are identical @@ -955,7 +942,7 @@ bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared if (_offer_type == remote_offer_type_e::UNRELIABLE && !its_reliable_endpoint && its_unreliable_endpoint) { - if (its_unreliable_endpoint->is_connected()) { + if (its_unreliable_endpoint->is_established()) { const std::uint16_t its_port = its_unreliable_endpoint->get_local_port(); if (its_port) { std::shared_ptr<eventgroupentry_impl> its_stop_entry = insert_entry(0); @@ -975,7 +962,7 @@ bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared } } else if (_offer_type == remote_offer_type_e::RELIABLE && its_reliable_endpoint && !its_unreliable_endpoint) { - if (its_reliable_endpoint->is_connected()) { + if (its_reliable_endpoint->is_established()) { const std::uint16_t its_port = its_reliable_endpoint->get_local_port(); if (its_port) { std::shared_ptr<eventgroupentry_impl> its_stop_entry = insert_entry(0); @@ -995,8 +982,8 @@ bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared } } else if (_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE && its_reliable_endpoint && its_unreliable_endpoint) { - if (its_reliable_endpoint->is_connected() && - its_unreliable_endpoint->is_connected()) { + if (its_reliable_endpoint->is_established() && + its_unreliable_endpoint->is_established()) { const std::uint16_t its_reliable_port = its_reliable_endpoint->get_local_port(); const std::uint16_t its_unreliable_port = its_unreliable_endpoint->get_local_port(); if (its_reliable_port && its_unreliable_port) { @@ -1021,10 +1008,10 @@ bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; } } else { - if (!its_reliable_endpoint->is_connected()) { + if (!its_reliable_endpoint->is_established()) { _subscription->set_tcp_connection_established(false); } - if (!its_unreliable_endpoint->is_connected()) { + if (!its_unreliable_endpoint->is_established()) { _subscription->set_udp_connection_established(false); } } @@ -1127,6 +1114,7 @@ bool service_discovery_impl::send(bool _is_announcing) { its_message = its_runtime->create_message(); its_messages.push_back(its_message); + std::lock_guard<std::mutex> its_lock(offer_mutex_); services_t its_offers = host_->get_offered_services(); fill_message_with_offer_entries(its_runtime, its_message, its_messages, its_offers, false); @@ -1149,6 +1137,7 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; VSOMEIP_INFO << msg.str(); #endif + std::lock_guard<std::mutex> its_lock(check_ttl_mutex_); std::lock_guard<std::mutex> its_session_lock(sessions_received_mutex_); if(is_suspended_) { @@ -1186,8 +1175,18 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, remove_remote_offer_type_by_ip(_sender); host_->expire_subscriptions(_sender); host_->expire_services(_sender); + if (reboot_notification_handler_) { + ip_address_t ip; + if (_sender.is_v4()) { + ip.address_.v4_ = _sender.to_v4().to_bytes(); + ip.is_v4_ = true; + } else { + ip.address_.v6_ = _sender.to_v6().to_bytes(); + ip.is_v4_ = false; + } + reboot_notification_handler_(ip); + } } - std::vector < std::shared_ptr<option_impl> > its_options = its_message->get_options(); @@ -1199,7 +1198,7 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, std::shared_ptr < message_impl > its_message_response = its_runtime->create_message(); - const std::uint8_t its_required_acks = + std::uint8_t its_required_acks = its_message->get_number_required_acks(); its_message_response->set_number_required_acks(its_required_acks); std::shared_ptr<sd_message_identifier_t> its_message_id = @@ -1215,27 +1214,66 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, const message_impl::entries_t::const_iterator its_end = its_entries.end(); bool is_stop_subscribe_subscribe(false); + bool offer_acceptance_queried(false); + bool accept_offers(false); + bool expired_services(false); + for (auto iter = its_entries.begin(); iter != its_end; iter++) { + if (!offer_acceptance_queried) { + if (offer_acceptance_handler_) { + ip_address_t ip; + if (configuration_->offer_acceptance_required(_sender)) { + if (_sender.is_v4()) { + ip.address_.v4_ = _sender.to_v4().to_bytes(); + ip.is_v4_ = true; + } else { + ip.address_.v6_ = _sender.to_v6().to_bytes(); + ip.is_v4_ = false; + } + accept_offers = offer_acceptance_handler_(ip); + if (!accept_offers && !expired_services) { + + VSOMEIP_INFO << "service_discovery_impl::on_message: Do not accept offer / subscribe from: " + << std::hex << std::setw(4) << std::setfill('0') << _sender.to_string(); + + remove_remote_offer_type_by_ip(_sender); + host_->expire_subscriptions(_sender); + host_->expire_services(_sender); + expired_services = true; + } + } else { + accept_offers = true; + } + offer_acceptance_queried = true; + } else { + offer_acceptance_queried = true; + accept_offers = true; + } + } if ((*iter)->is_service_entry()) { std::shared_ptr < serviceentry_impl > its_service_entry = std::dynamic_pointer_cast < serviceentry_impl > (*iter); bool its_unicast_flag = its_message->get_unicast_flag(); process_serviceentry(its_service_entry, its_options, - its_unicast_flag, &its_resubscribes); + its_unicast_flag, &its_resubscribes, accept_offers); } else { - std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry = - std::dynamic_pointer_cast < eventgroupentry_impl - > (*iter); - bool force_initial_events(false); - if (is_stop_subscribe_subscribe) { - force_initial_events = true; + if (accept_offers) { + std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry = + std::dynamic_pointer_cast < eventgroupentry_impl + > (*iter); + bool force_initial_events(false); + if (is_stop_subscribe_subscribe) { + force_initial_events = true; + } + is_stop_subscribe_subscribe = check_stop_subscribe_subscribe( + iter, its_end, its_message->get_options()); + process_eventgroupentry(its_eventgroup_entry, its_options, + its_message_response, _destination, + its_message_id, is_stop_subscribe_subscribe, force_initial_events); + } else { + its_required_acks = 0; } - is_stop_subscribe_subscribe = check_stop_subscribe_subscribe( - iter, its_end, its_message->get_options()); - process_eventgroupentry(its_eventgroup_entry, its_options, - its_message_response, _destination, - its_message_id, is_stop_subscribe_subscribe, force_initial_events); } } @@ -1283,7 +1321,8 @@ void service_discovery_impl::process_serviceentry( std::shared_ptr<serviceentry_impl> &_entry, const std::vector<std::shared_ptr<option_impl> > &_options, bool _unicast_flag, - std::vector<std::pair<std::uint16_t, std::shared_ptr<message_impl>>>* _resubscribes) { + std::vector<std::pair<std::uint16_t, std::shared_ptr<message_impl>>>* _resubscribes, + bool _accept_offers) { // Read service info from entry entry_type_e its_type = _entry->get_type(); @@ -1365,17 +1404,19 @@ void service_discovery_impl::process_serviceentry( its_major, its_minor, _unicast_flag); break; case entry_type_e::OFFER_SERVICE: - process_offerservice_serviceentry(its_service, its_instance, - its_major, its_minor, its_ttl, - its_reliable_address, its_reliable_port, - its_unreliable_address, its_unreliable_port, _resubscribes); + if (_accept_offers) { + process_offerservice_serviceentry(its_service, its_instance, + its_major, its_minor, its_ttl, + its_reliable_address, its_reliable_port, + its_unreliable_address, its_unreliable_port, _resubscribes); + } break; case entry_type_e::UNKNOWN: default: VSOMEIP_ERROR << "Unsupported serviceentry type"; } - } else { + } else if (_accept_offers) { std::shared_ptr<request> its_request = find_request(its_service, its_instance); if (its_request) { std::lock_guard<std::mutex> its_lock(requested_mutex_); @@ -1491,7 +1532,7 @@ void service_discovery_impl::process_offerservice_serviceentry( if (its_subscription->is_acknowledged()) { if (its_offer_type == remote_offer_type_e::UNRELIABLE) { - if (its_unreliable && its_unreliable->is_connected()) { + if (its_unreliable && its_unreliable->is_established()) { // 28 = 16 (subscription) + 12 (option) check_space(28); const std::size_t options_size_before = @@ -1514,7 +1555,7 @@ void service_discovery_impl::process_offerservice_serviceentry( } } } else if (its_offer_type == remote_offer_type_e::RELIABLE) { - if (its_reliable && its_reliable->is_connected()) { + if (its_reliable && its_reliable->is_established()) { // 28 = 16 (subscription) + 12 (option) check_space(28); const std::size_t options_size_before = @@ -1544,8 +1585,8 @@ void service_discovery_impl::process_offerservice_serviceentry( } } else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE) { if (its_reliable && its_unreliable && - its_reliable->is_connected() && - its_unreliable->is_connected()) { + its_reliable->is_established() && + its_unreliable->is_established()) { // 40 = 16 (subscription) + 2x12 (option) check_space(40); const std::size_t options_size_before = @@ -1566,7 +1607,7 @@ void service_discovery_impl::process_offerservice_serviceentry( static_cast<std::uint16_t>( _resubscribes->back().first - 40); } - } else if (its_reliable && !its_reliable->is_connected()) { + } else if (its_reliable && !its_reliable->is_established()) { its_client.second->set_tcp_connection_established(false); // restart TCP endpoint if not connected its_reliable->restart(); @@ -1597,7 +1638,7 @@ void service_discovery_impl::process_offerservice_serviceentry( } // restart TCP endpoint if not connected - if (its_reliable && !its_reliable->is_connected()) { + if (its_reliable && !its_reliable->is_established()) { its_reliable->restart(); } } @@ -1735,14 +1776,14 @@ void service_discovery_impl::on_endpoint_connected( its_subscription->get_endpoint(true)); const std::shared_ptr<const endpoint> its_unreliable_endpoint( its_subscription->get_endpoint(false)); - if(its_reliable_endpoint && its_reliable_endpoint->is_connected()) { + if(its_reliable_endpoint && its_reliable_endpoint->is_established()) { if(its_reliable_endpoint.get() == _endpoint.get()) { // mark tcp as established its_subscription->set_tcp_connection_established(true); } } - if(its_unreliable_endpoint && its_unreliable_endpoint->is_connected()) { - if(its_reliable_endpoint.get() == _endpoint.get()) { + if(its_unreliable_endpoint && its_unreliable_endpoint->is_established()) { + if(its_unreliable_endpoint.get() == _endpoint.get()) { // mark udp as established its_subscription->set_udp_connection_established(true); } @@ -2633,7 +2674,10 @@ void service_discovery_impl::serialize_and_send( _message->set_session(its_session.first); _message->set_reboot_flag(its_session.second); if(!serializer_->serialize(_message.get())) { - VSOMEIP_ERROR << "service_discovery_impl::serialize_and_send: serialization error."; + boost::system::error_code ec; + VSOMEIP_ERROR << "service_discovery_impl::serialize_and_send: serialization error." + << " Remote: " << _address.to_string(ec) << " session: 0x" + << std::hex << its_session.first; return; } if (host_->send_to(endpoint_definition::get(_address, port_, reliable_, _message->get_service(), _message->get_instance()), @@ -2660,7 +2704,10 @@ void service_discovery_impl::stop_ttl_timer() { void service_discovery_impl::check_ttl(const boost::system::error_code &_error) { if (!_error) { - host_->update_routing_info(ttl_timer_runtime_); + { + std::lock_guard<std::mutex> its_lock(check_ttl_mutex_); + host_->update_routing_info(ttl_timer_runtime_); + } start_ttl_timer(); } } @@ -2752,7 +2799,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ = its_runtime->create_message(); if (its_reliable && its_unreliable) { - if (its_reliable->is_connected() && its_unreliable->is_connected()) { + if (its_reliable->is_established() && its_unreliable->is_established()) { insert_subscription(its_message, _service, _instance, found_eventgroup.first, found_client->second, its_offer_type); @@ -2764,7 +2811,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ } } else { if(_reliable) { - if(endpoint->is_connected()) { + if(endpoint->is_established()) { insert_subscription(its_message, _service, _instance, found_eventgroup.first, found_client->second, its_offer_type); @@ -2775,7 +2822,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ found_client->second->set_tcp_connection_established(false); } } else { - if (endpoint->is_connected()) { + if (endpoint->is_established()) { insert_subscription(its_message, _service, _instance, found_eventgroup.first, found_client->second, its_offer_type); @@ -2827,47 +2874,39 @@ void service_discovery_impl::stop_subscription_expiration_timer_unlocked() { void service_discovery_impl::expire_subscriptions(const boost::system::error_code &_error) { if (!_error) { - next_subscription_expiration_ = host_->expire_subscriptions(); + next_subscription_expiration_ = host_->expire_subscriptions(false); start_subscription_expiration_timer(); } } bool service_discovery_impl::check_ipv4_address( - boost::asio::ip::address its_address) { + const boost::asio::ip::address& its_address) const { //Check unallowed ipv4 address bool is_valid = true; - std::shared_ptr<configuration> its_configuration = - host_->get_configuration(); - - if(its_configuration) { - boost::asio::ip::address_v4::bytes_type its_unicast_address = - its_configuration.get()->get_unicast_address().to_v4().to_bytes(); - boost::asio::ip::address_v4::bytes_type endpoint_address = - its_address.to_v4().to_bytes(); - - //same address as unicast address of DUT not allowed - if(its_unicast_address - == endpoint_address) { - VSOMEIP_ERROR << "Subscribers endpoint IP address is same as DUT's address! : " - << its_address.to_string(); - is_valid = false; - } - - // first 3 triples must match - its_unicast_address[3] = 0x00; - endpoint_address[3] = 0x00; - if(its_unicast_address - != endpoint_address) { -#if 1 - VSOMEIP_ERROR<< "First 3 triples of subscribers endpoint IP address are not valid!"; -#endif + static const boost::asio::ip::address_v4::bytes_type its_unicast_address = + unicast_.to_v4().to_bytes(); + const boost::asio::ip::address_v4::bytes_type endpoint_address = + its_address.to_v4().to_bytes(); + static const boost::asio::ip::address_v4::bytes_type its_netmask = + configuration_->get_netmask().to_v4().to_bytes(); + + //same address as unicast address of DUT not allowed + if (its_unicast_address == endpoint_address) { + VSOMEIP_ERROR << "Subscriber's IP address is same as host's address! : " + << its_address; + is_valid = false; + } else { + const std::uint32_t self = VSOMEIP_BYTES_TO_LONG(its_unicast_address[0], + its_unicast_address[1], its_unicast_address[2], its_unicast_address[3]); + const std::uint32_t remote = VSOMEIP_BYTES_TO_LONG(endpoint_address[0], + endpoint_address[1], endpoint_address[2], endpoint_address[3]); + const std::uint32_t netmask = VSOMEIP_BYTES_TO_LONG(its_netmask[0], + its_netmask[1], its_netmask[2], its_netmask[3]); + if ((self & netmask) != (remote & netmask)) { + VSOMEIP_ERROR<< "Subscriber's IP isn't in the same subnet as host's IP: " + << its_address; is_valid = false; - - } else { -#if 0 - VSOMEIP_INFO << "First 3 triples of subscribers endpoint IP address are valid!"; -#endif } } return is_valid; @@ -3012,7 +3051,7 @@ void service_discovery_impl::on_offer_debounce_timer_expired( for (services_t::iterator its_service = collected_offers_.begin(); its_service != collected_offers_.end(); its_service++) { for (auto its_instance : its_service->second) { - if (!host_->get_configuration()->is_someip( + if (!configuration_->is_someip( its_service->first, its_instance.first)) { non_someip_services.push_back(its_service); } @@ -3286,6 +3325,8 @@ bool service_discovery_impl::serialize_and_send_messages( void service_discovery_impl::stop_offer_service( service_t _service, instance_t _instance, std::shared_ptr<serviceinfo> _info) { + std::lock_guard<std::mutex> its_lock(offer_mutex_); + _info->set_ttl(0); bool stop_offer_required(false); // delete from initial phase offers { @@ -3420,19 +3461,11 @@ bool service_discovery_impl::last_offer_shorter_half_offer_delay_ago() { bool service_discovery_impl::check_source_address( const boost::asio::ip::address &its_source_address) const { bool is_valid = true; - std::shared_ptr<configuration> its_configuration = - host_->get_configuration(); - - if(its_configuration) { - boost::asio::ip::address its_unicast_address = - its_configuration.get()->get_unicast_address(); - // check if source address is same as nodes unicast address - if(its_unicast_address - == its_source_address) { - VSOMEIP_ERROR << "Source address of message is same as DUT's unicast address! : " - << its_source_address.to_string(); - is_valid = false; - } + // check if source address is same as nodes unicast address + if(unicast_ == its_source_address) { + VSOMEIP_ERROR << "Source address of message is same as DUT's unicast address! : " + << its_source_address.to_string(); + is_valid = false; } return is_valid; } @@ -3502,8 +3535,10 @@ void service_discovery_impl::update_subscription_expiration_timer( const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); stop_subscription_expiration_timer_unlocked(); + + std::unique_lock<std::mutex> its_message_lock(_message->get_message_lock()); for (const auto &entry : _message->get_entries()) { - if (entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK + if (entry && entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK && entry->get_ttl()) { const std::chrono::steady_clock::time_point its_expiration = now + std::chrono::seconds( @@ -3792,5 +3827,15 @@ service_discovery_impl::get_eventgroups_requiring_initial_events( return its_acks; } +void service_discovery_impl::register_offer_acceptance_handler( + vsomeip::offer_acceptance_handler_t _handler) { + offer_acceptance_handler_ = _handler; +} + +void service_discovery_impl::register_reboot_notification_handler( + reboot_notification_handler_t _handler) { + reboot_notification_handler_ = _handler; +} + } // namespace sd } // namespace vsomeip |