summaryrefslogtreecommitdiff
path: root/implementation/service_discovery/src/service_discovery_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/service_discovery/src/service_discovery_impl.cpp')
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp258
1 files changed, 170 insertions, 88 deletions
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index ca4e131..74e509b 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -165,9 +165,10 @@ service_discovery_impl::start() {
}
}
if (endpoint_ && !reliable_) {
- // rejoin multicast group
- dynamic_cast<udp_server_endpoint_impl*>(
- endpoint_.get())->join(sd_multicast_);
+ auto its_endpoint = std::dynamic_pointer_cast<
+ udp_server_endpoint_impl>(endpoint_);
+ if (its_endpoint)
+ its_endpoint->join(sd_multicast_);
}
}
is_suspended_ = false;
@@ -213,17 +214,17 @@ service_discovery_impl::release_service(
}
}
-std::shared_ptr<request>
-service_discovery_impl::find_request(service_t _service, instance_t _instance) {
+void
+service_discovery_impl::update_request(service_t _service, instance_t _instance) {
std::lock_guard<std::mutex> its_lock(requested_mutex_);
auto find_service = requested_.find(_service);
if (find_service != requested_.end()) {
auto find_instance = find_service->second.find(_instance);
if (find_instance != find_service->second.end()) {
- return find_instance->second;
+ find_instance->second->set_sent_counter(
+ std::uint8_t(repetitions_max_ + 1));
}
}
- return nullptr;
}
void
@@ -232,6 +233,13 @@ service_discovery_impl::subscribe(
eventgroup_t _eventgroup, major_version_t _major,
ttl_t _ttl, client_t _client,
const std::shared_ptr<eventgroupinfo> &_info) {
+
+ if (is_suspended_) {
+ VSOMEIP_WARNING << "service_discovery::" << __func__
+ << ": Ignoring subscription as we are suspended.";
+ return;
+ }
+
#ifdef VSOMEIP_ENABLE_COMPAT
bool is_selective(_info ? _info->is_selective() : false);
#endif // VSOMEIP_ENABLE_COMPAT
@@ -248,8 +256,8 @@ service_discovery_impl::subscribe(
if (!its_subscription->is_selective() && is_selective) {
its_subscription->set_selective(true);
its_subscription->remove_client(VSOMEIP_ROUTING_CLIENT);
- for (const auto& e : _info->get_events()) {
- for (const auto& c : e->get_subscribers(_eventgroup)) {
+ for (const auto &e : _info->get_events()) {
+ for (const auto &c : e->get_subscribers(_eventgroup)) {
its_subscription->add_client(c);
}
}
@@ -517,23 +525,65 @@ service_discovery_impl::unsubscribe_all(
}
void
-service_discovery_impl::reset_subscriptions(
+service_discovery_impl::unsubscribe_all_on_suspend() {
+
+ std::map<boost::asio::ip::address,
+ std::vector<std::shared_ptr<message_impl> > > its_stopsubscribes;
+
+ {
+ std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
+ for (auto its_service : subscribed_) {
+ for (auto its_instance : its_service.second) {
+ for (auto &its_eventgroup : its_instance.second) {
+ boost::asio::ip::address its_address;
+ auto its_current_message = std::make_shared<message_impl>();
+ auto its_subscription = its_eventgroup.second;
+ its_subscription->set_ttl(0);
+ const reliability_type_e its_reliability =
+ get_eventgroup_reliability(its_service.first, its_instance.first,
+ its_eventgroup.first, its_subscription);
+ auto its_data = create_eventgroup_entry(its_service.first, its_instance.first,
+ its_eventgroup.first, its_subscription, its_reliability);
+ auto its_reliable = its_subscription->get_endpoint(true);
+ auto its_unreliable = its_subscription->get_endpoint(false);
+ get_subscription_address(
+ its_reliable, its_unreliable, its_address);
+ if (its_data.entry_
+ && its_current_message->add_entry_data(its_data.entry_, its_data.options_)) {
+ its_stopsubscribes[its_address].push_back(its_current_message);
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Failed to create StopSubscribe entry for: "
+ << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance.first << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup.first
+ << " address: " << its_address.to_string();
+ }
+ }
+ its_instance.second.clear();
+ }
+ its_service.second.clear();
+ }
+ subscribed_.clear();
+ }
+
+ for (auto its_address : its_stopsubscribes) {
+ if (!serialize_and_send(its_address.second, its_address.first)) {
+ VSOMEIP_WARNING << __func__ << ": Failed to send StopSubscribe to address: "
+ << its_address.first.to_string();
+ }
+ }
+}
+
+void
+service_discovery_impl::remove_subscriptions(
service_t _service, instance_t _instance) {
std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
auto found_service = subscribed_.find(_service);
if (found_service != subscribed_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- for (auto &its_eventgroup : found_instance->second) {
- auto its_subscription = its_eventgroup.second;
- for (auto its_client : its_subscription->get_clients()) {
- its_subscription->set_state(its_client,
- subscription_state_e::ST_UNKNOWN);
- }
- its_subscription->set_endpoint(nullptr, true);
- its_subscription->set_endpoint(nullptr, false);
- }
+ found_service->second.erase(_instance);
+ if (found_service->second.empty()) {
+ subscribed_.erase(found_service);
}
}
}
@@ -641,16 +691,16 @@ void
service_discovery_impl::insert_find_entries(
std::vector<std::shared_ptr<message_impl> > &_messages,
const requests_t &_requests) {
- std::lock_guard<std::mutex> its_lock(requested_mutex_);
entry_data_t its_data;
its_data.entry_ = its_data.other_ = nullptr;
for (const auto& its_service : _requests) {
for (const auto& its_instance : its_service.second) {
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
auto its_request = its_instance.second;
- // check if release_service was called
+ // check if release_service was called / offer was received
auto the_service = requested_.find(its_service.first);
if ( the_service != requested_.end() ) {
auto the_instance = the_service->second.find(its_instance.first);
@@ -722,17 +772,37 @@ service_discovery_impl::create_eventgroup_entry(
case reliability_type_e::RT_RELIABLE:
if (its_reliable_endpoint) {
insert_reliable = true;
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
+ "reliable endpoint is zero: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
}
break;
case reliability_type_e::RT_UNRELIABLE:
if (its_unreliable_endpoint) {
insert_unreliable = true;
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
+ "unreliable endpoint is zero: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
}
break;
case reliability_type_e::RT_BOTH:
if (its_reliable_endpoint && its_unreliable_endpoint) {
insert_reliable = true;
insert_unreliable = true;
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
+ "endpoint is zero: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
+ << " reliable: " << !!its_reliable_endpoint
+ << " unreliable: " << !!its_unreliable_endpoint;
}
break;
default:
@@ -817,7 +887,6 @@ service_discovery_impl::create_eventgroup_entry(
its_entry->set_counter(0);
its_entry->set_major_version(_subscription->get_major());
its_entry->set_ttl(_subscription->get_ttl());
-
its_data.entry_ = its_entry;
}
@@ -1008,14 +1077,18 @@ service_discovery_impl::on_message(
}
const bool received_via_mcast = (_destination == sd_multicast_address_);
if (received_via_mcast) {
- std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_);
+ static bool must_start_last_msg_received_timer(true);
boost::system::error_code ec;
- last_msg_received_timer_.cancel(ec);
- last_msg_received_timer_.expires_from_now(
- last_msg_received_timer_timeout_, ec);
- last_msg_received_timer_.async_wait(
- std::bind(&service_discovery_impl::on_last_msg_received_timer_expired,
- shared_from_this(), std::placeholders::_1));
+
+ std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_);
+ if (0 < last_msg_received_timer_.cancel(ec) || must_start_last_msg_received_timer) {
+ must_start_last_msg_received_timer = false;
+ last_msg_received_timer_.expires_from_now(
+ last_msg_received_timer_timeout_, ec);
+ last_msg_received_timer_.async_wait(
+ std::bind(&service_discovery_impl::on_last_msg_received_timer_expired,
+ shared_from_this(), std::placeholders::_1));
+ }
}
current_remote_address_ = _sender;
@@ -1256,17 +1329,15 @@ service_discovery_impl::process_serviceentry(
default:
VSOMEIP_ERROR << __func__ << ": Unsupported service entry type";
}
- } else if (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_) {
- std::shared_ptr<request> its_request = find_request(its_service, its_instance);
- if (its_request) {
- std::lock_guard<std::mutex> its_lock(requested_mutex_);
- // ID: SIP_SD_830
- its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1));
- }
+ } else if (its_type != entry_type_e::FIND_SERVICE
+ && (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_)) {
+ // stop sending find service in repetition phase
+ update_request(its_service, its_instance);
+
remove_remote_offer_type(its_service, its_instance,
its_reliable_address, its_reliable_port,
its_unreliable_address, its_unreliable_port);
- reset_subscriptions(its_service, its_instance);
+ remove_subscriptions(its_service, its_instance);
if (!is_diagnosis_ && !is_suspended_) {
host_->del_routing_info(its_service, its_instance,
(its_reliable_port != ILLEGAL_PORT),
@@ -1304,11 +1375,9 @@ service_discovery_impl::process_offerservice_serviceentry(
return;
}
- std::shared_ptr<request> its_request = find_request(_service, _instance);
- if (its_request) {
- std::lock_guard<std::mutex> its_lock(requested_mutex_);
- its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1));
- }
+ // stop sending find service in repetition phase
+ update_request(_service, _instance);
+
remote_offer_type_e offer_type(remote_offer_type_e::UNKNOWN);
if (_reliable_port != ILLEGAL_PORT
&& _unreliable_port != ILLEGAL_PORT
@@ -1430,17 +1499,9 @@ service_discovery_impl::process_offerservice_serviceentry(
}
}
- host_->add_routing_info(_service, _instance,
- _major, _minor,
- _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_),
- _reliable_address, _reliable_port,
- _unreliable_address, _unreliable_port);
// No need to resubscribe for unicast offers
if (_received_via_mcast) {
- std::int32_t its_remaining = VSOMEIP_MAX_UDP_MESSAGE_SIZE;
- its_remaining -= _resubscribes.back()->get_size();
-
std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
auto found_service = subscribed_.find(_service);
if (found_service != subscribed_.end()) {
@@ -1482,6 +1543,12 @@ service_discovery_impl::process_offerservice_serviceentry(
}
}
}
+
+ host_->add_routing_info(_service, _instance,
+ _major, _minor,
+ _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_),
+ _reliable_address, _reliable_port,
+ _unreliable_address, _unreliable_port);
}
void
@@ -1616,7 +1683,7 @@ service_discovery_impl::on_endpoint_connected(
its_subscription->set_endpoint(its_reliable, true);
its_subscription->set_endpoint(its_unreliable, false);
- for (const auto& its_client : its_subscription->get_clients())
+ for (const auto its_client : its_subscription->get_clients())
its_subscription->set_state(its_client,
subscription_state_e::ST_NOT_ACKNOWLEDGED);
@@ -1726,7 +1793,7 @@ service_discovery_impl::process_eventgroupentry(
// We received a subscription for a non-existing eventgroup.
// --> Create dummy eventgroupinfo to send Nack.
its_info = std::make_shared<eventgroupinfo>(its_service, its_instance,
- its_eventgroup, its_major, its_ttl);
+ its_eventgroup, its_major, its_ttl, VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS);
boost::system::error_code ec;
VSOMEIP_ERROR << __func__
<< ": Received a SubscribeEventGroup entry for unknown eventgroup "
@@ -1743,6 +1810,16 @@ service_discovery_impl::process_eventgroupentry(
// We received a subscription [n]ack for an eventgroup that does not exist.
// --> Remove subscription.
unsubscribe(its_service, its_instance, its_eventgroup, VSOMEIP_ROUTING_CLIENT);
+
+ boost::system::error_code ec;
+ VSOMEIP_WARNING << __func__
+ << ": Received a SubscribeEventGroup[N]Ack entry for unknown eventgroup "
+ << " from: " << its_sender.to_string(ec) << " for: ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup
+ << "] session: " << std::hex << std::setw(4) << std::setfill('0')
+ << its_session << ", ttl: " << its_ttl;
}
return;
}
@@ -2226,7 +2303,7 @@ service_discovery_impl::handle_eventgroup_subscription(
if (_major != _info->get_major()) {
// Create a temporary info object with TTL=0 --> send NACK
auto its_info = std::make_shared<eventgroupinfo>(_service, _instance,
- _eventgroup, _major, 0);
+ _eventgroup, _major, 0, VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS);
boost::system::error_code ec;
// TODO: Add session id
VSOMEIP_ERROR << __func__
@@ -2325,42 +2402,44 @@ service_discovery_impl::handle_eventgroup_subscription(
}
}
- // Create subscription object
- auto its_subscription = std::make_shared<remote_subscription>();
- its_subscription->set_eventgroupinfo(_info);
- its_subscription->set_subscriber(its_subscriber);
- its_subscription->set_reliable(its_reliable);
- its_subscription->set_unreliable(its_unreliable);
- its_subscription->reset(_clients);
+ if (its_subscriber) {
+ // Create subscription object
+ auto its_subscription = std::make_shared<remote_subscription>();
+ its_subscription->set_eventgroupinfo(_info);
+ its_subscription->set_subscriber(its_subscriber);
+ its_subscription->set_reliable(its_reliable);
+ its_subscription->set_unreliable(its_unreliable);
+ its_subscription->reset(_clients);
- if (_ttl == 0) { // --> unsubscribe
- its_subscription->set_ttl(0);
- if (!_is_stop_subscribe_subscribe) {
- {
- std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
- pending_remote_subscriptions_[its_subscription] = _acknowledgement;
- _acknowledgement->add_subscription(its_subscription);
+ if (_ttl == 0) { // --> unsubscribe
+ its_subscription->set_ttl(0);
+ if (!_is_stop_subscribe_subscribe) {
+ {
+ std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
+ pending_remote_subscriptions_[its_subscription] = _acknowledgement;
+ _acknowledgement->add_subscription(its_subscription);
+ }
+ host_->on_remote_unsubscribe(its_subscription);
}
- host_->on_remote_unsubscribe(its_subscription);
+ return;
}
- return;
- }
- if (_force_initial_events) {
- its_subscription->set_force_initial_events(true);
- }
- its_subscription->set_ttl(_ttl
- * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_));
+ if (_force_initial_events) {
+ its_subscription->set_force_initial_events(true);
+ }
+ its_subscription->set_ttl(_ttl
+ * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_));
- {
- std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
- pending_remote_subscriptions_[its_subscription] = _acknowledgement;
- _acknowledgement->add_subscription(its_subscription);
- }
+ {
+ std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
+ pending_remote_subscriptions_[its_subscription] = _acknowledgement;
+ _acknowledgement->add_subscription(its_subscription);
+ }
- host_->on_remote_subscribe(its_subscription,
- std::bind(&service_discovery_impl::update_remote_subscription,
- shared_from_this(), std::placeholders::_1));
+ host_->on_remote_subscribe(its_subscription,
+ std::bind(&service_discovery_impl::update_remote_subscription,
+ shared_from_this(), std::placeholders::_1));
+ }
}
void
@@ -2380,7 +2459,7 @@ service_discovery_impl::handle_eventgroup_subscription_nack(
for (const auto& its_client : _clients) {
host_->on_subscribe_nack(its_client,
_service, _instance, _eventgroup, ANY_EVENT,
- PENDING_SUBSCRIPTION_ID); // TODO: This is a dummy call...
+ PENDING_SUBSCRIPTION_ID, false); // TODO: This is a dummy call...
}
@@ -3339,6 +3418,7 @@ service_discovery_impl::get_ttl_factor(
void
service_discovery_impl::on_last_msg_received_timer_expired(
const boost::system::error_code &_error) {
+
if (!_error) {
// We didn't receive a multicast message within 110% of the cyclic_offer_delay_
VSOMEIP_WARNING << "Didn't receive a multicast SD message for " <<
@@ -3346,8 +3426,10 @@ service_discovery_impl::on_last_msg_received_timer_expired(
// Rejoin multicast group
if (endpoint_ && !reliable_) {
- dynamic_cast<udp_server_endpoint_impl*>(
- endpoint_.get())->join(sd_multicast_);
+ auto its_endpoint = std::dynamic_pointer_cast<
+ udp_server_endpoint_impl>(endpoint_);
+ if (its_endpoint)
+ its_endpoint->join(sd_multicast_);
}
{
boost::system::error_code ec;