summaryrefslogtreecommitdiff
path: root/implementation/service_discovery/src/service_discovery_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/service_discovery/src/service_discovery_impl.cpp')
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp676
1 files changed, 477 insertions, 199 deletions
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index e5c4a32..7ae5395 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -35,6 +35,7 @@
#include "../../endpoints/include/udp_server_endpoint_impl.hpp"
#include "../../message/include/serializer.hpp"
#include "../../plugin/include/plugin_manager_impl.hpp"
+#include "../../routing/include/event.hpp"
#include "../../routing/include/eventgroupinfo.hpp"
#include "../../routing/include/serviceinfo.hpp"
#include "../../utility/include/byteorder.hpp"
@@ -73,7 +74,7 @@ service_discovery_impl::service_discovery_impl(
last_msg_received_timer_(_host->get_io()),
last_msg_received_timer_timeout_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY +
(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY / 10)) {
- // TODO: cleanup start condition!
+
next_subscription_expiration_ = std::chrono::steady_clock::now() + std::chrono::hours(24);
}
@@ -234,31 +235,45 @@ void
service_discovery_impl::subscribe(
service_t _service, instance_t _instance,
eventgroup_t _eventgroup, major_version_t _major,
- ttl_t _ttl, client_t _client) {
- {
- std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
- auto found_service = subscribed_.find(_service);
- if (found_service != subscribed_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_eventgroup = found_instance->second.find(_eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- auto its_subscription = found_eventgroup->second;
- if (its_subscription->get_major() != _major) {
- VSOMEIP_ERROR
- << "Subscriptions to different versions of the same "
- "service instance are not supported!";
- } else if (its_subscription->is_selective()) {
- if (its_subscription->add_client(_client)) {
- its_subscription->set_state(_client,
- subscription_state_e::ST_NOT_ACKNOWLEDGED);
- send_subscription(its_subscription,
- _service, _instance, _eventgroup,
- _client);
+ ttl_t _ttl, client_t _client,
+ const std::shared_ptr<eventgroupinfo> &_info) {
+#ifdef VSOMEIP_ENABLE_COMPAT
+ bool is_selective(_info ? _info->is_selective() : false);
+#endif // VSOMEIP_ENABLE_COMPAT
+
+ std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
+ auto found_service = subscribed_.find(_service);
+ if (found_service != subscribed_.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ auto found_eventgroup = found_instance->second.find(_eventgroup);
+ if (found_eventgroup != found_instance->second.end()) {
+ auto its_subscription = found_eventgroup->second;
+#ifdef VSOMEIP_ENABLE_COMPAT
+ if (!its_subscription->is_selective() && is_selective) {
+ its_subscription->set_selective(true);
+ its_subscription->remove_client(VSOMEIP_ROUTING_CLIENT);
+ for (const auto e : _info->get_events()) {
+ for (const auto c : e->get_subscribers(_eventgroup)) {
+ its_subscription->add_client(c);
}
}
- return;
}
+#endif // VSOMEIP_ENABLE_COMPAT
+ if (its_subscription->get_major() != _major) {
+ VSOMEIP_ERROR
+ << "Subscriptions to different versions of the same "
+ "service instance are not supported!";
+ } else if (its_subscription->is_selective()) {
+ if (its_subscription->add_client(_client)) {
+ its_subscription->set_state(_client,
+ subscription_state_e::ST_NOT_ACKNOWLEDGED);
+ send_subscription(its_subscription,
+ _service, _instance, _eventgroup,
+ _client);
+ }
+ }
+ return;
}
}
}
@@ -267,31 +282,26 @@ service_discovery_impl::subscribe(
get_subscription_endpoints(_service, _instance,
its_reliable, its_unreliable);
- {
- std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
+ // New subscription
+ std::shared_ptr<subscription> its_subscription
+ = create_subscription(
+ _major, _ttl, its_reliable, its_unreliable, _info);
- // New subscription
- std::shared_ptr<subscription> its_subscription
- = create_subscription(
- _service, _instance, _eventgroup, _major,
- _ttl, its_reliable, its_unreliable);
-
- if (!its_subscription) {
- VSOMEIP_ERROR << __func__
- << ": creating subscription failed!";
- return;
- }
+ if (!its_subscription) {
+ VSOMEIP_ERROR << __func__
+ << ": creating subscription failed!";
+ return;
+ }
- subscribed_[_service][_instance][_eventgroup] = its_subscription;
+ subscribed_[_service][_instance][_eventgroup] = its_subscription;
- its_subscription->add_client(_client);
- its_subscription->set_state(_client,
- subscription_state_e::ST_NOT_ACKNOWLEDGED);
+ its_subscription->add_client(_client);
+ its_subscription->set_state(_client,
+ subscription_state_e::ST_NOT_ACKNOWLEDGED);
- send_subscription(its_subscription,
- _service, _instance, _eventgroup,
- _client);
- }
+ send_subscription(its_subscription,
+ _service, _instance, _eventgroup,
+ _client);
}
void
@@ -300,38 +310,36 @@ service_discovery_impl::send_subscription(
const service_t _service, const instance_t _instance,
const eventgroup_t _eventgroup,
const client_t _client) {
+ (void)_client;
+
auto its_reliable = _subscription->get_endpoint(true);
auto its_unreliable = _subscription->get_endpoint(false);
boost::asio::ip::address its_address;
get_subscription_address(its_reliable, its_unreliable, its_address);
-
if (!its_address.is_unspecified()) {
entry_data_t its_data;
-
- const remote_offer_type_e its_offer_type
- = get_remote_offer_type(_service, _instance);
- if (its_offer_type == remote_offer_type_e::UNRELIABLE &&
- !its_reliable && its_unreliable) {
+ const reliability_type_e its_reliability_type =
+ get_eventgroup_reliability(_service, _instance, _eventgroup, _subscription);
+ if (its_reliability_type == reliability_type_e::RT_UNRELIABLE && its_unreliable) {
if (its_unreliable->is_established()) {
its_data = create_eventgroup_entry(_service, _instance,
- _eventgroup, _subscription, its_offer_type);
+ _eventgroup, _subscription, its_reliability_type);
} else {
_subscription->set_udp_connection_established(false);
}
- } else if (its_offer_type == remote_offer_type_e::RELIABLE &&
- its_reliable && !its_unreliable) {
+ } else if (its_reliability_type == reliability_type_e::RT_RELIABLE && its_reliable) {
if (its_reliable->is_established()) {
its_data = create_eventgroup_entry(_service, _instance,
- _eventgroup, _subscription, its_offer_type);
+ _eventgroup, _subscription, its_reliability_type);
} else {
_subscription->set_tcp_connection_established(false);
}
- } else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE &&
+ } else if (its_reliability_type == reliability_type_e::RT_BOTH &&
its_reliable && its_unreliable) {
if (its_reliable->is_established() && its_unreliable->is_established()) {
its_data = create_eventgroup_entry(_service, _instance,
- _eventgroup, _subscription, its_offer_type);
+ _eventgroup, _subscription, its_reliability_type);
} else {
if (!its_reliable->is_established()) {
_subscription->set_tcp_connection_established(false);
@@ -340,16 +348,14 @@ service_discovery_impl::send_subscription(
_subscription->set_udp_connection_established(false);
}
}
+ } else if (its_reliability_type == reliability_type_e::RT_UNKNOWN) {
+ VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't determine reliability type for subscription to ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] ";
}
if (its_data.entry_) {
- if (_subscription->is_selective()) {
- auto its_selective_option = std::make_shared<selective_option_impl>();
- (void)its_selective_option->add_client(_client);
-
- its_data.options_.push_back(its_selective_option);
- }
-
// TODO: Implement a simple path, that sends a single message
auto its_current_message = std::make_shared<message_impl>();
std::vector<std::shared_ptr<message_impl> > its_messages;
@@ -368,9 +374,9 @@ service_discovery_impl::get_subscription_endpoints(
std::shared_ptr<endpoint> &_reliable,
std::shared_ptr<endpoint> &_unreliable) const {
_unreliable = host_->find_or_create_remote_client(
- _service, _instance, false, VSOMEIP_ROUTING_CLIENT);
+ _service, _instance, false);
_reliable = host_->find_or_create_remote_client(
- _service, _instance, true, VSOMEIP_ROUTING_CLIENT);
+ _service, _instance, true);
}
void
@@ -423,28 +429,37 @@ service_discovery_impl::unsubscribe(service_t _service,
if (!its_subscription->has_client()) {
its_subscription->set_ttl(0);
} else if (its_subscription->is_selective()) {
- auto its_major = its_subscription->get_major();
-
// create a dummy subscription object to unsubscribe
// the single client.
+ auto its_major = its_subscription->get_major();
+
its_subscription = std::make_shared<subscription>();
its_subscription->set_major(its_major);
its_subscription->set_ttl(0);
its_subscription->set_selective(true);
- its_subscription->add_client(_client);
its_subscription->set_endpoint(its_reliable, true);
its_subscription->set_endpoint(its_unreliable, false);
}
}
- const remote_offer_type_e its_offer_type
- = get_remote_offer_type(its_subscription);
+ // For selective subscriptions, the client must be added again
+ // to generate the selective option
+ if (its_subscription->is_selective())
+ its_subscription->add_client(_client);
+ const reliability_type_e its_reliability_type =
+ get_eventgroup_reliability(_service, _instance, _eventgroup, its_subscription);
auto its_data = create_eventgroup_entry(_service, _instance,
- _eventgroup, its_subscription, its_offer_type);
- if (its_data.entry_) {
+ _eventgroup, its_subscription, its_reliability_type);
+ if (its_data.entry_)
its_current_message->add_entry_data(its_data.entry_, its_data.options_);
- }
+
+ // Remove it again before updating (only impacts last unsubscribe)
+ if (its_subscription->is_selective())
+ (void)its_subscription->remove_client(_client);
+
+ // Ensure to update the "real" subscription
+ its_subscription = found_eventgroup->second;
// Finally update the subscriptions
if (!its_subscription->has_client()) {
@@ -470,8 +485,6 @@ service_discovery_impl::unsubscribe_all(
auto its_current_message = std::make_shared<message_impl>();;
boost::asio::ip::address its_address;
- const remote_offer_type_e its_offer_type
- = get_remote_offer_type(_service, _instance);
{
std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
@@ -482,8 +495,13 @@ service_discovery_impl::unsubscribe_all(
for (auto &its_eventgroup : found_instance->second) {
auto its_subscription = its_eventgroup.second;
its_subscription->set_ttl(0);
+
+ const reliability_type_e its_reliability =
+ get_eventgroup_reliability(_service, _instance,
+ its_eventgroup.first, its_subscription);
+
auto its_data = create_eventgroup_entry(_service, _instance,
- its_eventgroup.first, its_subscription, its_offer_type);
+ its_eventgroup.first, its_subscription, its_reliability);
auto its_reliable = its_subscription->get_endpoint(true);
auto its_unreliable = its_subscription->get_endpoint(false);
get_subscription_address(
@@ -672,7 +690,7 @@ entry_data_t
service_discovery_impl::create_eventgroup_entry(
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
const std::shared_ptr<subscription> &_subscription,
- remote_offer_type_e _offer_type) {
+ reliability_type_e _reliability_type) {
entry_data_t its_data;
its_data.entry_ = nullptr;
@@ -683,18 +701,18 @@ service_discovery_impl::create_eventgroup_entry(
bool insert_reliable(false);
bool insert_unreliable(false);
- switch (_offer_type) {
- case remote_offer_type_e::RELIABLE:
+ switch (_reliability_type) {
+ case reliability_type_e::RT_RELIABLE:
if (its_reliable_endpoint) {
insert_reliable = true;
}
break;
- case remote_offer_type_e::UNRELIABLE:
+ case reliability_type_e::RT_UNRELIABLE:
if (its_unreliable_endpoint) {
insert_unreliable = true;
}
break;
- case remote_offer_type_e::RELIABLE_UNRELIABLE:
+ case reliability_type_e::RT_BOTH:
if (its_reliable_endpoint && its_unreliable_endpoint) {
insert_reliable = true;
insert_unreliable = true;
@@ -705,13 +723,13 @@ service_discovery_impl::create_eventgroup_entry(
}
if (!insert_reliable && !insert_unreliable
- && _offer_type != remote_offer_type_e::UNKNOWN) {
+ && _reliability_type != reliability_type_e::RT_UNKNOWN) {
VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
- "subscription doesn't match offer type: ["
+ "subscription doesn't match reliability type: ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
<< std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] "
- << _offer_type;
+ << (uint16_t) _reliability_type;
return its_data;
}
std::shared_ptr<eventgroupentry_impl> its_entry, its_other;
@@ -753,11 +771,13 @@ service_discovery_impl::create_eventgroup_entry(
auto its_option = create_ip_option(unicast_, its_port, true);
its_data.options_.push_back(its_option);
} else {
- VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
+ VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
"local reliable port is zero: ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
<< std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ its_data.entry_ = nullptr;
+ its_data.other_ = nullptr;
return its_data;
}
}
@@ -805,21 +825,20 @@ service_discovery_impl::create_eventgroup_entry(
auto its_option = create_ip_option(unicast_, its_port, false);
its_data.options_.push_back(its_option);
} else {
- VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
+ VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
" local unreliable port is zero: ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
<< std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ its_data.entry_ = nullptr;
+ its_data.other_ = nullptr;
return its_data;
}
}
- if (its_entry
- &&_subscription->is_selective()) {
+ if (its_entry &&_subscription->is_selective()) {
auto its_selective_option = std::make_shared<selective_option_impl>();
- for (const auto &its_client : _subscription->get_clients())
- (void)its_selective_option->add_client(its_client);
-
+ its_selective_option->set_clients(_subscription->get_clients());
its_data.options_.push_back(its_selective_option);
}
@@ -836,7 +855,7 @@ service_discovery_impl::insert_subscription_ack(
const std::shared_ptr<remote_subscription_ack>& _acknowledgement,
const std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl,
const std::shared_ptr<endpoint_definition> &_target,
- const client_t _client) {
+ const std::set<client_t> &_clients) {
std::unique_lock<std::recursive_mutex> its_lock(_acknowledgement->get_lock());
auto its_message = _acknowledgement->get_current_message();
@@ -871,11 +890,10 @@ service_discovery_impl::insert_subscription_ack(
}
}
- if (_client != VSOMEIP_ROUTING_CLIENT) {
+ if (_clients.size() > 1 || (*(_clients.begin())) != 0) {
auto its_selective_option = its_eventgroup_entry->get_selective_option();
- if (its_selective_option) {
- its_selective_option->add_client(_client);
- }
+ if (its_selective_option)
+ its_selective_option->set_clients(_clients);
}
return;
@@ -911,10 +929,11 @@ service_discovery_impl::insert_subscription_ack(
its_data.options_.push_back(its_option);
}
}
+
// Selective
- if (_client != VSOMEIP_ROUTING_CLIENT) {
+ if (_clients.size() > 1 || (*(_clients.begin())) != 0) {
auto its_selective_option = std::make_shared<selective_option_impl>();
- (void)its_selective_option->add_client(_client);
+ (void)its_selective_option->set_clients(_clients);
its_data.options_.push_back(its_selective_option);
}
@@ -1031,43 +1050,30 @@ service_discovery_impl::on_message(
bool force_initial_events(false);
bool sd_acceptance_queried(false);
- bool accept_offers(false);
- bool expired_services(false);
+ expired_ports_t expired_ports;
+ sd_acceptance_state_t accept_state(expired_ports);
for (auto iter = its_entries.begin(); iter != its_end; iter++) {
if (!sd_acceptance_queried) {
+ sd_acceptance_queried = true;
if (sd_acceptance_handler_) {
+ accept_state.sd_acceptance_required_
+ = configuration_->is_protected_device(_sender);
remote_info_t remote;
- remote.port_ = ANY_PORT;
remote.first_ = ANY_PORT;
remote.last_ = ANY_PORT;
remote.is_range_ = false;
- if (configuration_->sd_acceptance_required(_sender, ANY_PORT)) {
- if (_sender.is_v4()) {
- remote.ip_.address_.v4_ = _sender.to_v4().to_bytes();
- remote.ip_.is_v4_ = true;
- } else {
- remote.ip_.address_.v6_ = _sender.to_v6().to_bytes();
- remote.ip_.is_v4_ = false;
- }
- accept_offers = sd_acceptance_handler_(remote);
- if (!accept_offers && !expired_services) {
- VSOMEIP_WARNING << "service_discovery_impl::" << __func__
- << ": Do not accept offer / subscribe from "
- << std::hex << std::setw(4) << std::setfill('0')
- << _sender.to_string();
- remove_remote_offer_type_by_ip(_sender);
- host_->expire_subscriptions(_sender);
- host_->expire_services(_sender);
- expired_services = true;
- }
+ if (_sender.is_v4()) {
+ remote.ip_.address_.v4_ = _sender.to_v4().to_bytes();
+ remote.ip_.is_v4_ = true;
} else {
- accept_offers = true;
+ remote.ip_.address_.v6_ = _sender.to_v6().to_bytes();
+ remote.ip_.is_v4_ = false;
}
+ accept_state.accept_entries_ = sd_acceptance_handler_(remote);
} else {
- accept_offers = true;
+ accept_state.accept_entries_ = true;
}
- sd_acceptance_queried = true;
}
if ((*iter)->is_service_entry()) {
std::shared_ptr<serviceentry_impl> its_service_entry
@@ -1075,8 +1081,8 @@ service_discovery_impl::on_message(
bool its_unicast_flag = its_message->get_unicast_flag();
process_serviceentry(its_service_entry, its_options,
its_unicast_flag, its_resubscribes,
- received_via_mcast, accept_offers);
- } else if (accept_offers) {
+ received_via_mcast, accept_state);
+ } else {
std::shared_ptr<eventgroupentry_impl> its_eventgroup_entry
= std::dynamic_pointer_cast<eventgroupentry_impl>(*iter);
@@ -1095,7 +1101,8 @@ service_discovery_impl::on_message(
check_stop_subscribe_subscribe(iter, its_end, its_options);
process_eventgroupentry(its_eventgroup_entry, its_options,
its_acknowledgement, _destination,
- is_stop_subscribe_subscribe, force_initial_events);
+ is_stop_subscribe_subscribe, force_initial_events,
+ accept_state);
}
}
@@ -1139,7 +1146,8 @@ service_discovery_impl::process_serviceentry(
const std::vector<std::shared_ptr<option_impl> > &_options,
bool _unicast_flag,
std::vector<std::shared_ptr<message_impl> > &_resubscribes,
- bool _received_via_mcast, bool _accept_offers) {
+ bool _received_via_mcast,
+ const sd_acceptance_state_t& _sd_ac_state) {
// Read service info from entry
entry_type_e its_type = _entry->get_type();
@@ -1221,20 +1229,17 @@ service_discovery_impl::process_serviceentry(
its_major, its_minor, _unicast_flag);
break;
case entry_type_e::OFFER_SERVICE:
- if (_accept_offers) {
process_offerservice_serviceentry(its_service, its_instance,
its_major, its_minor, its_ttl,
its_reliable_address, its_reliable_port,
its_unreliable_address, its_unreliable_port, _resubscribes,
- _received_via_mcast);
- }
+ _received_via_mcast, _sd_ac_state);
break;
case entry_type_e::UNKNOWN:
default:
VSOMEIP_ERROR << __func__ << ": Unsupported service entry type";
}
-
- } else if (_accept_offers) {
+ } else if (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_) {
std::shared_ptr<request> its_request = find_request(its_service, its_instance);
if (its_request) {
std::lock_guard<std::mutex> its_lock(requested_mutex_);
@@ -1242,8 +1247,8 @@ service_discovery_impl::process_serviceentry(
its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1));
}
remove_remote_offer_type(its_service, its_instance,
- (its_reliable_port != ILLEGAL_PORT ?
- its_reliable_address : its_unreliable_address));
+ its_reliable_address, its_reliable_port,
+ its_unreliable_address, its_unreliable_port);
unsubscribe_all(its_service, its_instance);
if (!is_diagnosis_ && !is_suspended_) {
host_->del_routing_info(its_service, its_instance,
@@ -1262,7 +1267,7 @@ service_discovery_impl::process_offerservice_serviceentry(
const boost::asio::ip::address &_unreliable_address,
uint16_t _unreliable_port,
std::vector<std::shared_ptr<message_impl> > &_resubscribes,
- bool _received_via_mcast) {
+ bool _received_via_mcast, const sd_acceptance_state_t& _sd_ac_state) {
std::shared_ptr < runtime > its_runtime = runtime_.lock();
if (!its_runtime)
return;
@@ -1290,11 +1295,106 @@ service_discovery_impl::process_offerservice_serviceentry(
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
}
- if (update_remote_offer_type(_service,_instance, offer_type,
- _reliable_address, _unreliable_address)) {
+ if (_sd_ac_state.sd_acceptance_required_) {
+
+ auto expire_subscriptions_and_services =
+ [this, &_sd_ac_state](const boost::asio::ip::address& _address,
+ std::uint16_t _port, bool _reliable) {
+ const auto its_port_pair = std::make_pair(_reliable, _port);
+ if (_sd_ac_state.expired_ports_.find(its_port_pair) ==
+ _sd_ac_state.expired_ports_.end()) {
+ VSOMEIP_WARNING << "service_discovery_impl::" << __func__
+ << ": Do not accept offer from "
+ << _address.to_string() << ":"
+ << std::dec << _port << " reliable=" << _reliable;
+ remove_remote_offer_type_by_ip(_address, _port, _reliable);
+ host_->expire_subscriptions(_address, _port, _reliable);
+ host_->expire_services(_address, _port, _reliable);
+ _sd_ac_state.expired_ports_.insert(its_port_pair);
+ }
+ };
+
+ // return if the registered sd_acceptance handler returned false
+ // and for the provided port sd_acceptance is necessary
+ switch (offer_type) {
+ case remote_offer_type_e::UNRELIABLE:
+ if (!_sd_ac_state.accept_entries_
+ && configuration_->is_protected_port(
+ _unreliable_address, _unreliable_port, false)) {
+ expire_subscriptions_and_services(_unreliable_address,
+ _unreliable_port, false);
+ return;
+ }
+ break;
+ case remote_offer_type_e::RELIABLE:
+ if (!_sd_ac_state.accept_entries_
+ && configuration_->is_protected_port(
+ _reliable_address, _reliable_port, true)) {
+ expire_subscriptions_and_services(_reliable_address,
+ _reliable_port, true);
+ return;
+ }
+ break;
+ case remote_offer_type_e::RELIABLE_UNRELIABLE:
+ if (!_sd_ac_state.accept_entries_
+ && (configuration_->is_protected_port(
+ _unreliable_address, _unreliable_port, false)
+ || configuration_->is_protected_port(
+ _reliable_address, _reliable_port, true))) {
+ expire_subscriptions_and_services(_unreliable_address,
+ _unreliable_port, false);
+ expire_subscriptions_and_services(_reliable_address,
+ _reliable_port, true);
+ return;
+ }
+ break;
+ case remote_offer_type_e::UNKNOWN:
+ default:
+ break;
+ }
+ }
+
+ if (update_remote_offer_type(_service, _instance, offer_type,
+ _reliable_address, _reliable_port,
+ _unreliable_address, _unreliable_port)) {
VSOMEIP_WARNING << __func__ << ": Remote offer type changed ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
+
+ // Only update eventgroup reliability type if it was initially unknown
+ auto its_eventgroups = host_->get_subscribed_eventgroups(_service, _instance);
+ for (auto eg : its_eventgroups) {
+ auto its_info = host_->find_eventgroup(
+ _service, _instance, eg);
+ if (its_info) {
+ if (its_info->is_reliability_auto_mode()) {
+ reliability_type_e its_reliability(reliability_type_e::RT_UNKNOWN);
+ switch (offer_type) {
+ case remote_offer_type_e::RELIABLE:
+ its_reliability = reliability_type_e::RT_RELIABLE;
+ break;
+ case remote_offer_type_e::UNRELIABLE:
+ its_reliability = reliability_type_e::RT_UNRELIABLE;
+ break;
+ case remote_offer_type_e::RELIABLE_UNRELIABLE:
+ its_reliability = reliability_type_e::RT_BOTH;
+ break;
+ default:
+ ;
+ }
+ if (its_reliability != reliability_type_e::RT_UNKNOWN
+ && its_reliability != its_info->get_reliability()) {
+ VSOMEIP_WARNING << "sd::" << __func__ << ": eventgroup reliability type changed ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << eg << "]"
+ << " using reliability type: "
+ << std::hex << std::setw(4) << std::setfill('0') << (uint16_t) its_reliability;
+ its_info->set_reliability(its_reliability);
+ }
+ }
+ }
+ }
}
host_->add_routing_info(_service, _instance,
@@ -1314,8 +1414,6 @@ service_discovery_impl::process_offerservice_serviceentry(
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
if (0 < found_instance->second.size()) {
- const remote_offer_type_e its_offer_type =
- get_remote_offer_type(_service, _instance);
for (const auto& its_eventgroup : found_instance->second) {
auto its_subscription = its_eventgroup.second;
std::shared_ptr<endpoint> its_reliable, its_unreliable;
@@ -1333,8 +1431,12 @@ service_discovery_impl::process_offerservice_serviceentry(
subscription_state_e::ST_RESUBSCRIBING_NOT_ACKNOWLEDGED);
}
}
+ const reliability_type_e its_reliability =
+ get_eventgroup_reliability(_service, _instance,
+ its_eventgroup.first, its_subscription);
+
auto its_data = create_eventgroup_entry(_service, _instance,
- its_eventgroup.first, its_subscription, its_offer_type);
+ its_eventgroup.first, its_subscription, its_reliability);
if (its_data.entry_) {
add_entry_data(_resubscribes, its_data);
}
@@ -1443,8 +1545,6 @@ service_discovery_impl::on_endpoint_connected(
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
if (0 < found_instance->second.size()) {
- const remote_offer_type_e its_offer_type =
- get_remote_offer_type(_service, _instance);
for (const auto &its_eventgroup : found_instance->second) {
std::shared_ptr<subscription> its_subscription(its_eventgroup.second);
if (its_subscription) {
@@ -1487,8 +1587,10 @@ service_discovery_impl::on_endpoint_connected(
its_subscription->set_state(its_client,
subscription_state_e::ST_NOT_ACKNOWLEDGED);
+ const reliability_type_e its_reliability_type =
+ get_eventgroup_reliability(_service, _instance, its_eventgroup.first, its_subscription);
auto its_data = create_eventgroup_entry(_service, _instance,
- its_eventgroup.first, its_subscription, its_offer_type);
+ its_eventgroup.first, its_subscription, its_reliability_type);
if (its_data.entry_) {
add_entry_data(its_messages, its_data);
@@ -1571,7 +1673,10 @@ service_discovery_impl::process_eventgroupentry(
const std::vector<std::shared_ptr<option_impl> > &_options,
std::shared_ptr<remote_subscription_ack> &_acknowledgement,
const boost::asio::ip::address &_destination,
- bool _is_stop_subscribe_subscribe, bool _force_initial_events) {
+ bool _is_stop_subscribe_subscribe, bool _force_initial_events,
+ const sd_acceptance_state_t& _sd_ac_state) {
+
+ std::set<client_t> its_clients({0}); // maybe overridden for selectives
auto its_sender = _acknowledgement->get_target_address();
auto its_session = _entry->get_owning_message()->get_session();
@@ -1601,7 +1706,7 @@ service_discovery_impl::process_eventgroupentry(
<< "] session: " << std::hex << std::setw(4) << std::setfill('0')
<< its_session << ", ttl: " << its_ttl;
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
} else {
// We received a subscription [n]ack for an eventgroup that does not exist.
@@ -1617,7 +1722,7 @@ service_discovery_impl::process_eventgroupentry(
<< its_sender.to_string(ec) << " session: "
<< std::hex << std::setw(4) << std::setfill('0') << its_session;
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
return;
}
@@ -1630,7 +1735,7 @@ service_discovery_impl::process_eventgroupentry(
<< its_sender.to_string(ec) << " session: "
<< std::hex << std::setw(4) << std::setfill('0') << its_session;
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
return;
}
@@ -1644,7 +1749,7 @@ service_discovery_impl::process_eventgroupentry(
if (its_ttl > 0) {
// increase number of required acks by one as number required acks
// is calculated based on the number of referenced options
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
return;
}
@@ -1655,7 +1760,7 @@ service_discovery_impl::process_eventgroupentry(
<< its_sender.to_string(ec) << " session: "
<< std::hex << std::setw(4) << std::setfill('0') << its_session;
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
return;
}
@@ -1672,7 +1777,7 @@ service_discovery_impl::process_eventgroupentry(
<< std::hex << std::setw(4) << std::setfill('0') << its_session;
if (its_ttl > 0) {
// set to 0 to ensure an answer containing at least this subscribe_nack is sent out
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
return;
}
@@ -1697,7 +1802,6 @@ service_discovery_impl::process_eventgroupentry(
boost::asio::ip::address its_second_address;
uint16_t its_second_port(ILLEGAL_PORT);
bool is_second_reliable(false);
- std::set<client_t> its_clients({0}); // maybe overridden for selectives
for (auto i : { 1, 2 }) {
for (auto its_index : _entry->get_options(uint8_t(i))) {
@@ -1718,7 +1822,7 @@ service_discovery_impl::process_eventgroupentry(
<< std::hex << std::setw(4) << std::setfill('0')
<< its_session;
if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type && its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
return;
}
@@ -1733,7 +1837,7 @@ service_discovery_impl::process_eventgroupentry(
its_ipv4_option->get_address());
if (!check_layer_four_protocol(its_ipv4_option)) {
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
return;
}
@@ -1749,7 +1853,7 @@ service_discovery_impl::process_eventgroupentry(
if (is_first_reliable == is_second_reliable
&& its_second_port != ILLEGAL_PORT) {
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
boost::system::error_code ec;
VSOMEIP_ERROR << __func__
@@ -1763,7 +1867,7 @@ service_discovery_impl::process_eventgroupentry(
if (!check_ipv4_address(its_first_address)
|| 0 == its_first_port) {
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
boost::system::error_code ec;
VSOMEIP_ERROR << __func__
@@ -1784,7 +1888,7 @@ service_discovery_impl::process_eventgroupentry(
if (is_second_reliable == is_first_reliable
&& its_first_port != ILLEGAL_PORT) {
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
boost::system::error_code ec;
VSOMEIP_ERROR << __func__
@@ -1798,7 +1902,7 @@ service_discovery_impl::process_eventgroupentry(
if (!check_ipv4_address(its_second_address)
|| 0 == its_second_port) {
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
boost::system::error_code ec;
VSOMEIP_ERROR << __func__
@@ -1829,7 +1933,7 @@ service_discovery_impl::process_eventgroupentry(
its_ipv6_option->get_address());
if (!check_layer_four_protocol(its_ipv6_option)) {
if(its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
boost::system::error_code ec;
VSOMEIP_ERROR << "Invalid layer 4 protocol type in IPv6 endpoint option specified! "
@@ -1847,7 +1951,7 @@ service_discovery_impl::process_eventgroupentry(
if (is_first_reliable == is_second_reliable
&& its_second_port != ILLEGAL_PORT) {
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
boost::system::error_code ec;
VSOMEIP_ERROR << __func__
@@ -1867,7 +1971,7 @@ service_discovery_impl::process_eventgroupentry(
if (is_second_reliable == is_first_reliable
&& its_first_port != ILLEGAL_PORT) {
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
}
boost::system::error_code ec;
VSOMEIP_ERROR << __func__
@@ -1983,7 +2087,7 @@ service_discovery_impl::process_eventgroupentry(
<< its_sender.to_string(ec) << " session: "
<< std::hex << std::setw(4) << std::setfill('0') << its_session;
if (its_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
return;
}
break;
@@ -1997,7 +2101,7 @@ service_discovery_impl::process_eventgroupentry(
its_first_address, its_first_port, is_first_reliable,
its_second_address, its_second_port, is_second_reliable,
_acknowledgement, _is_stop_subscribe_subscribe,
- _force_initial_events, its_clients, its_info);
+ _force_initial_events, its_clients, _sd_ac_state, its_info);
} else {
if (entry_type_e::SUBSCRIBE_EVENTGROUP_ACK == its_type) { //this type is used for ACK and NACK messages
if (its_ttl > 0) {
@@ -2025,6 +2129,7 @@ service_discovery_impl::handle_eventgroup_subscription(
std::shared_ptr<remote_subscription_ack> &_acknowledgement,
bool _is_stop_subscribe_subscribe, bool _force_initial_events,
const std::set<client_t> &_clients,
+ const sd_acceptance_state_t& _sd_ac_state,
const std::shared_ptr<eventgroupinfo>& _info) {
(void)_counter;
(void)_reserved;
@@ -2062,7 +2167,7 @@ service_discovery_impl::handle_eventgroup_subscription(
}
}
if (reliablility_nack && _ttl > 0) {
- insert_subscription_ack(_acknowledgement, _info, 0);
+ insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients);
boost::system::error_code ec;
// TODO: Add sender and session id
VSOMEIP_WARNING << __func__
@@ -2103,13 +2208,13 @@ service_discovery_impl::handle_eventgroup_subscription(
<< (uint32_t) _info->get_major() << "] subscriber: "
<< _first_address.to_string(ec) << ":" << std::dec << _first_port;
if (_ttl > 0) {
- insert_subscription_ack(_acknowledgement, its_info, 0);
+ insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, _clients);
}
return;
} else {
boost::asio::ip::address its_first_address, its_second_address;
- uint16_t its_first_port, its_second_port;
if (ILLEGAL_PORT != _first_port) {
+ uint16_t its_first_port(0);
its_subscriber = endpoint_definition::get(
_first_address, _first_port, _is_first_reliable, _service, _instance);
if (!_is_first_reliable &&
@@ -2121,7 +2226,7 @@ service_discovery_impl::handle_eventgroup_subscription(
its_reliable = its_subscriber;
// check if TCP connection is established by client
if (_ttl > 0 && !is_tcp_connected(_service, _instance, its_reliable)) {
- insert_subscription_ack(_acknowledgement, _info, 0);
+ insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients);
boost::system::error_code ec;
// TODO: Add sender and session id
VSOMEIP_ERROR << "TCP connection to target1: ["
@@ -2139,6 +2244,7 @@ service_discovery_impl::handle_eventgroup_subscription(
}
if (ILLEGAL_PORT != _second_port) {
+ uint16_t its_second_port(0);
its_subscriber = endpoint_definition::get(
_second_address, _second_port, _is_second_reliable, _service, _instance);
if (!_is_second_reliable &&
@@ -2150,7 +2256,7 @@ service_discovery_impl::handle_eventgroup_subscription(
its_reliable = its_subscriber;
// check if TCP connection is established by client
if (_ttl > 0 && !is_tcp_connected(_service, _instance, its_reliable)) {
- insert_subscription_ack(_acknowledgement, _info, 0);
+ insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients);
boost::system::error_code ec;
// TODO: Add sender and session id
VSOMEIP_ERROR << "TCP connection to target2 : ["
@@ -2168,6 +2274,26 @@ service_discovery_impl::handle_eventgroup_subscription(
}
}
+ // check if the subscription should be rejected because of sd_acceptance_handling
+ if (_ttl > 0 && _sd_ac_state.sd_acceptance_required_) {
+ bool insert_nack(false);
+ if (_first_port != ILLEGAL_PORT && !_sd_ac_state.accept_entries_
+ && configuration_->is_protected_port(_first_address,
+ _first_port, _is_first_reliable)) {
+ insert_nack = true;
+ }
+ if (!insert_nack && _second_port != ILLEGAL_PORT
+ && !_sd_ac_state.accept_entries_
+ && configuration_->is_protected_port(_second_address,
+ _second_port, _is_second_reliable)) {
+ insert_nack = true;
+ }
+ if (insert_nack) {
+ insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients);
+ return;
+ }
+ }
+
// Create subscription object
auto its_subscription = std::make_shared<remote_subscription>();
its_subscription->set_eventgroupinfo(_info);
@@ -2953,9 +3079,10 @@ service_discovery_impl::last_offer_shorter_half_offer_delay_ago() {
bool
service_discovery_impl::check_source_address(
const boost::asio::ip::address &its_source_address) const {
+
bool is_valid = true;
// Check if source address is same as nodes unicast address
- if(unicast_ == its_source_address) {
+ if (unicast_ == its_source_address) {
VSOMEIP_ERROR << "Source address of message is same as DUT's unicast address! : "
<< its_source_address.to_string();
is_valid = false;
@@ -2965,18 +3092,21 @@ service_discovery_impl::check_source_address(
void
service_discovery_impl::set_diagnosis_mode(const bool _activate) {
+
is_diagnosis_ = _activate;
}
bool
service_discovery_impl::get_diagnosis_mode() {
+
return is_diagnosis_;
}
void
service_discovery_impl::update_remote_subscription(
const std::shared_ptr<remote_subscription> &_subscription) {
- if (!_subscription->is_pending()) {
+
+ if (!_subscription->is_pending() || 0 == _subscription->get_answers()) {
std::shared_ptr<remote_subscription_ack> its_ack;
{
std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
@@ -2995,13 +3125,15 @@ service_discovery_impl::update_remote_subscription(
void
service_discovery_impl::update_acknowledgement(
const std::shared_ptr<remote_subscription_ack> &_acknowledgement) {
+
if (_acknowledgement->is_complete()
&& !_acknowledgement->is_pending()
&& !_acknowledgement->is_done()) {
+
send_subscription_ack(_acknowledgement);
std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
- for (const auto& its_subscription : _acknowledgement->get_subscriptions())
+ for (const auto &its_subscription : _acknowledgement->get_subscriptions())
pending_remote_subscriptions_.erase(its_subscription);
}
}
@@ -3241,7 +3373,9 @@ service_discovery_impl::update_remote_offer_type(
service_t _service, instance_t _instance,
remote_offer_type_e _offer_type,
const boost::asio::ip::address &_reliable_address,
- const boost::asio::ip::address &_unreliable_address) {
+ std::uint16_t _reliable_port,
+ const boost::asio::ip::address &_unreliable_address,
+ std::uint16_t _unreliable_port) {
bool ret(false);
std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_);
const std::pair<service_t, instance_t> its_si_pair = std::make_pair(_service, _instance);
@@ -3256,13 +3390,18 @@ service_discovery_impl::update_remote_offer_type(
}
switch (_offer_type) {
case remote_offer_type_e::UNRELIABLE:
- remote_offers_by_ip_[_unreliable_address].insert(its_si_pair);
+ remote_offers_by_ip_[_unreliable_address][std::make_pair(false,
+ _unreliable_port)].insert(its_si_pair);
break;
case remote_offer_type_e::RELIABLE:
- remote_offers_by_ip_[_reliable_address].insert(its_si_pair);
+ remote_offers_by_ip_[_reliable_address][std::make_pair(true,
+ _reliable_port)].insert(its_si_pair);
break;
case remote_offer_type_e::RELIABLE_UNRELIABLE:
- remote_offers_by_ip_[_unreliable_address].insert(its_si_pair);
+ remote_offers_by_ip_[_unreliable_address][std::make_pair(false,
+ _unreliable_port)].insert(its_si_pair);
+ remote_offers_by_ip_[_unreliable_address][std::make_pair(true,
+ _reliable_port)].insert(its_si_pair);
break;
case remote_offer_type_e::UNKNOWN:
default:
@@ -3278,35 +3417,83 @@ service_discovery_impl::update_remote_offer_type(
void
service_discovery_impl::remove_remote_offer_type(
service_t _service, instance_t _instance,
- const boost::asio::ip::address &_address) {
+ const boost::asio::ip::address &_reliable_address,
+ std::uint16_t _reliable_port,
+ const boost::asio::ip::address &_unreliable_address,
+ std::uint16_t _unreliable_port) {
std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_);
const std::pair<service_t, instance_t> its_si_pair =
std::make_pair(_service, _instance);
remote_offer_types_.erase(its_si_pair);
- auto found_services = remote_offers_by_ip_.find(_address);
- if (found_services != remote_offers_by_ip_.end()) {
- found_services->second.erase(its_si_pair);
+
+ auto delete_from_remote_offers_by_ip = [&](
+ const boost::asio::ip::address& _address, std::uint16_t _port,
+ bool _reliable) {
+ const auto found_address = remote_offers_by_ip_.find(_address);
+ if (found_address != remote_offers_by_ip_.end()) {
+ auto found_port = found_address->second.find(
+ std::make_pair(_reliable, _port));
+ if (found_port != found_address->second.end()) {
+ if (found_port->second.erase(std::make_pair(_service, _instance))) {
+ if (found_port->second.empty()) {
+ found_address->second.erase(found_port);
+ if (found_address->second.empty()) {
+ remote_offers_by_ip_.erase(found_address);
+ }
+ }
+ }
+ }
+ }
+ };
+ if (_reliable_port != ILLEGAL_PORT) {
+ delete_from_remote_offers_by_ip(_reliable_address, _reliable_port,
+ true);
+ }
+ if (_unreliable_port != ILLEGAL_PORT) {
+ delete_from_remote_offers_by_ip(_unreliable_address, _unreliable_port,
+ false);
}
}
void service_discovery_impl::remove_remote_offer_type_by_ip(
const boost::asio::ip::address &_address) {
+ remove_remote_offer_type_by_ip(_address, ANY_PORT, false);
+}
+
+void service_discovery_impl::remove_remote_offer_type_by_ip(
+ const boost::asio::ip::address &_address, std::uint16_t _port, bool _reliable) {
std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_);
- auto found_services = remote_offers_by_ip_.find(_address);
- if (found_services != remote_offers_by_ip_.end()) {
- for (const auto& si : found_services->second) {
- remote_offer_types_.erase(si);
+ const auto found_address = remote_offers_by_ip_.find(_address);
+ if (found_address != remote_offers_by_ip_.end()) {
+ if (_port == ANY_PORT) {
+ for (const auto& port : found_address->second) {
+ for (const auto& si : port.second) {
+ remote_offer_types_.erase(si);
+ }
+ }
+ remote_offers_by_ip_.erase(_address);
+ } else {
+ const auto its_port_reliability = std::make_pair(_reliable, _port);
+ const auto found_port = found_address->second.find(its_port_reliability);
+ if (found_port != found_address->second.end()) {
+ for (const auto& si : found_port->second) {
+ remote_offer_types_.erase(si);
+ }
+ found_address->second.erase(found_port);
+ if (found_address->second.empty()) {
+ remote_offers_by_ip_.erase(found_address);
+ }
+ }
}
}
- remote_offers_by_ip_.erase(_address);
}
std::shared_ptr<subscription>
service_discovery_impl::create_subscription(
- service_t _service, instance_t _instance, eventgroup_t _eventgroup,
major_version_t _major, ttl_t _ttl,
const std::shared_ptr<endpoint> &_reliable,
- const std::shared_ptr<endpoint> &_unreliable) {
+ const std::shared_ptr<endpoint> &_unreliable,
+ const std::shared_ptr<eventgroupinfo> &_info) {
auto its_subscription = std::make_shared<subscription>();
its_subscription->set_major(_major);
its_subscription->set_ttl(_ttl);
@@ -3322,10 +3509,9 @@ service_discovery_impl::create_subscription(
}
// check whether the eventgroup is selective
- auto its_eventgroup = host_->find_eventgroup(_service, _instance, _eventgroup);
- if (its_eventgroup) {
- its_subscription->set_selective(its_eventgroup->is_selective());
- }
+ its_subscription->set_selective(_info->is_selective());
+
+ its_subscription->set_eventgroupinfo(_info);
return its_subscription;
}
@@ -3333,33 +3519,70 @@ service_discovery_impl::create_subscription(
void
service_discovery_impl::send_subscription_ack(
const std::shared_ptr<remote_subscription_ack> &_acknowledgement) {
+
if (_acknowledgement->is_done())
return;
_acknowledgement->done();
- std::uint32_t its_answers(1);
+ std::uint32_t its_max_answers(1); // Must be 1 as "_acknowledgement" not
+ // necessarily contains subscriptions
+ bool do_not_answer(false);
+ std::shared_ptr<remote_subscription> its_parent;
// Find highest number of necessary answers
for (const auto& its_subscription : _acknowledgement->get_subscriptions()) {
- if (its_subscription->get_answers() > its_answers)
- its_answers = its_subscription->get_answers();
+ auto its_answers = its_subscription->get_answers();
+ if (its_answers > its_max_answers) {
+ its_max_answers = its_answers;
+ } else if (its_answers == 0) {
+ do_not_answer = true;
+ its_parent = its_subscription->get_parent();
+ }
+ }
+
+ if (do_not_answer) {
+ if (its_parent) {
+ std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
+ auto its_parent_ack = pending_remote_subscriptions_[its_parent];
+ if (its_parent_ack) {
+ for (const auto &its_subscription : its_parent_ack->get_subscriptions()) {
+ if (its_subscription != its_parent)
+ its_subscription->set_answers(its_subscription->get_answers() + 1);
+ }
+ }
+ }
+ return;
}
// send messages
- for (std::uint32_t i = 0; i < its_answers; i++) {
- for (const auto& its_subscription : _acknowledgement->get_subscriptions()) {
+ for (std::uint32_t i = 0; i < its_max_answers; i++) {
+ for (const auto &its_subscription : _acknowledgement->get_subscriptions()) {
if (i < its_subscription->get_answers()) {
if (its_subscription->get_ttl() > 0) {
auto its_info = its_subscription->get_eventgroupinfo();
if (its_info) {
+ std::set<client_t> its_acked;
+ std::set<client_t> its_nacked;
for (const auto& its_client : its_subscription->get_clients()) {
- auto its_ttl = (its_subscription->get_client_state(its_client)
- == remote_subscription_state_e::SUBSCRIPTION_ACKED ?
- its_subscription->get_ttl() : 0);
+ if (its_subscription->get_client_state(its_client)
+ == remote_subscription_state_e::SUBSCRIPTION_ACKED) {
+ its_acked.insert(its_client);
+ } else {
+ its_nacked.insert(its_client);
+ }
+ }
- insert_subscription_ack(_acknowledgement, its_info, its_ttl,
- its_subscription->get_subscriber(), its_client);
+ if (0 < its_acked.size()) {
+ insert_subscription_ack(_acknowledgement, its_info,
+ its_subscription->get_ttl(),
+ its_subscription->get_subscriber(), its_acked);
+ }
+
+ if (0 < its_nacked.size()) {
+ insert_subscription_ack(_acknowledgement, its_info,
+ 0,
+ its_subscription->get_subscriber(), its_nacked);
}
}
}
@@ -3371,6 +3594,8 @@ service_discovery_impl::send_subscription_ack(
update_subscription_expiration_timer(its_messages);
}
+ std::this_thread::yield();
+
// We might need to send initial events
for (const auto &its_subscription : _acknowledgement->get_subscriptions()) {
// Assumption: We do _NOT_ need to check whether this is a child
@@ -3430,5 +3655,58 @@ service_discovery_impl::register_reboot_notification_handler(
reboot_notification_handler_ = _handler;
}
+reliability_type_e service_discovery_impl::get_eventgroup_reliability(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ const std::shared_ptr<subscription>& _subscription) {
+ reliability_type_e its_reliability = reliability_type_e::RT_UNKNOWN;
+ auto its_info = _subscription->get_eventgroupinfo().lock();
+ if (its_info) {
+ its_reliability = its_info->get_reliability();
+ if (its_reliability == reliability_type_e::RT_UNKNOWN
+ && its_info->is_reliability_auto_mode()) {
+ // fallback: determine how service is offered
+ // and update reliability type of eventgroup
+ switch (get_remote_offer_type(_service, _instance)) {
+ case remote_offer_type_e::RELIABLE:
+ its_reliability = reliability_type_e::RT_RELIABLE;
+ break;
+ case remote_offer_type_e::UNRELIABLE:
+ its_reliability = reliability_type_e::RT_UNRELIABLE;
+ break;
+ case remote_offer_type_e::RELIABLE_UNRELIABLE:
+ its_reliability = reliability_type_e::RT_BOTH;
+ break;
+ default:
+ ;
+ }
+ VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't determine eventgroup reliability type for ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
+ << " using reliability type: "
+ << std::hex << std::setw(4) << std::setfill('0') << (uint16_t) its_reliability;
+ its_info->set_reliability(its_reliability);
+ }
+ } else {
+ VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't lock eventgroupinfo ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] ";
+ auto its_eg_info = host_->find_eventgroup(_service, _instance, _eventgroup);
+ if (its_eg_info) {
+ _subscription->set_eventgroupinfo(its_eg_info);
+ its_reliability = its_eg_info->get_reliability();
+ }
+ }
+
+ if (its_reliability == reliability_type_e::RT_UNKNOWN) {
+ VSOMEIP_WARNING << "sd::" << __func__ << ": eventgroup reliability type is unknown ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ }
+ return its_reliability;
+}
+
} // namespace sd
} // namespace vsomeip_v3