summaryrefslogtreecommitdiff
path: root/implementation/routing/src/routing_manager_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp361
1 files changed, 278 insertions, 83 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index 34150fe..07c0740 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -532,8 +532,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service,
_minor, DEFAULT_TTL);
}
its_info->add_client(_client);
- ep_mgr_impl_->find_or_create_remote_client(
- _service, _instance, true);
+ ep_mgr_impl_->find_or_create_remote_client(_service, _instance);
}
}
}
@@ -633,6 +632,7 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid,
std::unique_lock<std::mutex> its_critical(remote_subscription_state_mutex_);
bool inserted = insert_subscription(_service, _instance, _eventgroup,
_event, _client, &its_already_subscribed_events);
+ const bool subscriber_is_rm_host = (get_client() == _client);
if (inserted) {
if (0 == its_local_client) {
handle_subscription_state(_client, _service, _instance, _eventgroup, _event);
@@ -642,7 +642,11 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid,
_eventgroup, _event, its_already_subscribed_events);
auto its_info = find_eventgroup(_service, _instance, _eventgroup);
- if (its_info) {
+ // if the subscriber is the rm_host itself: check if service
+ // is available before subscribing via SD otherwise we sent
+ // a StopSubscribe/Subscribe once the first offer is received
+ if (its_info &&
+ (!subscriber_is_rm_host || find_service(_service, _instance))) {
discovery_->subscribe(_service, _instance, _eventgroup,
_major, configured_ttl,
its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT,
@@ -657,7 +661,7 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid,
}
}
}
- if (get_client() == _client) {
+ if (subscriber_is_rm_host) {
std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
subscription_data_t subscription = {
_service, _instance, _eventgroup, _major, _event, _uid, _gid
@@ -701,6 +705,10 @@ void routing_manager_impl::unsubscribe(client_t _client, uid_t _uid, gid_t _gid,
host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, false,
[](const bool _subscription_accepted){ (void)_subscription_accepted; });
if (0 == find_local_client(_service, _instance)) {
+ if (get_client() == _client) {
+ std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
+ remove_pending_subscription(_service, _instance, _eventgroup, _event);
+ }
if (last_subscriber_removed) {
unset_all_eventpayloads(_service, _instance, _eventgroup);
{
@@ -784,7 +792,8 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
if (its_target) {
#ifdef USE_DLT
if ((is_request && its_client == get_client()) ||
- (is_response && find_local_client(its_service, _instance) == get_client())) {
+ (is_response && find_local_client(its_service, _instance) == get_client()) ||
+ (is_notification && find_local_client(its_service, _instance) == VSOMEIP_ROUTING_CLIENT)) {
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
@@ -1201,6 +1210,37 @@ void routing_manager_impl::notify_one(service_t _service, instance_t _instance,
void routing_manager_impl::on_availability(service_t _service, instance_t _instance,
bool _is_available, major_version_t _major, minor_version_t _minor) {
+
+ // insert subscriptions of routing manager into service discovery
+ // to send SubscribeEventgroup after StopOffer / Offer was received
+ if (_is_available) {
+ if (discovery_) {
+ const client_t its_local_client = find_local_client(_service, _instance);
+ // remote service
+ if (VSOMEIP_ROUTING_CLIENT == its_local_client) {
+ static const ttl_t configured_ttl(configuration_->get_sd_ttl());
+
+ std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
+ for (auto &ps : pending_subscriptions_) {
+ if (ps.service_ == _service
+ && ps.instance_ == _instance
+ && ps.major_ == _major) {
+ auto its_info = find_eventgroup(_service, _instance, ps.eventgroup_);
+ if (its_info) {
+ discovery_->subscribe(
+ _service,
+ _instance,
+ ps.eventgroup_,
+ _major,
+ configured_ttl,
+ its_info->is_selective() ? get_client() : VSOMEIP_ROUTING_CLIENT,
+ its_info);
+ }
+ }
+ }
+ }
+ }
+ }
host_->on_availability(_service, _instance, _is_available, _major, _minor);
}
@@ -1719,16 +1759,23 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se
erase_offer_command(_service, _instance);
}
- std::lock_guard<std::mutex> its_eventgroups_lock(eventgroups_mutex_);
- auto find_service = eventgroups_.find(_service);
- if (find_service != eventgroups_.end()) {
- auto find_instance = find_service->second.find(_instance);
- if (find_instance != find_service->second.end()) {
- for (auto e : find_instance->second) {
- e.second->clear_remote_subscriptions();
+ std::set<std::shared_ptr<eventgroupinfo> > its_eventgroup_info_set;
+ {
+ std::lock_guard<std::mutex> its_eventgroups_lock(eventgroups_mutex_);
+ auto find_service = eventgroups_.find(_service);
+ if (find_service != eventgroups_.end()) {
+ auto find_instance = find_service->second.find(_instance);
+ if (find_instance != find_service->second.end()) {
+ for (auto e : find_instance->second) {
+ its_eventgroup_info_set.insert(e.second);
+ }
}
}
}
+
+ for (auto e : its_eventgroup_info_set) {
+ e->clear_remote_subscriptions();
+ }
} else {
erase_offer_command(_service, _instance);
}
@@ -1902,6 +1949,11 @@ bool routing_manager_impl::deliver_notification(
}
}
if (!cache_event) {
+ VSOMEIP_WARNING << __func__ << ": dropping ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_event_id
+ << "]. No subscription to corresponding eventgroup.";
return true; // as there is nothing to do
}
}
@@ -1996,13 +2048,10 @@ std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoin
its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE,
_address, _port);
if (!_reliable) {
-#if defined(_WIN32) || defined(ANDROID)
- dynamic_cast<udp_server_endpoint_impl*>(
- its_service_endpoint.get())->join(_address);
-#else
- reinterpret_cast<udp_server_endpoint_impl*>(
- its_service_endpoint.get())->join(_address);
-#endif
+ auto its_udp_server_endpoint_impl = std::dynamic_pointer_cast<
+ udp_server_endpoint_impl>(its_service_endpoint);
+ if (its_udp_server_endpoint_impl)
+ its_udp_server_endpoint_impl->join(_address);
}
} else {
VSOMEIP_ERROR<< "Service Discovery endpoint could not be created. "
@@ -2150,6 +2199,7 @@ bool routing_manager_impl::is_field(service_t _service, instance_t _instance,
return false;
}
+//only called from the SD
void routing_manager_impl::add_routing_info(
service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor, ttl_t _ttl,
@@ -2158,6 +2208,12 @@ void routing_manager_impl::add_routing_info(
const boost::asio::ip::address &_unreliable_address,
uint16_t _unreliable_port) {
+ std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
+ if (routing_state_ == routing_state_e::RS_SUSPENDED) {
+ VSOMEIP_INFO << "rmi::" << __func__ << " We are suspened --> do nothing.";
+ return;
+ }
+
// Create/Update service info
std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
if (!its_info) {
@@ -2259,6 +2315,7 @@ void routing_manager_impl::add_routing_info(
}
} else if (_reliable_port != ILLEGAL_PORT && is_reliable_known) {
std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
+ bool connected(false);
for(const auto &client_id : requested_services_) {
auto found_service = client_id.second.find(_service);
if (found_service != client_id.second.end()) {
@@ -2272,21 +2329,34 @@ void routing_manager_impl::add_routing_info(
|| _minor == DEFAULT_MINOR
|| major_minor_pair.second == ANY_MINOR)) {
std::shared_ptr<endpoint> ep = its_info->get_endpoint(true);
- if (ep && ep->is_established() &&
+ if (ep) {
+ if (ep->is_established() &&
!stub_->contained_in_routing_info(
VSOMEIP_ROUTING_CLIENT, _service, _instance,
its_info->get_major(),
its_info->get_minor())) {
- on_availability(_service, _instance,
- true, its_info->get_major(), its_info->get_minor());
- stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT,
- _service, _instance,
- its_info->get_major(),
- its_info->get_minor());
- if (discovery_) {
- discovery_->on_endpoint_connected(
- _service, _instance, ep);
+ on_availability(_service, _instance,
+ true, its_info->get_major(), its_info->get_minor());
+ stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT,
+ _service, _instance,
+ its_info->get_major(),
+ its_info->get_minor());
+ if (discovery_) {
+ discovery_->on_endpoint_connected(
+ _service, _instance, ep);
+ }
+ }
+ } else {
+ // no endpoint yet, but requested -> create one
+
+ // SWS_SD_00376 establish TCP connection to service
+ // service is marked as available later in on_connect()
+ if (!connected) {
+ ep_mgr_impl_->find_or_create_remote_client(
+ _service, _instance, true);
+ connected = true;
}
+ its_info->add_client(client_id.first);
}
break;
}
@@ -2358,7 +2428,8 @@ void routing_manager_impl::add_routing_info(
&& (major_minor_pair.second <= _minor
|| _minor == DEFAULT_MINOR
|| major_minor_pair.second == ANY_MINOR)) {
- if (!stub_->contained_in_routing_info(
+ if (_reliable_port == ILLEGAL_PORT && !is_reliable_known &&
+ !stub_->contained_in_routing_info(
VSOMEIP_ROUTING_CLIENT, _service, _instance,
its_info->get_major(),
its_info->get_minor())) {
@@ -2411,18 +2482,15 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst
// do no longer exist and the last received payload is no
// longer valid.
for (auto &its_event : its_eventgroup.second->get_events()) {
- const auto its_subscribers = its_event->get_subscribers();
- for (const auto& its_subscriber : its_subscribers) {
+ const auto& its_subscribers = its_event->get_subscribers();
+ for (const auto its_subscriber : its_subscribers) {
if (its_subscriber != get_client()) {
its_event->remove_subscriber(
its_eventgroup.first, its_subscriber);
}
}
its_events.push_back(its_event);
- remove_pending_subscription(_service, _instance,
- its_eventgroup.first, its_event->get_event());
}
-
}
}
}
@@ -2436,14 +2504,14 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst
std::set<std::tuple<
service_t, instance_t, eventgroup_t, client_t> > its_invalid;
- for (const auto& its_state : remote_subscription_state_) {
+ for (const auto &its_state : remote_subscription_state_) {
if (std::get<0>(its_state.first) == _service
&& std::get<1>(its_state.first) == _instance) {
its_invalid.insert(its_state.first);
}
}
- for (const auto& its_key : its_invalid)
+ for (const auto &its_key : its_invalid)
remote_subscription_state_.erase(its_key);
}
@@ -2599,8 +2667,15 @@ routing_manager_impl::expire_subscriptions(
const bool expire_all = (_range.first == ANY_PORT
&& _range.second == ANY_PORT);
- std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- for (const auto &its_service : eventgroups_) {
+ std::map<service_t,
+ std::map<instance_t,
+ std::map<eventgroup_t,
+ std::shared_ptr<eventgroupinfo> > > >its_eventgroups;
+ {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ its_eventgroups = eventgroups_;
+ }
+ for (const auto &its_service : its_eventgroups) {
for (const auto &its_instance : its_service.second) {
for (const auto &its_eventgroup : its_instance.second) {
const auto its_info = its_eventgroup.second;
@@ -2743,10 +2818,12 @@ void routing_manager_impl::on_remote_subscribe(
// not exist or is still (partly) pending.
remote_subscription_id_t its_id;
std::set<client_t> its_added;
+ update_remote_subscription_mutex_.lock();
auto its_result = its_eventgroupinfo->update_remote_subscription(
_subscription, its_expiration, its_added, its_id, true);
if (its_result) {
if (!_subscription->is_pending()) { // resubscription without change
+ update_remote_subscription_mutex_.unlock();
_callback(_subscription);
} else if (!its_added.empty()) { // new clients for a selective subscription
const client_t its_offering_client
@@ -2754,6 +2831,7 @@ void routing_manager_impl::on_remote_subscribe(
send_subscription(its_offering_client,
its_service, its_instance, its_eventgroup, its_major,
its_added, _subscription->get_id());
+ update_remote_subscription_mutex_.unlock();
} else { // identical subscription is not yet processed
std::stringstream its_warning;
its_warning << __func__ << " a remote subscription is already pending ["
@@ -2774,9 +2852,21 @@ void routing_manager_impl::on_remote_subscribe(
if (its_reliable && its_unreliable)
its_warning << "]";
VSOMEIP_WARNING << its_warning.str();
+
+ update_remote_subscription_mutex_.unlock();
_callback(_subscription);
}
} else { // new subscription
+ if (its_eventgroupinfo->is_remote_subscription_limit_reached(
+ _subscription)) {
+ _subscription->set_all_client_states(
+ remote_subscription_state_e::SUBSCRIPTION_NACKED);
+
+ update_remote_subscription_mutex_.unlock();
+ _callback(_subscription);
+ return;
+ }
+
auto its_id
= its_eventgroupinfo->add_remote_subscription(_subscription);
@@ -2785,6 +2875,7 @@ void routing_manager_impl::on_remote_subscribe(
send_subscription(its_offering_client,
its_service, its_instance, its_eventgroup, its_major,
_subscription->get_clients(), its_id);
+ update_remote_subscription_mutex_.unlock();
}
}
@@ -2820,6 +2911,7 @@ void routing_manager_impl::on_remote_unsubscribe(
remote_subscription_id_t its_id(0);
std::set<client_t> its_removed;
+ update_remote_subscription_mutex_.lock();
auto its_result = its_info->update_remote_subscription(
_subscription, std::chrono::steady_clock::now(),
its_removed, its_id, false);
@@ -2831,6 +2923,8 @@ void routing_manager_impl::on_remote_unsubscribe(
its_service, its_instance, its_eventgroup, its_major,
its_removed, its_id);
}
+
+ update_remote_subscription_mutex_.unlock();
}
void routing_manager_impl::on_subscribe_ack_with_multicast(
@@ -2931,13 +3025,20 @@ std::shared_ptr<endpoint> routing_manager_impl::find_or_create_remote_client(
void routing_manager_impl::on_subscribe_nack(client_t _client,
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- event_t _event, remote_subscription_id_t _id) {
+ event_t _event, remote_subscription_id_t _id, bool _simulated) {
(void)_event; // TODO: Remove completely?
auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
if (its_eventgroup) {
auto its_subscription = its_eventgroup->get_remote_subscription(_id);
if (its_subscription) {
+ if (_simulated) {
+ // method was called because a subscription for unoffered
+ // service was received. Therefore, remove the remote_subscription
+ // from the eventgroupinfo to ensure subsequent similar
+ // subscriptions are handled like a new/unknown subscription
+ its_eventgroup->remove_remote_subscription(_id);
+ }
its_subscription->set_client_state(_client,
remote_subscription_state_e::SUBSCRIPTION_NACKED);
@@ -3110,6 +3211,10 @@ void routing_manager_impl::clear_remote_subscriber(
std::chrono::steady_clock::time_point
routing_manager_impl::expire_subscriptions(bool _force) {
+ std::map<service_t,
+ std::map<instance_t,
+ std::map<eventgroup_t,
+ std::shared_ptr<eventgroupinfo> > > >its_eventgroups;
std::map<std::shared_ptr<remote_subscription>,
std::set<client_t> > its_expired_subscriptions;
@@ -3119,24 +3224,25 @@ routing_manager_impl::expire_subscriptions(bool _force) {
= std::chrono::steady_clock::now() + std::chrono::hours(24);
{
std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
-
- for (auto &its_service : eventgroups_) {
- for (auto &its_instance : its_service.second) {
- for (auto &its_eventgroup : its_instance.second) {
- auto its_subscriptions
- = its_eventgroup.second->get_remote_subscriptions();
- for (auto &s : its_subscriptions) {
- for (auto its_client : s->get_clients()) {
- if (_force) {
- its_expired_subscriptions[s].insert(its_client);
- } else {
- auto its_expiration = s->get_expiration(its_client);
- if (its_expiration != std::chrono::steady_clock::time_point()) {
- if (its_expiration < now) {
- its_expired_subscriptions[s].insert(its_client);
- } else if (its_expiration < its_next_expiration) {
- its_next_expiration = its_expiration;
- }
+ its_eventgroups = eventgroups_;
+ }
+
+ for (auto &its_service : its_eventgroups) {
+ for (auto &its_instance : its_service.second) {
+ for (auto &its_eventgroup : its_instance.second) {
+ auto its_subscriptions
+ = its_eventgroup.second->get_remote_subscriptions();
+ for (auto &s : its_subscriptions) {
+ for (auto its_client : s->get_clients()) {
+ if (_force) {
+ its_expired_subscriptions[s].insert(its_client);
+ } else {
+ auto its_expiration = s->get_expiration(its_client);
+ if (its_expiration != std::chrono::steady_clock::time_point()) {
+ if (its_expiration < now) {
+ its_expired_subscriptions[s].insert(its_client);
+ } else if (its_expiration < its_next_expiration) {
+ its_next_expiration = its_expiration;
}
}
}
@@ -3152,22 +3258,53 @@ routing_manager_impl::expire_subscriptions(bool _force) {
auto its_service = its_info->get_service();
auto its_instance = its_info->get_instance();
auto its_eventgroup = its_info->get_eventgroup();
- auto its_major = its_info->get_major();
remote_subscription_id_t its_id;
+ update_remote_subscription_mutex_.lock();
auto its_result = its_info->update_remote_subscription(
s.first, std::chrono::steady_clock::now(),
s.second, its_id, false);
if (its_result) {
const client_t its_offering_client
= find_local_client(its_service, its_instance);
- send_unsubscription(its_offering_client,
- its_service, its_instance, its_eventgroup, its_major,
+ const auto its_subscription = its_info->get_remote_subscription(its_id);
+ if (its_subscription) {
+ its_info->remove_remote_subscription(its_id);
+
+ std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
+ remote_subscribers_[its_service][its_instance].erase(its_offering_client);
+
+ if (its_info->get_remote_subscriptions().size() == 0) {
+ for (const auto &its_event : its_info->get_events()) {
+ bool has_remote_subscriber(false);
+ for (const auto &its_eventgroup : its_event->get_eventgroups()) {
+ const auto its_eventgroup_info
+ = find_eventgroup(its_service, its_instance, its_eventgroup);
+ if (its_eventgroup_info
+ && its_eventgroup_info->get_remote_subscriptions().size() > 0) {
+ has_remote_subscriber = true;
+ }
+ }
+ if (!has_remote_subscriber && its_event->is_shadow()) {
+ its_event->unset_payload();
+ }
+ }
+ }
+ } else {
+ VSOMEIP_ERROR << __func__
+ << ": Unknown expired subscription " << std::dec << its_id << " for eventgroup ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]";
+ }
+ send_expired_subscription(its_offering_client,
+ its_service, its_instance, its_eventgroup,
s.second, s.first->get_id());
}
+ update_remote_subscription_mutex_.unlock();
if (s.first->get_unreliable()) {
- VSOMEIP_INFO << "Expired subscription ["
+ VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription ["
<< std::hex << std::setfill('0') << std::setw(4) << its_service << "."
<< std::hex << std::setfill('0') << std::setw(4) << its_instance << "."
<< std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] unreliable from "
@@ -3176,7 +3313,7 @@ routing_manager_impl::expire_subscriptions(bool _force) {
}
if (s.first->get_reliable()) {
- VSOMEIP_INFO << "Expired subscription ["
+ VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription ["
<< std::hex << std::setfill('0') << std::setw(4) << its_service << "."
<< std::hex << std::setfill('0') << std::setw(4) << its_instance << "."
<< std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] reliable from "
@@ -3205,7 +3342,7 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const
}
std::stringstream its_last_resume;
{
- std::lock_guard<std::mutex> its_lock(last_resume_mutex_);
+ std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
if (last_resume_ != std::chrono::steady_clock::time_point::min()) {
its_last_resume << " | " << std::dec
<< std::chrono::duration_cast<std::chrono::seconds>(
@@ -3491,15 +3628,31 @@ void routing_manager_impl::send_subscribe(client_t _client, service_t _service,
}
void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
+
+ // Ignore setting to the current routing state
+ {
+ std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
+ if (routing_state_ == _routing_state) {
+ VSOMEIP_INFO << "rmi::" << __func__ << " No routing state change --> do nothing.";
+ return;
+ }
+
+ routing_state_ = _routing_state;
+ }
+
if(discovery_) {
switch (_routing_state) {
case routing_state_e::RS_SUSPENDED:
{
- VSOMEIP_INFO << "Set routing to suspend mode, diagnosis mode is "
- << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode, diagnosis mode is "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
+
// stop processing of incoming SD messages
discovery_->stop();
+ VSOMEIP_INFO << "rmi::" << __func__ << " Inform all applications that we are going to suspend";
+ send_suspend();
+
// remove all remote subscriptions to remotely offered services on this node
expire_subscriptions(true);
@@ -3520,6 +3673,10 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
remote_subscription_state_.clear();
}
+
+ // send StopSubscribes and clear subscribed_ map
+ discovery_->unsubscribe_all_on_suspend();
+
// mark all external services as offline
services_t its_remote_services;
{
@@ -3528,11 +3685,6 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
}
for (const auto &s : its_remote_services) {
for (const auto &i : s.second) {
- // determine existing subscriptions to remote service and send StopSubscribe
- for (auto its_eventgroup : get_subscribed_eventgroups(s.first, i.first)) {
- discovery_->unsubscribe(s.first, i.first, its_eventgroup, VSOMEIP_ROUTING_CLIENT);
- }
-
const bool has_reliable(i.second->get_endpoint(true));
const bool has_unreliable(i.second->get_endpoint(false));
del_routing_info(s.first, i.first, has_reliable, has_unreliable);
@@ -3541,14 +3693,18 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
unset_all_eventpayloads(s.first, i.first);
}
}
+
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode done, diagnosis mode is "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
+
break;
}
case routing_state_e::RS_RESUMED:
{
- VSOMEIP_INFO << "Set routing to resume mode, diagnosis mode was "
- << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode, diagnosis mode was "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
{
- std::lock_guard<std::mutex> its_lock(last_resume_mutex_);
+ std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
last_resume_ = std::chrono::steady_clock::now();
}
@@ -3575,11 +3731,14 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
discovery_->offer_service(its_instance.second);
}
}
+
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode done, diagnosis mode was "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
break;
}
case routing_state_e::RS_DIAGNOSIS:
{
- VSOMEIP_INFO << "Set routing to diagnosis mode.";
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode.";
discovery_->set_diagnosis_mode(true);
// send StopOffer messages for all someip protocol services
@@ -3591,11 +3750,13 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
}
}
}
+
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode done.";
break;
}
case routing_state_e::RS_RUNNING:
- VSOMEIP_INFO << "Set routing to running mode, diagnosis mode was "
- << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode, diagnosis mode was "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
// Reset relevant in service info
for (const auto &its_service : get_offered_services()) {
@@ -3619,6 +3780,9 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
}
}
}
+
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode done, diagnosis mode was "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
break;
default:
break;
@@ -3724,7 +3888,7 @@ void routing_manager_impl::requested_service_remove(client_t _client,
if (!found_service->second.size()) {
found_client->second.erase(_service);
if (!found_client->second.size()) {
- requested_services_.erase(client_);
+ requested_services_.erase(_client);
}
}
}
@@ -4057,6 +4221,7 @@ routing_manager_impl::on_unsubscribe_ack(client_t _client,
std::shared_ptr<eventgroupinfo> its_info
= find_eventgroup(_service, _instance, _eventgroup);
if (its_info) {
+ update_remote_subscription_mutex_.lock();
const auto its_subscription = its_info->get_remote_subscription(_id);
if (its_subscription) {
its_info->remove_remote_subscription(_id);
@@ -4088,6 +4253,7 @@ routing_manager_impl::on_unsubscribe_ack(client_t _client,
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
<< std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
}
+ update_remote_subscription_mutex_.unlock();
} else {
VSOMEIP_ERROR << __func__
<< ": Received StopSubscribe for unknown eventgroup: ("
@@ -4122,7 +4288,7 @@ void routing_manager_impl::send_subscription(
&routing_manager_stub_host::on_subscribe_nack,
std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
its_client, _service, _instance,
- _eventgroup, ANY_EVENT, _id);
+ _eventgroup, ANY_EVENT, _id, false);
io_.post(its_callback);
} else {
const auto its_callback = std::bind(
@@ -4146,7 +4312,7 @@ void routing_manager_impl::send_subscription(
&routing_manager_stub_host::on_subscribe_nack,
std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
its_client, _service, _instance, _eventgroup,
- ANY_EVENT, _id);
+ ANY_EVENT, _id, true);
io_.post(its_callback);
} catch (const std::exception &e) {
VSOMEIP_ERROR << __func__ << e.what();
@@ -4270,7 +4436,7 @@ routing_manager_impl::send_unsubscription(client_t _offering_client,
host_->on_subscription(_service, _instance,
_eventgroup, its_client, own_uid_, own_gid_, false,
[this, self, _service, _instance, _eventgroup,
- its_client, _id, _offering_client]
+ its_client, _id]
(const bool _is_accepted) {
(void)_is_accepted;
try {
@@ -4303,6 +4469,30 @@ routing_manager_impl::send_unsubscription(client_t _offering_client,
}
}
+void
+routing_manager_impl::send_expired_subscription(client_t _offering_client,
+ service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup,
+ const std::set<client_t> &_removed,
+ remote_subscription_id_t _id) {
+
+ if (host_->get_client() == _offering_client) {
+ auto self = shared_from_this();
+ for (const auto its_client : _removed) {
+ host_->on_subscription(_service, _instance,
+ _eventgroup, its_client, own_uid_, own_gid_, false,
+ [] (const bool _subscription_accepted){
+ (void)_subscription_accepted;
+ });
+ }
+ } else {
+ for (const auto its_client : _removed) {
+ stub_->send_expired_subscription(find_local(_offering_client), its_client,
+ _service, _instance, _eventgroup, ANY_EVENT, _id);
+ }
+ }
+}
+
bool
routing_manager_impl::update_security_policy_configuration(
uint32_t _uid, uint32_t _gid,
@@ -4347,7 +4537,7 @@ bool routing_manager_impl::insert_event_statistics(service_t _service, instance_
// check list for entry with least counter value
uint32_t its_min_count(0xFFFFFFFF);
auto its_tuple_to_discard = std::make_tuple(0xFFFF, 0xFFFF, 0xFFFF);
- for (const auto& it : message_statistics_) {
+ for (const auto &it : message_statistics_) {
if (it.second.counter_ < its_min_count) {
its_min_count = it.second.counter_;
its_tuple_to_discard = it.first;
@@ -4389,7 +4579,7 @@ void routing_manager_impl::statistics_log_timer_cbk(boost::system::error_code co
std::stringstream its_log;
{
std::lock_guard<std::mutex> its_lock(message_statistics_mutex_);
- for (const auto& s : message_statistics_) {
+ for (const auto &s : message_statistics_) {
if (s.second.counter_ / (its_interval / 1000) >= its_min_freq) {
uint16_t its_subscribed(0);
std::shared_ptr<event> its_event = find_event(std::get<0>(s.first), std::get<1>(s.first), std::get<2>(s.first));
@@ -4431,4 +4621,9 @@ void routing_manager_impl::statistics_log_timer_cbk(boost::system::error_code co
}
}
+void routing_manager_impl::send_suspend() const {
+
+ stub_->send_suspend();
+}
+
} // namespace vsomeip_v3