diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_impl.cpp | 361 |
1 files changed, 278 insertions, 83 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 34150fe..07c0740 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -532,8 +532,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service, _minor, DEFAULT_TTL); } its_info->add_client(_client); - ep_mgr_impl_->find_or_create_remote_client( - _service, _instance, true); + ep_mgr_impl_->find_or_create_remote_client(_service, _instance); } } } @@ -633,6 +632,7 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid, std::unique_lock<std::mutex> its_critical(remote_subscription_state_mutex_); bool inserted = insert_subscription(_service, _instance, _eventgroup, _event, _client, &its_already_subscribed_events); + const bool subscriber_is_rm_host = (get_client() == _client); if (inserted) { if (0 == its_local_client) { handle_subscription_state(_client, _service, _instance, _eventgroup, _event); @@ -642,7 +642,11 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid, _eventgroup, _event, its_already_subscribed_events); auto its_info = find_eventgroup(_service, _instance, _eventgroup); - if (its_info) { + // if the subscriber is the rm_host itself: check if service + // is available before subscribing via SD otherwise we sent + // a StopSubscribe/Subscribe once the first offer is received + if (its_info && + (!subscriber_is_rm_host || find_service(_service, _instance))) { discovery_->subscribe(_service, _instance, _eventgroup, _major, configured_ttl, its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT, @@ -657,7 +661,7 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid, } } } - if (get_client() == _client) { + if (subscriber_is_rm_host) { std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_); subscription_data_t subscription = { _service, _instance, _eventgroup, _major, _event, _uid, _gid @@ -701,6 +705,10 @@ void routing_manager_impl::unsubscribe(client_t _client, uid_t _uid, gid_t _gid, host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, false, [](const bool _subscription_accepted){ (void)_subscription_accepted; }); if (0 == find_local_client(_service, _instance)) { + if (get_client() == _client) { + std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_); + remove_pending_subscription(_service, _instance, _eventgroup, _event); + } if (last_subscriber_removed) { unset_all_eventpayloads(_service, _instance, _eventgroup); { @@ -784,7 +792,8 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, if (its_target) { #ifdef USE_DLT if ((is_request && its_client == get_client()) || - (is_response && find_local_client(its_service, _instance) == get_client())) { + (is_response && find_local_client(its_service, _instance) == get_client()) || + (is_notification && find_local_client(its_service, _instance) == VSOMEIP_ROUTING_CLIENT)) { const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); @@ -1201,6 +1210,37 @@ void routing_manager_impl::notify_one(service_t _service, instance_t _instance, void routing_manager_impl::on_availability(service_t _service, instance_t _instance, bool _is_available, major_version_t _major, minor_version_t _minor) { + + // insert subscriptions of routing manager into service discovery + // to send SubscribeEventgroup after StopOffer / Offer was received + if (_is_available) { + if (discovery_) { + const client_t its_local_client = find_local_client(_service, _instance); + // remote service + if (VSOMEIP_ROUTING_CLIENT == its_local_client) { + static const ttl_t configured_ttl(configuration_->get_sd_ttl()); + + std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_); + for (auto &ps : pending_subscriptions_) { + if (ps.service_ == _service + && ps.instance_ == _instance + && ps.major_ == _major) { + auto its_info = find_eventgroup(_service, _instance, ps.eventgroup_); + if (its_info) { + discovery_->subscribe( + _service, + _instance, + ps.eventgroup_, + _major, + configured_ttl, + its_info->is_selective() ? get_client() : VSOMEIP_ROUTING_CLIENT, + its_info); + } + } + } + } + } + } host_->on_availability(_service, _instance, _is_available, _major, _minor); } @@ -1719,16 +1759,23 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se erase_offer_command(_service, _instance); } - std::lock_guard<std::mutex> its_eventgroups_lock(eventgroups_mutex_); - auto find_service = eventgroups_.find(_service); - if (find_service != eventgroups_.end()) { - auto find_instance = find_service->second.find(_instance); - if (find_instance != find_service->second.end()) { - for (auto e : find_instance->second) { - e.second->clear_remote_subscriptions(); + std::set<std::shared_ptr<eventgroupinfo> > its_eventgroup_info_set; + { + std::lock_guard<std::mutex> its_eventgroups_lock(eventgroups_mutex_); + auto find_service = eventgroups_.find(_service); + if (find_service != eventgroups_.end()) { + auto find_instance = find_service->second.find(_instance); + if (find_instance != find_service->second.end()) { + for (auto e : find_instance->second) { + its_eventgroup_info_set.insert(e.second); + } } } } + + for (auto e : its_eventgroup_info_set) { + e->clear_remote_subscriptions(); + } } else { erase_offer_command(_service, _instance); } @@ -1902,6 +1949,11 @@ bool routing_manager_impl::deliver_notification( } } if (!cache_event) { + VSOMEIP_WARNING << __func__ << ": dropping [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_event_id + << "]. No subscription to corresponding eventgroup."; return true; // as there is nothing to do } } @@ -1996,13 +2048,10 @@ std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoin its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE, _address, _port); if (!_reliable) { -#if defined(_WIN32) || defined(ANDROID) - dynamic_cast<udp_server_endpoint_impl*>( - its_service_endpoint.get())->join(_address); -#else - reinterpret_cast<udp_server_endpoint_impl*>( - its_service_endpoint.get())->join(_address); -#endif + auto its_udp_server_endpoint_impl = std::dynamic_pointer_cast< + udp_server_endpoint_impl>(its_service_endpoint); + if (its_udp_server_endpoint_impl) + its_udp_server_endpoint_impl->join(_address); } } else { VSOMEIP_ERROR<< "Service Discovery endpoint could not be created. " @@ -2150,6 +2199,7 @@ bool routing_manager_impl::is_field(service_t _service, instance_t _instance, return false; } +//only called from the SD void routing_manager_impl::add_routing_info( service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl, @@ -2158,6 +2208,12 @@ void routing_manager_impl::add_routing_info( const boost::asio::ip::address &_unreliable_address, uint16_t _unreliable_port) { + std::lock_guard<std::mutex> its_lock(routing_state_mutex_); + if (routing_state_ == routing_state_e::RS_SUSPENDED) { + VSOMEIP_INFO << "rmi::" << __func__ << " We are suspened --> do nothing."; + return; + } + // Create/Update service info std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance)); if (!its_info) { @@ -2259,6 +2315,7 @@ void routing_manager_impl::add_routing_info( } } else if (_reliable_port != ILLEGAL_PORT && is_reliable_known) { std::lock_guard<std::mutex> its_lock(requested_services_mutex_); + bool connected(false); for(const auto &client_id : requested_services_) { auto found_service = client_id.second.find(_service); if (found_service != client_id.second.end()) { @@ -2272,21 +2329,34 @@ void routing_manager_impl::add_routing_info( || _minor == DEFAULT_MINOR || major_minor_pair.second == ANY_MINOR)) { std::shared_ptr<endpoint> ep = its_info->get_endpoint(true); - if (ep && ep->is_established() && + if (ep) { + if (ep->is_established() && !stub_->contained_in_routing_info( VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), its_info->get_minor())) { - on_availability(_service, _instance, - true, its_info->get_major(), its_info->get_minor()); - stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, - _service, _instance, - its_info->get_major(), - its_info->get_minor()); - if (discovery_) { - discovery_->on_endpoint_connected( - _service, _instance, ep); + on_availability(_service, _instance, + true, its_info->get_major(), its_info->get_minor()); + stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, + _service, _instance, + its_info->get_major(), + its_info->get_minor()); + if (discovery_) { + discovery_->on_endpoint_connected( + _service, _instance, ep); + } + } + } else { + // no endpoint yet, but requested -> create one + + // SWS_SD_00376 establish TCP connection to service + // service is marked as available later in on_connect() + if (!connected) { + ep_mgr_impl_->find_or_create_remote_client( + _service, _instance, true); + connected = true; } + its_info->add_client(client_id.first); } break; } @@ -2358,7 +2428,8 @@ void routing_manager_impl::add_routing_info( && (major_minor_pair.second <= _minor || _minor == DEFAULT_MINOR || major_minor_pair.second == ANY_MINOR)) { - if (!stub_->contained_in_routing_info( + if (_reliable_port == ILLEGAL_PORT && !is_reliable_known && + !stub_->contained_in_routing_info( VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), its_info->get_minor())) { @@ -2411,18 +2482,15 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst // do no longer exist and the last received payload is no // longer valid. for (auto &its_event : its_eventgroup.second->get_events()) { - const auto its_subscribers = its_event->get_subscribers(); - for (const auto& its_subscriber : its_subscribers) { + const auto& its_subscribers = its_event->get_subscribers(); + for (const auto its_subscriber : its_subscribers) { if (its_subscriber != get_client()) { its_event->remove_subscriber( its_eventgroup.first, its_subscriber); } } its_events.push_back(its_event); - remove_pending_subscription(_service, _instance, - its_eventgroup.first, its_event->get_event()); } - } } } @@ -2436,14 +2504,14 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst std::set<std::tuple< service_t, instance_t, eventgroup_t, client_t> > its_invalid; - for (const auto& its_state : remote_subscription_state_) { + for (const auto &its_state : remote_subscription_state_) { if (std::get<0>(its_state.first) == _service && std::get<1>(its_state.first) == _instance) { its_invalid.insert(its_state.first); } } - for (const auto& its_key : its_invalid) + for (const auto &its_key : its_invalid) remote_subscription_state_.erase(its_key); } @@ -2599,8 +2667,15 @@ routing_manager_impl::expire_subscriptions( const bool expire_all = (_range.first == ANY_PORT && _range.second == ANY_PORT); - std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); - for (const auto &its_service : eventgroups_) { + std::map<service_t, + std::map<instance_t, + std::map<eventgroup_t, + std::shared_ptr<eventgroupinfo> > > >its_eventgroups; + { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); + its_eventgroups = eventgroups_; + } + for (const auto &its_service : its_eventgroups) { for (const auto &its_instance : its_service.second) { for (const auto &its_eventgroup : its_instance.second) { const auto its_info = its_eventgroup.second; @@ -2743,10 +2818,12 @@ void routing_manager_impl::on_remote_subscribe( // not exist or is still (partly) pending. remote_subscription_id_t its_id; std::set<client_t> its_added; + update_remote_subscription_mutex_.lock(); auto its_result = its_eventgroupinfo->update_remote_subscription( _subscription, its_expiration, its_added, its_id, true); if (its_result) { if (!_subscription->is_pending()) { // resubscription without change + update_remote_subscription_mutex_.unlock(); _callback(_subscription); } else if (!its_added.empty()) { // new clients for a selective subscription const client_t its_offering_client @@ -2754,6 +2831,7 @@ void routing_manager_impl::on_remote_subscribe( send_subscription(its_offering_client, its_service, its_instance, its_eventgroup, its_major, its_added, _subscription->get_id()); + update_remote_subscription_mutex_.unlock(); } else { // identical subscription is not yet processed std::stringstream its_warning; its_warning << __func__ << " a remote subscription is already pending [" @@ -2774,9 +2852,21 @@ void routing_manager_impl::on_remote_subscribe( if (its_reliable && its_unreliable) its_warning << "]"; VSOMEIP_WARNING << its_warning.str(); + + update_remote_subscription_mutex_.unlock(); _callback(_subscription); } } else { // new subscription + if (its_eventgroupinfo->is_remote_subscription_limit_reached( + _subscription)) { + _subscription->set_all_client_states( + remote_subscription_state_e::SUBSCRIPTION_NACKED); + + update_remote_subscription_mutex_.unlock(); + _callback(_subscription); + return; + } + auto its_id = its_eventgroupinfo->add_remote_subscription(_subscription); @@ -2785,6 +2875,7 @@ void routing_manager_impl::on_remote_subscribe( send_subscription(its_offering_client, its_service, its_instance, its_eventgroup, its_major, _subscription->get_clients(), its_id); + update_remote_subscription_mutex_.unlock(); } } @@ -2820,6 +2911,7 @@ void routing_manager_impl::on_remote_unsubscribe( remote_subscription_id_t its_id(0); std::set<client_t> its_removed; + update_remote_subscription_mutex_.lock(); auto its_result = its_info->update_remote_subscription( _subscription, std::chrono::steady_clock::now(), its_removed, its_id, false); @@ -2831,6 +2923,8 @@ void routing_manager_impl::on_remote_unsubscribe( its_service, its_instance, its_eventgroup, its_major, its_removed, its_id); } + + update_remote_subscription_mutex_.unlock(); } void routing_manager_impl::on_subscribe_ack_with_multicast( @@ -2931,13 +3025,20 @@ std::shared_ptr<endpoint> routing_manager_impl::find_or_create_remote_client( void routing_manager_impl::on_subscribe_nack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event, remote_subscription_id_t _id) { + event_t _event, remote_subscription_id_t _id, bool _simulated) { (void)_event; // TODO: Remove completely? auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); if (its_eventgroup) { auto its_subscription = its_eventgroup->get_remote_subscription(_id); if (its_subscription) { + if (_simulated) { + // method was called because a subscription for unoffered + // service was received. Therefore, remove the remote_subscription + // from the eventgroupinfo to ensure subsequent similar + // subscriptions are handled like a new/unknown subscription + its_eventgroup->remove_remote_subscription(_id); + } its_subscription->set_client_state(_client, remote_subscription_state_e::SUBSCRIPTION_NACKED); @@ -3110,6 +3211,10 @@ void routing_manager_impl::clear_remote_subscriber( std::chrono::steady_clock::time_point routing_manager_impl::expire_subscriptions(bool _force) { + std::map<service_t, + std::map<instance_t, + std::map<eventgroup_t, + std::shared_ptr<eventgroupinfo> > > >its_eventgroups; std::map<std::shared_ptr<remote_subscription>, std::set<client_t> > its_expired_subscriptions; @@ -3119,24 +3224,25 @@ routing_manager_impl::expire_subscriptions(bool _force) { = std::chrono::steady_clock::now() + std::chrono::hours(24); { std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); - - for (auto &its_service : eventgroups_) { - for (auto &its_instance : its_service.second) { - for (auto &its_eventgroup : its_instance.second) { - auto its_subscriptions - = its_eventgroup.second->get_remote_subscriptions(); - for (auto &s : its_subscriptions) { - for (auto its_client : s->get_clients()) { - if (_force) { - its_expired_subscriptions[s].insert(its_client); - } else { - auto its_expiration = s->get_expiration(its_client); - if (its_expiration != std::chrono::steady_clock::time_point()) { - if (its_expiration < now) { - its_expired_subscriptions[s].insert(its_client); - } else if (its_expiration < its_next_expiration) { - its_next_expiration = its_expiration; - } + its_eventgroups = eventgroups_; + } + + for (auto &its_service : its_eventgroups) { + for (auto &its_instance : its_service.second) { + for (auto &its_eventgroup : its_instance.second) { + auto its_subscriptions + = its_eventgroup.second->get_remote_subscriptions(); + for (auto &s : its_subscriptions) { + for (auto its_client : s->get_clients()) { + if (_force) { + its_expired_subscriptions[s].insert(its_client); + } else { + auto its_expiration = s->get_expiration(its_client); + if (its_expiration != std::chrono::steady_clock::time_point()) { + if (its_expiration < now) { + its_expired_subscriptions[s].insert(its_client); + } else if (its_expiration < its_next_expiration) { + its_next_expiration = its_expiration; } } } @@ -3152,22 +3258,53 @@ routing_manager_impl::expire_subscriptions(bool _force) { auto its_service = its_info->get_service(); auto its_instance = its_info->get_instance(); auto its_eventgroup = its_info->get_eventgroup(); - auto its_major = its_info->get_major(); remote_subscription_id_t its_id; + update_remote_subscription_mutex_.lock(); auto its_result = its_info->update_remote_subscription( s.first, std::chrono::steady_clock::now(), s.second, its_id, false); if (its_result) { const client_t its_offering_client = find_local_client(its_service, its_instance); - send_unsubscription(its_offering_client, - its_service, its_instance, its_eventgroup, its_major, + const auto its_subscription = its_info->get_remote_subscription(its_id); + if (its_subscription) { + its_info->remove_remote_subscription(its_id); + + std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); + remote_subscribers_[its_service][its_instance].erase(its_offering_client); + + if (its_info->get_remote_subscriptions().size() == 0) { + for (const auto &its_event : its_info->get_events()) { + bool has_remote_subscriber(false); + for (const auto &its_eventgroup : its_event->get_eventgroups()) { + const auto its_eventgroup_info + = find_eventgroup(its_service, its_instance, its_eventgroup); + if (its_eventgroup_info + && its_eventgroup_info->get_remote_subscriptions().size() > 0) { + has_remote_subscriber = true; + } + } + if (!has_remote_subscriber && its_event->is_shadow()) { + its_event->unset_payload(); + } + } + } + } else { + VSOMEIP_ERROR << __func__ + << ": Unknown expired subscription " << std::dec << its_id << " for eventgroup [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]"; + } + send_expired_subscription(its_offering_client, + its_service, its_instance, its_eventgroup, s.second, s.first->get_id()); } + update_remote_subscription_mutex_.unlock(); if (s.first->get_unreliable()) { - VSOMEIP_INFO << "Expired subscription [" + VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription [" << std::hex << std::setfill('0') << std::setw(4) << its_service << "." << std::hex << std::setfill('0') << std::setw(4) << its_instance << "." << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] unreliable from " @@ -3176,7 +3313,7 @@ routing_manager_impl::expire_subscriptions(bool _force) { } if (s.first->get_reliable()) { - VSOMEIP_INFO << "Expired subscription [" + VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription [" << std::hex << std::setfill('0') << std::setw(4) << its_service << "." << std::hex << std::setfill('0') << std::setw(4) << its_instance << "." << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] reliable from " @@ -3205,7 +3342,7 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const } std::stringstream its_last_resume; { - std::lock_guard<std::mutex> its_lock(last_resume_mutex_); + std::lock_guard<std::mutex> its_lock(routing_state_mutex_); if (last_resume_ != std::chrono::steady_clock::time_point::min()) { its_last_resume << " | " << std::dec << std::chrono::duration_cast<std::chrono::seconds>( @@ -3491,15 +3628,31 @@ void routing_manager_impl::send_subscribe(client_t _client, service_t _service, } void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { + + // Ignore setting to the current routing state + { + std::lock_guard<std::mutex> its_lock(routing_state_mutex_); + if (routing_state_ == _routing_state) { + VSOMEIP_INFO << "rmi::" << __func__ << " No routing state change --> do nothing."; + return; + } + + routing_state_ = _routing_state; + } + if(discovery_) { switch (_routing_state) { case routing_state_e::RS_SUSPENDED: { - VSOMEIP_INFO << "Set routing to suspend mode, diagnosis mode is " - << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode, diagnosis mode is " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); + // stop processing of incoming SD messages discovery_->stop(); + VSOMEIP_INFO << "rmi::" << __func__ << " Inform all applications that we are going to suspend"; + send_suspend(); + // remove all remote subscriptions to remotely offered services on this node expire_subscriptions(true); @@ -3520,6 +3673,10 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); remote_subscription_state_.clear(); } + + // send StopSubscribes and clear subscribed_ map + discovery_->unsubscribe_all_on_suspend(); + // mark all external services as offline services_t its_remote_services; { @@ -3528,11 +3685,6 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } for (const auto &s : its_remote_services) { for (const auto &i : s.second) { - // determine existing subscriptions to remote service and send StopSubscribe - for (auto its_eventgroup : get_subscribed_eventgroups(s.first, i.first)) { - discovery_->unsubscribe(s.first, i.first, its_eventgroup, VSOMEIP_ROUTING_CLIENT); - } - const bool has_reliable(i.second->get_endpoint(true)); const bool has_unreliable(i.second->get_endpoint(false)); del_routing_info(s.first, i.first, has_reliable, has_unreliable); @@ -3541,14 +3693,18 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { unset_all_eventpayloads(s.first, i.first); } } + + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode done, diagnosis mode is " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); + break; } case routing_state_e::RS_RESUMED: { - VSOMEIP_INFO << "Set routing to resume mode, diagnosis mode was " - << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode, diagnosis mode was " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); { - std::lock_guard<std::mutex> its_lock(last_resume_mutex_); + std::lock_guard<std::mutex> its_lock(routing_state_mutex_); last_resume_ = std::chrono::steady_clock::now(); } @@ -3575,11 +3731,14 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { discovery_->offer_service(its_instance.second); } } + + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode done, diagnosis mode was " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); break; } case routing_state_e::RS_DIAGNOSIS: { - VSOMEIP_INFO << "Set routing to diagnosis mode."; + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode."; discovery_->set_diagnosis_mode(true); // send StopOffer messages for all someip protocol services @@ -3591,11 +3750,13 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } } } + + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode done."; break; } case routing_state_e::RS_RUNNING: - VSOMEIP_INFO << "Set routing to running mode, diagnosis mode was " - << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode, diagnosis mode was " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); // Reset relevant in service info for (const auto &its_service : get_offered_services()) { @@ -3619,6 +3780,9 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } } } + + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode done, diagnosis mode was " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); break; default: break; @@ -3724,7 +3888,7 @@ void routing_manager_impl::requested_service_remove(client_t _client, if (!found_service->second.size()) { found_client->second.erase(_service); if (!found_client->second.size()) { - requested_services_.erase(client_); + requested_services_.erase(_client); } } } @@ -4057,6 +4221,7 @@ routing_manager_impl::on_unsubscribe_ack(client_t _client, std::shared_ptr<eventgroupinfo> its_info = find_eventgroup(_service, _instance, _eventgroup); if (its_info) { + update_remote_subscription_mutex_.lock(); const auto its_subscription = its_info->get_remote_subscription(_id); if (its_subscription) { its_info->remove_remote_subscription(_id); @@ -4088,6 +4253,7 @@ routing_manager_impl::on_unsubscribe_ack(client_t _client, << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; } + update_remote_subscription_mutex_.unlock(); } else { VSOMEIP_ERROR << __func__ << ": Received StopSubscribe for unknown eventgroup: (" @@ -4122,7 +4288,7 @@ void routing_manager_impl::send_subscription( &routing_manager_stub_host::on_subscribe_nack, std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), its_client, _service, _instance, - _eventgroup, ANY_EVENT, _id); + _eventgroup, ANY_EVENT, _id, false); io_.post(its_callback); } else { const auto its_callback = std::bind( @@ -4146,7 +4312,7 @@ void routing_manager_impl::send_subscription( &routing_manager_stub_host::on_subscribe_nack, std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), its_client, _service, _instance, _eventgroup, - ANY_EVENT, _id); + ANY_EVENT, _id, true); io_.post(its_callback); } catch (const std::exception &e) { VSOMEIP_ERROR << __func__ << e.what(); @@ -4270,7 +4436,7 @@ routing_manager_impl::send_unsubscription(client_t _offering_client, host_->on_subscription(_service, _instance, _eventgroup, its_client, own_uid_, own_gid_, false, [this, self, _service, _instance, _eventgroup, - its_client, _id, _offering_client] + its_client, _id] (const bool _is_accepted) { (void)_is_accepted; try { @@ -4303,6 +4469,30 @@ routing_manager_impl::send_unsubscription(client_t _offering_client, } } +void +routing_manager_impl::send_expired_subscription(client_t _offering_client, + service_t _service, instance_t _instance, + eventgroup_t _eventgroup, + const std::set<client_t> &_removed, + remote_subscription_id_t _id) { + + if (host_->get_client() == _offering_client) { + auto self = shared_from_this(); + for (const auto its_client : _removed) { + host_->on_subscription(_service, _instance, + _eventgroup, its_client, own_uid_, own_gid_, false, + [] (const bool _subscription_accepted){ + (void)_subscription_accepted; + }); + } + } else { + for (const auto its_client : _removed) { + stub_->send_expired_subscription(find_local(_offering_client), its_client, + _service, _instance, _eventgroup, ANY_EVENT, _id); + } + } +} + bool routing_manager_impl::update_security_policy_configuration( uint32_t _uid, uint32_t _gid, @@ -4347,7 +4537,7 @@ bool routing_manager_impl::insert_event_statistics(service_t _service, instance_ // check list for entry with least counter value uint32_t its_min_count(0xFFFFFFFF); auto its_tuple_to_discard = std::make_tuple(0xFFFF, 0xFFFF, 0xFFFF); - for (const auto& it : message_statistics_) { + for (const auto &it : message_statistics_) { if (it.second.counter_ < its_min_count) { its_min_count = it.second.counter_; its_tuple_to_discard = it.first; @@ -4389,7 +4579,7 @@ void routing_manager_impl::statistics_log_timer_cbk(boost::system::error_code co std::stringstream its_log; { std::lock_guard<std::mutex> its_lock(message_statistics_mutex_); - for (const auto& s : message_statistics_) { + for (const auto &s : message_statistics_) { if (s.second.counter_ / (its_interval / 1000) >= its_min_freq) { uint16_t its_subscribed(0); std::shared_ptr<event> its_event = find_event(std::get<0>(s.first), std::get<1>(s.first), std::get<2>(s.first)); @@ -4431,4 +4621,9 @@ void routing_manager_impl::statistics_log_timer_cbk(boost::system::error_code co } } +void routing_manager_impl::send_suspend() const { + + stub_->send_suspend(); +} + } // namespace vsomeip_v3 |