summaryrefslogtreecommitdiff
path: root/implementation/service_discovery/src/service_discovery_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/service_discovery/src/service_discovery_impl.cpp')
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp385
1 files changed, 215 insertions, 170 deletions
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index 1f93559..de5f190 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -34,21 +34,22 @@
#include "../../routing/include/eventgroupinfo.hpp"
#include "../../routing/include/serviceinfo.hpp"
#include "../../plugin/include/plugin_manager.hpp"
+#include "../../utility/include/byteorder.hpp"
namespace vsomeip {
namespace sd {
-service_discovery_impl::service_discovery_impl(service_discovery_host *_host)
+service_discovery_impl::service_discovery_impl(service_discovery_host *_host,
+ std::shared_ptr<configuration> _configuration)
: io_(_host->get_io()),
host_(_host),
+ configuration_(_configuration),
port_(VSOMEIP_SD_DEFAULT_PORT),
reliable_(false),
- serializer_(
- std::make_shared<serializer>(
- host_->get_configuration()->get_buffer_shrink_threshold())),
- deserializer_(
- std::make_shared<deserializer>(
- host_->get_configuration()->get_buffer_shrink_threshold())),
+ serializer_(std::make_shared<serializer>(
+ configuration_->get_buffer_shrink_threshold())),
+ deserializer_(std::make_shared<deserializer>(
+ configuration_->get_buffer_shrink_threshold())),
ttl_timer_(_host->get_io()),
ttl_timer_runtime_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY / 2),
ttl_(VSOMEIP_SD_DEFAULT_TTL),
@@ -75,10 +76,6 @@ service_discovery_impl::service_discovery_impl(service_discovery_host *_host)
service_discovery_impl::~service_discovery_impl() {
}
-std::shared_ptr<configuration> service_discovery_impl::get_configuration() const {
- return host_->get_configuration();
-}
-
boost::asio::io_service & service_discovery_impl::get_io() {
return io_;
}
@@ -86,62 +83,55 @@ boost::asio::io_service & service_discovery_impl::get_io() {
void service_discovery_impl::init() {
runtime_ = std::dynamic_pointer_cast<sd::runtime>(plugin_manager::get()->get_plugin(plugin_type_e::SD_RUNTIME_PLUGIN, VSOMEIP_SD_LIBRARY));
- std::shared_ptr < configuration > its_configuration =
- host_->get_configuration();
- if (its_configuration) {
- unicast_ = its_configuration->get_unicast_address();
- sd_multicast_ = its_configuration->get_sd_multicast();
- boost::system::error_code ec;
- sd_multicast_address_ = boost::asio::ip::address::from_string(sd_multicast_, ec);
-
- port_ = its_configuration->get_sd_port();
- reliable_ = (its_configuration->get_sd_protocol()
- == "tcp");
- max_message_size_ = (reliable_ ? VSOMEIP_MAX_TCP_SD_PAYLOAD :
- VSOMEIP_MAX_UDP_SD_PAYLOAD);
-
- ttl_ = its_configuration->get_sd_ttl();
-
- // generate random initial delay based on initial delay min and max
- std::int32_t initial_delay_min =
- its_configuration->get_sd_initial_delay_min();
- if (initial_delay_min < 0) {
- initial_delay_min = VSOMEIP_SD_DEFAULT_INITIAL_DELAY_MIN;
- }
- std::int32_t initial_delay_max =
- its_configuration->get_sd_initial_delay_max();
- if (initial_delay_max < 0) {
- initial_delay_max = VSOMEIP_SD_DEFAULT_INITIAL_DELAY_MAX;
- }
- if (initial_delay_min > initial_delay_max) {
- const std::uint32_t tmp(initial_delay_min);
- initial_delay_min = initial_delay_max;
- initial_delay_max = tmp;
- }
-
- std::random_device r;
- std::mt19937 e(r());
- std::uniform_int_distribution<std::uint32_t> distribution(
- initial_delay_min, initial_delay_max);
- initial_delay_ = std::chrono::milliseconds(distribution(e));
-
-
- repetitions_base_delay_ = std::chrono::milliseconds(
- its_configuration->get_sd_repetitions_base_delay());
- repetitions_max_ = its_configuration->get_sd_repetitions_max();
- cyclic_offer_delay_ = std::chrono::milliseconds(
- its_configuration->get_sd_cyclic_offer_delay());
- offer_debounce_time_ = std::chrono::milliseconds(
- its_configuration->get_sd_offer_debounce_time());
- ttl_timer_runtime_ = cyclic_offer_delay_ / 2;
-
- ttl_factor_offers_ = its_configuration->get_ttl_factor_offers();
- ttl_factor_subscriptions_ = its_configuration->get_ttl_factor_subscribes();
- last_msg_received_timer_timeout_ = cyclic_offer_delay_
- + (cyclic_offer_delay_ / 10);
- } else {
- VSOMEIP_ERROR << "SD: no configuration found!";
+ unicast_ = configuration_->get_unicast_address();
+ sd_multicast_ = configuration_->get_sd_multicast();
+ boost::system::error_code ec;
+ sd_multicast_address_ = boost::asio::ip::address::from_string(sd_multicast_, ec);
+
+ port_ = configuration_->get_sd_port();
+ reliable_ = (configuration_->get_sd_protocol() == "tcp");
+ max_message_size_ = (reliable_ ? VSOMEIP_MAX_TCP_SD_PAYLOAD :
+ VSOMEIP_MAX_UDP_SD_PAYLOAD);
+
+ ttl_ = configuration_->get_sd_ttl();
+
+ // generate random initial delay based on initial delay min and max
+ std::int32_t initial_delay_min =
+ configuration_->get_sd_initial_delay_min();
+ if (initial_delay_min < 0) {
+ initial_delay_min = VSOMEIP_SD_DEFAULT_INITIAL_DELAY_MIN;
+ }
+ std::int32_t initial_delay_max =
+ configuration_->get_sd_initial_delay_max();
+ if (initial_delay_max < 0) {
+ initial_delay_max = VSOMEIP_SD_DEFAULT_INITIAL_DELAY_MAX;
}
+ if (initial_delay_min > initial_delay_max) {
+ const std::uint32_t tmp(initial_delay_min);
+ initial_delay_min = initial_delay_max;
+ initial_delay_max = tmp;
+ }
+
+ std::random_device r;
+ std::mt19937 e(r());
+ std::uniform_int_distribution<std::uint32_t> distribution(
+ initial_delay_min, initial_delay_max);
+ initial_delay_ = std::chrono::milliseconds(distribution(e));
+
+
+ repetitions_base_delay_ = std::chrono::milliseconds(
+ configuration_->get_sd_repetitions_base_delay());
+ repetitions_max_ = configuration_->get_sd_repetitions_max();
+ cyclic_offer_delay_ = std::chrono::milliseconds(
+ configuration_->get_sd_cyclic_offer_delay());
+ offer_debounce_time_ = std::chrono::milliseconds(
+ configuration_->get_sd_offer_debounce_time());
+ ttl_timer_runtime_ = cyclic_offer_delay_ / 2;
+
+ ttl_factor_offers_ = configuration_->get_ttl_factor_offers();
+ ttl_factor_subscriptions_ = configuration_->get_ttl_factor_subscribes();
+ last_msg_received_timer_timeout_ = cyclic_offer_delay_
+ + (cyclic_offer_delay_ / 10);
}
void service_discovery_impl::start() {
@@ -300,7 +290,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
if (its_offer_type == remote_offer_type_e::UNRELIABLE &&
!its_subscription->get_endpoint(true) &&
its_subscription->get_endpoint(false)) {
- if (its_subscription->get_endpoint(false)->is_connected()) {
+ if (its_subscription->get_endpoint(false)->is_established()) {
insert_subscription(its_message,
_service, _instance,
_eventgroup,
@@ -311,7 +301,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
} else if (its_offer_type == remote_offer_type_e::RELIABLE &&
its_subscription->get_endpoint(true) &&
!its_subscription->get_endpoint(false)) {
- if (its_subscription->get_endpoint(true)->is_connected()) {
+ if (its_subscription->get_endpoint(true)->is_established()) {
insert_subscription(its_message,
_service, _instance,
_eventgroup,
@@ -322,17 +312,17 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
} else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE &&
its_subscription->get_endpoint(true) &&
its_subscription->get_endpoint(false)) {
- if (its_subscription->get_endpoint(true)->is_connected() &&
- its_subscription->get_endpoint(false)->is_connected()) {
+ if (its_subscription->get_endpoint(true)->is_established() &&
+ its_subscription->get_endpoint(false)->is_established()) {
insert_subscription(its_message,
_service, _instance,
_eventgroup,
its_subscription, its_offer_type);
} else {
- if (!its_subscription->get_endpoint(true)->is_connected()) {
+ if (!its_subscription->get_endpoint(true)->is_established()) {
its_subscription->set_tcp_connection_established(false);
}
- if (!its_subscription->get_endpoint(false)->is_connected()) {
+ if (!its_subscription->get_endpoint(false)->is_established()) {
its_subscription->set_udp_connection_established(false);
}
}
@@ -446,13 +436,7 @@ void service_discovery_impl::unsubscribe_all(service_t _service, instance_t _ins
if (found_service != subscribed_.end()) {
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
- for (auto &its_eventgroup : found_instance->second) {
- for (auto its_client : its_eventgroup.second) {
- its_client.second->set_acknowledged(true);
- its_client.second->set_endpoint(nullptr, true);
- its_client.second->set_endpoint(nullptr, false);
- }
- }
+ found_instance->second.clear();
}
}
}
@@ -813,11 +797,15 @@ void service_discovery_impl::insert_offer_entries(
for (const auto its_service : _services) {
for (const auto its_instance : its_service.second) {
if ((!is_suspended_)
- && ((!is_diagnosis_) || (is_diagnosis_ && !host_->get_configuration()->is_someip(its_service.first, its_instance.first)))) {
+ && ((!is_diagnosis_)
+ || (is_diagnosis_
+ && !configuration_->is_someip(its_service.first,
+ its_instance.first)))) {
// Only insert services with configured endpoint(s)
if ((_ignore_phase || its_instance.second->is_in_mainphase())
&& (its_instance.second->get_endpoint(false)
- || its_instance.second->get_endpoint(true))) {
+ || its_instance.second->get_endpoint(true))
+ && its_instance.second->get_ttl() > 0) {
if (i >= _start) {
if (!insert_offer_service(_message, its_service.first,
its_instance.first, its_instance.second, its_size)) {
@@ -935,8 +923,7 @@ bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared
// Two entries: Stop subscribe & subscribe within one SD-Message
// One option: Both entries reference it
- const std::function<std::shared_ptr<eventgroupentry_impl>(ttl_t)> insert_entry
- = [&](ttl_t _ttl) {
+ auto insert_entry = [&](ttl_t _ttl) {
std::shared_ptr<eventgroupentry_impl> its_entry =
_message->create_eventgroup_entry();
// SUBSCRIBE_EVENTGROUP and STOP_SUBSCRIBE_EVENTGROUP are identical
@@ -955,7 +942,7 @@ bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared
if (_offer_type == remote_offer_type_e::UNRELIABLE &&
!its_reliable_endpoint && its_unreliable_endpoint) {
- if (its_unreliable_endpoint->is_connected()) {
+ if (its_unreliable_endpoint->is_established()) {
const std::uint16_t its_port = its_unreliable_endpoint->get_local_port();
if (its_port) {
std::shared_ptr<eventgroupentry_impl> its_stop_entry = insert_entry(0);
@@ -975,7 +962,7 @@ bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared
}
} else if (_offer_type == remote_offer_type_e::RELIABLE &&
its_reliable_endpoint && !its_unreliable_endpoint) {
- if (its_reliable_endpoint->is_connected()) {
+ if (its_reliable_endpoint->is_established()) {
const std::uint16_t its_port = its_reliable_endpoint->get_local_port();
if (its_port) {
std::shared_ptr<eventgroupentry_impl> its_stop_entry = insert_entry(0);
@@ -995,8 +982,8 @@ bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared
}
} else if (_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE &&
its_reliable_endpoint && its_unreliable_endpoint) {
- if (its_reliable_endpoint->is_connected() &&
- its_unreliable_endpoint->is_connected()) {
+ if (its_reliable_endpoint->is_established() &&
+ its_unreliable_endpoint->is_established()) {
const std::uint16_t its_reliable_port = its_reliable_endpoint->get_local_port();
const std::uint16_t its_unreliable_port = its_unreliable_endpoint->get_local_port();
if (its_reliable_port && its_unreliable_port) {
@@ -1021,10 +1008,10 @@ bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared
<< std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
}
} else {
- if (!its_reliable_endpoint->is_connected()) {
+ if (!its_reliable_endpoint->is_established()) {
_subscription->set_tcp_connection_established(false);
}
- if (!its_unreliable_endpoint->is_connected()) {
+ if (!its_unreliable_endpoint->is_established()) {
_subscription->set_udp_connection_established(false);
}
}
@@ -1127,6 +1114,7 @@ bool service_discovery_impl::send(bool _is_announcing) {
its_message = its_runtime->create_message();
its_messages.push_back(its_message);
+ std::lock_guard<std::mutex> its_lock(offer_mutex_);
services_t its_offers = host_->get_offered_services();
fill_message_with_offer_entries(its_runtime, its_message,
its_messages, its_offers, false);
@@ -1149,6 +1137,7 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
+ std::lock_guard<std::mutex> its_lock(check_ttl_mutex_);
std::lock_guard<std::mutex> its_session_lock(sessions_received_mutex_);
if(is_suspended_) {
@@ -1186,8 +1175,18 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
remove_remote_offer_type_by_ip(_sender);
host_->expire_subscriptions(_sender);
host_->expire_services(_sender);
+ if (reboot_notification_handler_) {
+ ip_address_t ip;
+ if (_sender.is_v4()) {
+ ip.address_.v4_ = _sender.to_v4().to_bytes();
+ ip.is_v4_ = true;
+ } else {
+ ip.address_.v6_ = _sender.to_v6().to_bytes();
+ ip.is_v4_ = false;
+ }
+ reboot_notification_handler_(ip);
+ }
}
-
std::vector < std::shared_ptr<option_impl> > its_options =
its_message->get_options();
@@ -1199,7 +1198,7 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
std::shared_ptr < message_impl > its_message_response
= its_runtime->create_message();
- const std::uint8_t its_required_acks =
+ std::uint8_t its_required_acks =
its_message->get_number_required_acks();
its_message_response->set_number_required_acks(its_required_acks);
std::shared_ptr<sd_message_identifier_t> its_message_id =
@@ -1215,27 +1214,66 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
const message_impl::entries_t::const_iterator its_end = its_entries.end();
bool is_stop_subscribe_subscribe(false);
+ bool offer_acceptance_queried(false);
+ bool accept_offers(false);
+ bool expired_services(false);
+
for (auto iter = its_entries.begin(); iter != its_end; iter++) {
+ if (!offer_acceptance_queried) {
+ if (offer_acceptance_handler_) {
+ ip_address_t ip;
+ if (configuration_->offer_acceptance_required(_sender)) {
+ if (_sender.is_v4()) {
+ ip.address_.v4_ = _sender.to_v4().to_bytes();
+ ip.is_v4_ = true;
+ } else {
+ ip.address_.v6_ = _sender.to_v6().to_bytes();
+ ip.is_v4_ = false;
+ }
+ accept_offers = offer_acceptance_handler_(ip);
+ if (!accept_offers && !expired_services) {
+
+ VSOMEIP_INFO << "service_discovery_impl::on_message: Do not accept offer / subscribe from: "
+ << std::hex << std::setw(4) << std::setfill('0') << _sender.to_string();
+
+ remove_remote_offer_type_by_ip(_sender);
+ host_->expire_subscriptions(_sender);
+ host_->expire_services(_sender);
+ expired_services = true;
+ }
+ } else {
+ accept_offers = true;
+ }
+ offer_acceptance_queried = true;
+ } else {
+ offer_acceptance_queried = true;
+ accept_offers = true;
+ }
+ }
if ((*iter)->is_service_entry()) {
std::shared_ptr < serviceentry_impl > its_service_entry =
std::dynamic_pointer_cast < serviceentry_impl
> (*iter);
bool its_unicast_flag = its_message->get_unicast_flag();
process_serviceentry(its_service_entry, its_options,
- its_unicast_flag, &its_resubscribes);
+ its_unicast_flag, &its_resubscribes, accept_offers);
} else {
- std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry =
- std::dynamic_pointer_cast < eventgroupentry_impl
- > (*iter);
- bool force_initial_events(false);
- if (is_stop_subscribe_subscribe) {
- force_initial_events = true;
+ if (accept_offers) {
+ std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry =
+ std::dynamic_pointer_cast < eventgroupentry_impl
+ > (*iter);
+ bool force_initial_events(false);
+ if (is_stop_subscribe_subscribe) {
+ force_initial_events = true;
+ }
+ is_stop_subscribe_subscribe = check_stop_subscribe_subscribe(
+ iter, its_end, its_message->get_options());
+ process_eventgroupentry(its_eventgroup_entry, its_options,
+ its_message_response, _destination,
+ its_message_id, is_stop_subscribe_subscribe, force_initial_events);
+ } else {
+ its_required_acks = 0;
}
- is_stop_subscribe_subscribe = check_stop_subscribe_subscribe(
- iter, its_end, its_message->get_options());
- process_eventgroupentry(its_eventgroup_entry, its_options,
- its_message_response, _destination,
- its_message_id, is_stop_subscribe_subscribe, force_initial_events);
}
}
@@ -1283,7 +1321,8 @@ void service_discovery_impl::process_serviceentry(
std::shared_ptr<serviceentry_impl> &_entry,
const std::vector<std::shared_ptr<option_impl> > &_options,
bool _unicast_flag,
- std::vector<std::pair<std::uint16_t, std::shared_ptr<message_impl>>>* _resubscribes) {
+ std::vector<std::pair<std::uint16_t, std::shared_ptr<message_impl>>>* _resubscribes,
+ bool _accept_offers) {
// Read service info from entry
entry_type_e its_type = _entry->get_type();
@@ -1365,17 +1404,19 @@ void service_discovery_impl::process_serviceentry(
its_major, its_minor, _unicast_flag);
break;
case entry_type_e::OFFER_SERVICE:
- process_offerservice_serviceentry(its_service, its_instance,
- its_major, its_minor, its_ttl,
- its_reliable_address, its_reliable_port,
- its_unreliable_address, its_unreliable_port, _resubscribes);
+ if (_accept_offers) {
+ process_offerservice_serviceentry(its_service, its_instance,
+ its_major, its_minor, its_ttl,
+ its_reliable_address, its_reliable_port,
+ its_unreliable_address, its_unreliable_port, _resubscribes);
+ }
break;
case entry_type_e::UNKNOWN:
default:
VSOMEIP_ERROR << "Unsupported serviceentry type";
}
- } else {
+ } else if (_accept_offers) {
std::shared_ptr<request> its_request = find_request(its_service, its_instance);
if (its_request) {
std::lock_guard<std::mutex> its_lock(requested_mutex_);
@@ -1491,7 +1532,7 @@ void service_discovery_impl::process_offerservice_serviceentry(
if (its_subscription->is_acknowledged()) {
if (its_offer_type == remote_offer_type_e::UNRELIABLE) {
- if (its_unreliable && its_unreliable->is_connected()) {
+ if (its_unreliable && its_unreliable->is_established()) {
// 28 = 16 (subscription) + 12 (option)
check_space(28);
const std::size_t options_size_before =
@@ -1514,7 +1555,7 @@ void service_discovery_impl::process_offerservice_serviceentry(
}
}
} else if (its_offer_type == remote_offer_type_e::RELIABLE) {
- if (its_reliable && its_reliable->is_connected()) {
+ if (its_reliable && its_reliable->is_established()) {
// 28 = 16 (subscription) + 12 (option)
check_space(28);
const std::size_t options_size_before =
@@ -1544,8 +1585,8 @@ void service_discovery_impl::process_offerservice_serviceentry(
}
} else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE) {
if (its_reliable && its_unreliable &&
- its_reliable->is_connected() &&
- its_unreliable->is_connected()) {
+ its_reliable->is_established() &&
+ its_unreliable->is_established()) {
// 40 = 16 (subscription) + 2x12 (option)
check_space(40);
const std::size_t options_size_before =
@@ -1566,7 +1607,7 @@ void service_discovery_impl::process_offerservice_serviceentry(
static_cast<std::uint16_t>(
_resubscribes->back().first - 40);
}
- } else if (its_reliable && !its_reliable->is_connected()) {
+ } else if (its_reliable && !its_reliable->is_established()) {
its_client.second->set_tcp_connection_established(false);
// restart TCP endpoint if not connected
its_reliable->restart();
@@ -1597,7 +1638,7 @@ void service_discovery_impl::process_offerservice_serviceentry(
}
// restart TCP endpoint if not connected
- if (its_reliable && !its_reliable->is_connected()) {
+ if (its_reliable && !its_reliable->is_established()) {
its_reliable->restart();
}
}
@@ -1735,14 +1776,14 @@ void service_discovery_impl::on_endpoint_connected(
its_subscription->get_endpoint(true));
const std::shared_ptr<const endpoint> its_unreliable_endpoint(
its_subscription->get_endpoint(false));
- if(its_reliable_endpoint && its_reliable_endpoint->is_connected()) {
+ if(its_reliable_endpoint && its_reliable_endpoint->is_established()) {
if(its_reliable_endpoint.get() == _endpoint.get()) {
// mark tcp as established
its_subscription->set_tcp_connection_established(true);
}
}
- if(its_unreliable_endpoint && its_unreliable_endpoint->is_connected()) {
- if(its_reliable_endpoint.get() == _endpoint.get()) {
+ if(its_unreliable_endpoint && its_unreliable_endpoint->is_established()) {
+ if(its_unreliable_endpoint.get() == _endpoint.get()) {
// mark udp as established
its_subscription->set_udp_connection_established(true);
}
@@ -2633,7 +2674,10 @@ void service_discovery_impl::serialize_and_send(
_message->set_session(its_session.first);
_message->set_reboot_flag(its_session.second);
if(!serializer_->serialize(_message.get())) {
- VSOMEIP_ERROR << "service_discovery_impl::serialize_and_send: serialization error.";
+ boost::system::error_code ec;
+ VSOMEIP_ERROR << "service_discovery_impl::serialize_and_send: serialization error."
+ << " Remote: " << _address.to_string(ec) << " session: 0x"
+ << std::hex << its_session.first;
return;
}
if (host_->send_to(endpoint_definition::get(_address, port_, reliable_, _message->get_service(), _message->get_instance()),
@@ -2660,7 +2704,10 @@ void service_discovery_impl::stop_ttl_timer() {
void service_discovery_impl::check_ttl(const boost::system::error_code &_error) {
if (!_error) {
- host_->update_routing_info(ttl_timer_runtime_);
+ {
+ std::lock_guard<std::mutex> its_lock(check_ttl_mutex_);
+ host_->update_routing_info(ttl_timer_runtime_);
+ }
start_ttl_timer();
}
}
@@ -2752,7 +2799,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
= its_runtime->create_message();
if (its_reliable && its_unreliable) {
- if (its_reliable->is_connected() && its_unreliable->is_connected()) {
+ if (its_reliable->is_established() && its_unreliable->is_established()) {
insert_subscription(its_message, _service,
_instance, found_eventgroup.first,
found_client->second, its_offer_type);
@@ -2764,7 +2811,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
}
} else {
if(_reliable) {
- if(endpoint->is_connected()) {
+ if(endpoint->is_established()) {
insert_subscription(its_message, _service,
_instance, found_eventgroup.first,
found_client->second, its_offer_type);
@@ -2775,7 +2822,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
found_client->second->set_tcp_connection_established(false);
}
} else {
- if (endpoint->is_connected()) {
+ if (endpoint->is_established()) {
insert_subscription(its_message, _service,
_instance, found_eventgroup.first,
found_client->second, its_offer_type);
@@ -2827,47 +2874,39 @@ void service_discovery_impl::stop_subscription_expiration_timer_unlocked() {
void service_discovery_impl::expire_subscriptions(const boost::system::error_code &_error) {
if (!_error) {
- next_subscription_expiration_ = host_->expire_subscriptions();
+ next_subscription_expiration_ = host_->expire_subscriptions(false);
start_subscription_expiration_timer();
}
}
bool service_discovery_impl::check_ipv4_address(
- boost::asio::ip::address its_address) {
+ const boost::asio::ip::address& its_address) const {
//Check unallowed ipv4 address
bool is_valid = true;
- std::shared_ptr<configuration> its_configuration =
- host_->get_configuration();
-
- if(its_configuration) {
- boost::asio::ip::address_v4::bytes_type its_unicast_address =
- its_configuration.get()->get_unicast_address().to_v4().to_bytes();
- boost::asio::ip::address_v4::bytes_type endpoint_address =
- its_address.to_v4().to_bytes();
-
- //same address as unicast address of DUT not allowed
- if(its_unicast_address
- == endpoint_address) {
- VSOMEIP_ERROR << "Subscribers endpoint IP address is same as DUT's address! : "
- << its_address.to_string();
- is_valid = false;
- }
-
- // first 3 triples must match
- its_unicast_address[3] = 0x00;
- endpoint_address[3] = 0x00;
- if(its_unicast_address
- != endpoint_address) {
-#if 1
- VSOMEIP_ERROR<< "First 3 triples of subscribers endpoint IP address are not valid!";
-#endif
+ static const boost::asio::ip::address_v4::bytes_type its_unicast_address =
+ unicast_.to_v4().to_bytes();
+ const boost::asio::ip::address_v4::bytes_type endpoint_address =
+ its_address.to_v4().to_bytes();
+ static const boost::asio::ip::address_v4::bytes_type its_netmask =
+ configuration_->get_netmask().to_v4().to_bytes();
+
+ //same address as unicast address of DUT not allowed
+ if (its_unicast_address == endpoint_address) {
+ VSOMEIP_ERROR << "Subscriber's IP address is same as host's address! : "
+ << its_address;
+ is_valid = false;
+ } else {
+ const std::uint32_t self = VSOMEIP_BYTES_TO_LONG(its_unicast_address[0],
+ its_unicast_address[1], its_unicast_address[2], its_unicast_address[3]);
+ const std::uint32_t remote = VSOMEIP_BYTES_TO_LONG(endpoint_address[0],
+ endpoint_address[1], endpoint_address[2], endpoint_address[3]);
+ const std::uint32_t netmask = VSOMEIP_BYTES_TO_LONG(its_netmask[0],
+ its_netmask[1], its_netmask[2], its_netmask[3]);
+ if ((self & netmask) != (remote & netmask)) {
+ VSOMEIP_ERROR<< "Subscriber's IP isn't in the same subnet as host's IP: "
+ << its_address;
is_valid = false;
-
- } else {
-#if 0
- VSOMEIP_INFO << "First 3 triples of subscribers endpoint IP address are valid!";
-#endif
}
}
return is_valid;
@@ -3012,7 +3051,7 @@ void service_discovery_impl::on_offer_debounce_timer_expired(
for (services_t::iterator its_service = collected_offers_.begin();
its_service != collected_offers_.end(); its_service++) {
for (auto its_instance : its_service->second) {
- if (!host_->get_configuration()->is_someip(
+ if (!configuration_->is_someip(
its_service->first, its_instance.first)) {
non_someip_services.push_back(its_service);
}
@@ -3286,6 +3325,8 @@ bool service_discovery_impl::serialize_and_send_messages(
void service_discovery_impl::stop_offer_service(
service_t _service, instance_t _instance,
std::shared_ptr<serviceinfo> _info) {
+ std::lock_guard<std::mutex> its_lock(offer_mutex_);
+ _info->set_ttl(0);
bool stop_offer_required(false);
// delete from initial phase offers
{
@@ -3420,19 +3461,11 @@ bool service_discovery_impl::last_offer_shorter_half_offer_delay_ago() {
bool service_discovery_impl::check_source_address(
const boost::asio::ip::address &its_source_address) const {
bool is_valid = true;
- std::shared_ptr<configuration> its_configuration =
- host_->get_configuration();
-
- if(its_configuration) {
- boost::asio::ip::address its_unicast_address =
- its_configuration.get()->get_unicast_address();
- // check if source address is same as nodes unicast address
- if(its_unicast_address
- == its_source_address) {
- VSOMEIP_ERROR << "Source address of message is same as DUT's unicast address! : "
- << its_source_address.to_string();
- is_valid = false;
- }
+ // check if source address is same as nodes unicast address
+ if(unicast_ == its_source_address) {
+ VSOMEIP_ERROR << "Source address of message is same as DUT's unicast address! : "
+ << its_source_address.to_string();
+ is_valid = false;
}
return is_valid;
}
@@ -3502,8 +3535,10 @@ void service_discovery_impl::update_subscription_expiration_timer(
const std::chrono::steady_clock::time_point now =
std::chrono::steady_clock::now();
stop_subscription_expiration_timer_unlocked();
+
+ std::unique_lock<std::mutex> its_message_lock(_message->get_message_lock());
for (const auto &entry : _message->get_entries()) {
- if (entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK
+ if (entry && entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK
&& entry->get_ttl()) {
const std::chrono::steady_clock::time_point its_expiration = now
+ std::chrono::seconds(
@@ -3792,5 +3827,15 @@ service_discovery_impl::get_eventgroups_requiring_initial_events(
return its_acks;
}
+void service_discovery_impl::register_offer_acceptance_handler(
+ vsomeip::offer_acceptance_handler_t _handler) {
+ offer_acceptance_handler_ = _handler;
+}
+
+void service_discovery_impl::register_reboot_notification_handler(
+ reboot_notification_handler_t _handler) {
+ reboot_notification_handler_ = _handler;
+}
+
} // namespace sd
} // namespace vsomeip