diff options
Diffstat (limited to 'implementation/routing/src')
-rw-r--r-- | implementation/routing/src/routing_manager_base.cpp | 9 | ||||
-rw-r--r-- | implementation/routing/src/routing_manager_impl.cpp | 560 | ||||
-rw-r--r-- | implementation/routing/src/routing_manager_proxy.cpp | 92 | ||||
-rw-r--r-- | implementation/routing/src/routing_manager_stub.cpp | 428 |
4 files changed, 707 insertions, 382 deletions
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp index 5b1149e..e187d93 100644 --- a/implementation/routing/src/routing_manager_base.cpp +++ b/implementation/routing/src/routing_manager_base.cpp @@ -176,7 +176,10 @@ void routing_manager_base::release_service(client_t _client, if (found_service != local_services_history_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { - local_services_history_.erase(_service); + found_service->second.erase(_instance); + if (found_service->second.empty()) { + local_services_history_.erase(_service); + } } } } @@ -1136,10 +1139,11 @@ bool routing_manager_base::send_local_notification(client_t _client, _data[VSOMEIP_METHOD_POS_MAX]); service_t its_service = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); + std::shared_ptr<event> its_event = find_event(its_service, _instance, its_method); if (its_event && !its_event->is_shadow()) { - for (auto its_client : its_event->get_subscribers()) { + // local if (its_client == VSOMEIP_ROUTING_CLIENT) { has_remote = true; @@ -1150,6 +1154,7 @@ bool routing_manager_base::send_local_notification(client_t _client, has_local = true; } #endif + std::shared_ptr<endpoint> its_local_target = ep_mgr_->find_local(its_client); if (its_local_target) { send_local(its_local_target, _client, _data, _size, diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index f141c57..74e556c 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -83,7 +83,8 @@ routing_manager_impl::routing_manager_impl(routing_manager_host *_host) : ep_mgr_impl_(std::make_shared<endpoint_manager_impl>(this, io_, configuration_)), pending_remote_offer_id_(0), last_resume_(std::chrono::steady_clock::now().min()), - pending_security_update_id_(0) + statistics_log_timer_(_host->get_io()), + ignored_statistics_counter_(0) { } @@ -212,6 +213,15 @@ void routing_manager_impl::start() { std::bind(&routing_manager_impl::status_log_timer_cbk, this, std::placeholders::_1)); } + + if (configuration_->log_statistics()) { + std::lock_guard<std::mutex> its_lock(statistics_log_timer_mutex_); + boost::system::error_code ec; + statistics_log_timer_.expires_from_now(std::chrono::seconds(0), ec); + statistics_log_timer_.async_wait( + std::bind(&routing_manager_impl::statistics_log_timer_cbk, this, + std::placeholders::_1)); + } } void routing_manager_impl::stop() { @@ -255,13 +265,20 @@ void routing_manager_impl::stop() { boost::system::error_code ec; status_log_timer_.cancel(ec); } + + { + std::lock_guard<std::mutex> its_lock(statistics_log_timer_mutex_); + boost::system::error_code ec; + statistics_log_timer_.cancel(ec); + } + host_->on_state(state_type_e::ST_DEREGISTERED); if (discovery_) discovery_->stop(); stub_->stop(); - for (auto client: ep_mgr_->get_connected_clients()) { + for (const auto client : ep_mgr_->get_connected_clients()) { if (client != VSOMEIP_ROUTING_CLIENT) { remove_local(client, true); } @@ -396,11 +413,14 @@ bool routing_manager_impl::offer_service(client_t _client, && ps.major_ == _major) { insert_subscription(ps.service_, ps.instance_, ps.eventgroup_, ps.event_, client_, &its_already_subscribed_events); +#if 0 VSOMEIP_ERROR << __func__ << ": event=" << std::hex << ps.service_ << "." << std::hex << ps.instance_ << "." - << std::hex << ps.event_; } + << std::hex << ps.event_; +#endif + } } send_pending_subscriptions(_service, _instance, _major); @@ -784,7 +804,8 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, // TODO: Find out how to handle session id here is_sent = deliver_message(_data, _size, _instance, _reliable, VSOMEIP_ROUTING_CLIENT, _credentials, _status_check); } else { - e2e_buffer outputBuffer; + e2e_buffer its_buffer; + if (e2e_provider_) { if ( !is_service_discovery) { service_t its_service = VSOMEIP_BYTES_TO_WORD( @@ -793,12 +814,18 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); #ifndef ANDROID if (e2e_provider_->is_protected({its_service, its_method})) { - outputBuffer.assign(_data, _data + VSOMEIP_PAYLOAD_POS); - e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data +_size); - e2e_provider_->protect({its_service, its_method}, inputBuffer); - outputBuffer.resize(inputBuffer.size() + VSOMEIP_PAYLOAD_POS); - std::copy(inputBuffer.begin(), inputBuffer.end(), outputBuffer.begin() + VSOMEIP_PAYLOAD_POS); - _data = outputBuffer.data(); + // Find out where the protected area starts + size_t its_base = e2e_provider_->get_protection_base({its_service, its_method}); + + // Build a corresponding buffer + its_buffer.assign(_data + its_base, _data + _size); + + e2e_provider_->protect({ its_service, its_method }, its_buffer, _instance); + + // Prepend header + its_buffer.insert(its_buffer.begin(), _data, _data + its_base); + + _data = its_buffer.data(); } #endif } @@ -979,7 +1006,7 @@ bool routing_manager_impl::send_to( if (its_serializer->serialize(_message.get())) { const byte_t *its_data = its_serializer->get_data(); length_t its_size = its_serializer->get_size(); - e2e_buffer its_output_buffer; + e2e_buffer its_buffer; if (e2e_provider_) { service_t its_service = VSOMEIP_BYTES_TO_WORD( its_data[VSOMEIP_SERVICE_POS_MIN], @@ -988,14 +1015,12 @@ bool routing_manager_impl::send_to( its_data[VSOMEIP_METHOD_POS_MIN], its_data[VSOMEIP_METHOD_POS_MAX]); #ifndef ANDROID - if(e2e_provider_->is_protected({its_service, its_method})) { - its_output_buffer.assign(its_data, its_data + VSOMEIP_PAYLOAD_POS); - e2e_buffer its_input_buffer(its_data + VSOMEIP_PAYLOAD_POS, its_data + its_size); - e2e_provider_->protect({its_service, its_method}, its_input_buffer); - its_output_buffer.resize(its_input_buffer.size() + VSOMEIP_PAYLOAD_POS); - std::copy(its_input_buffer.begin(), its_input_buffer.end(), - its_output_buffer.begin() + VSOMEIP_PAYLOAD_POS); - its_data = its_output_buffer.data(); + if (e2e_provider_->is_protected({its_service, its_method})) { + auto its_base = e2e_provider_->get_protection_base({its_service, its_method}); + its_buffer.assign(its_data + its_base, its_data + its_size); + e2e_provider_->protect({its_service, its_method}, its_buffer, _message->get_instance()); + its_buffer.insert(its_buffer.begin(), its_data, its_data + its_base); + its_data = its_buffer.data(); } #endif } @@ -1326,7 +1351,11 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, } } } else { - its_instance = ep_mgr_impl_->find_instance(its_service, _receiver); + if(_destination.is_multicast()) { + its_instance = ep_mgr_impl_->find_instance_multicast(its_service, _remote_address); + } else { + its_instance = ep_mgr_impl_->find_instance(its_service, _receiver); + } if (its_instance == 0xFFFF) { its_method = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_METHOD_POS_MIN], @@ -1403,17 +1432,19 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); #ifndef ANDROID - if( e2e_provider_->is_checked({its_service, its_method})) { - e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data + _size); - e2e_provider_->check({its_service, its_method}, inputBuffer, its_check_status); - - if ( its_check_status != e2e::profile_interface::generic_check_status::E2E_OK ) { - VSOMEIP_INFO << std::hex << "E2E protection: CRC check failed for service: " << its_service << " method: " << its_method; + if (e2e_provider_->is_checked({its_service, its_method})) { + auto its_base = e2e_provider_->get_protection_base({its_service, its_method}); + e2e_buffer its_buffer(_data + its_base, _data + _size); + e2e_provider_->check({its_service, its_method}, + its_buffer, its_instance, its_check_status); + + if (its_check_status != e2e::profile_interface::generic_check_status::E2E_OK) { + VSOMEIP_INFO << "E2E protection: CRC check failed for service: " + << std::hex << its_service << " method: " << its_method; } } #endif } - // Common way of message handling #ifdef USE_DLT is_forwarded = @@ -1888,6 +1919,13 @@ bool routing_manager_impl::deliver_notification( } } + // incoming events statistics + (void) insert_event_statistics( + _service, + _instance, + its_event_id, + utility::get_payload_size(_data, _length)); + if (its_event->get_type() != event_type_e::ET_SELECTIVE_EVENT) { for (const auto its_local_client : its_event->get_subscribers()) { if (its_local_client == host_->get_client()) { @@ -2035,11 +2073,12 @@ void routing_manager_impl::init_service_info( const bool is_someip = configuration_->is_someip(_service, _instance); uint16_t its_reliable_port = configuration_->get_reliable_port( _service, _instance); + bool _is_found(false); if (ILLEGAL_PORT != its_reliable_port) { std::shared_ptr<endpoint> its_reliable_endpoint = ep_mgr_impl_->find_or_create_server_endpoint( its_reliable_port, true, is_someip, _service, - _instance); + _instance, _is_found); if (its_reliable_endpoint) { its_info->set_endpoint(its_reliable_endpoint, true); } @@ -2050,7 +2089,7 @@ void routing_manager_impl::init_service_info( std::shared_ptr<endpoint> its_unreliable_endpoint = ep_mgr_impl_->find_or_create_server_endpoint( its_unreliable_port, false, is_someip, _service, - _instance); + _instance, _is_found); if (its_unreliable_endpoint) { its_info->set_endpoint(its_unreliable_endpoint, false); } @@ -2570,10 +2609,15 @@ routing_manager_impl::expire_subscriptions( // Note: get_remote_subscription delivers a copied // set of subscriptions. Thus, its is possible to // to remove them within the loop. - const auto its_ep_definition = - (_reliable) ? its_subscription->get_reliable() : + auto its_ep_definition = (_reliable) ? + its_subscription->get_reliable() : its_subscription->get_unreliable(); + if (!its_ep_definition && expire_all) + its_ep_definition = (!_reliable) ? + its_subscription->get_reliable() : + its_subscription->get_unreliable(); + if (its_ep_definition && its_ep_definition->get_address() == _address && (expire_all || @@ -2583,7 +2627,7 @@ routing_manager_impl::expire_subscriptions( // TODO: Check whether subscriptions to different hosts are valid. // IF yes, we probably need to simply reset the corresponding // endpoint instead of removing the subscription... - VSOMEIP_ERROR << __func__ + VSOMEIP_INFO << __func__ << ": removing subscription to " << std::hex << its_info->get_service() << "." << std::hex << its_info->get_instance() << "." @@ -2591,21 +2635,23 @@ routing_manager_impl::expire_subscriptions( << " from target " << its_ep_definition->get_address() << ":" << std::dec << its_ep_definition->get_port() - << " reliable=" << _reliable; + << " reliable=" + << std::boolalpha << its_ep_definition->is_reliable(); if (expire_all) { - const auto its_ep_definition2 = - (!_reliable) ? its_subscription->get_reliable() : - its_subscription->get_unreliable(); - if (its_ep_definition2) { - VSOMEIP_ERROR << __func__ + its_ep_definition = (!its_ep_definition->is_reliable()) ? + its_subscription->get_reliable() : + its_subscription->get_unreliable(); + if (its_ep_definition) { + VSOMEIP_INFO << __func__ << ": removing subscription to " << std::hex << its_info->get_service() << "." << std::hex << its_info->get_instance() << "." << std::hex << its_info->get_eventgroup() << " from target " - << its_ep_definition2->get_address() << ":" - << std::dec << its_ep_definition2->get_port() - << " reliable=" << !_reliable; + << its_ep_definition->get_address() << ":" + << std::dec << its_ep_definition->get_port() + << " reliable=" + << std::boolalpha << its_ep_definition->is_reliable(); } } on_remote_unsubscribe(its_subscription); @@ -2789,9 +2835,10 @@ void routing_manager_impl::on_remote_unsubscribe( void routing_manager_impl::on_subscribe_ack_with_multicast( service_t _service, instance_t _instance, + const boost::asio::ip::address &_sender, const boost::asio::ip::address &_address, uint16_t _port) { ep_mgr_impl_->find_or_create_multicast_endpoint(_service, - _instance, _address, _port); + _instance, _sender, _address, _port); } void routing_manager_impl::on_subscribe_ack(client_t _client, @@ -4164,307 +4211,6 @@ void routing_manager_impl::on_resend_provided_events_response( } } -void routing_manager_impl::on_security_update_timeout( - const boost::system::error_code& _error, - pending_security_update_id_t _id, - std::shared_ptr<boost::asio::steady_timer> _timer) { - (void)_timer; - if (_error) { - // timer was cancelled - return; - } - security_update_state_e its_state = security_update_state_e::SU_UNKNOWN_USER_ID; - std::unordered_set<client_t> its_missing_clients = pending_security_update_get(_id); - { - // erase timer - std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_); - security_update_timers_.erase(_id); - } - { - // print missing responses and check if some clients did not respond because they already disconnected - if (!its_missing_clients.empty()) { - for (auto its_client : its_missing_clients) { - VSOMEIP_INFO << __func__ << ": Client 0x" << std::hex << its_client - << " did not respond to the policy update / removal with ID: 0x" << std::hex << _id; - if (!find_local(its_client)) { - VSOMEIP_INFO << __func__ << ": Client 0x" << std::hex << its_client - << " is not connected anymore, do not expect answer for policy update / removal with ID: 0x" - << std::hex << _id; - pending_security_update_remove(_id, its_client); - } - } - } - - its_missing_clients = pending_security_update_get(_id); - if (its_missing_clients.empty()) { - VSOMEIP_INFO << __func__ << ": Received all responses for " - "security update/removal ID: 0x" << std::hex << _id; - its_state = security_update_state_e::SU_SUCCESS; - } - { - // erase pending security update - std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); - pending_security_updates_.erase(_id); - } - - // call handler with error on timeout or with SUCCESS if missing clients are not connected - std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_); - const auto found_handler = security_update_handlers_.find(_id); - if (found_handler != security_update_handlers_.end()) { - found_handler->second(its_state); - security_update_handlers_.erase(found_handler); - } else { - VSOMEIP_WARNING << __func__ << ": Callback not found for security update / removal with ID: 0x" - << std::hex << _id; - } - } -} - -bool routing_manager_impl::update_security_policy_configuration( - uint32_t _uid, uint32_t _gid, - const std::shared_ptr<policy>& _policy, - const std::shared_ptr<payload>& _payload, - const security_update_handler_t& _handler) { - bool ret(true); - // cache security policy payload for later distribution to new registering clients - stub_->policy_cache_add(_uid, _payload); - - // update security policy from configuration - security::get()->update_security_policy(_uid, _gid, _policy); - - // determine currently connected clients - std::unordered_set<client_t> its_clients_to_inform = ep_mgr_impl_->get_connected_clients(); - - // add handler - pending_security_update_id_t its_id; - if (!its_clients_to_inform.empty()) { - its_id = pending_security_update_add(its_clients_to_inform); - { - std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_); - security_update_handlers_[its_id] = _handler; - } - - { - std::shared_ptr<boost::asio::steady_timer> its_timer = - std::make_shared<boost::asio::steady_timer>(io_); - boost::system::error_code ec; - its_timer->expires_from_now(std::chrono::milliseconds(3000), ec); - if (!ec) { - its_timer->async_wait( - std::bind( - &routing_manager_impl::on_security_update_timeout, - std::static_pointer_cast<routing_manager_impl>( - shared_from_this()), - std::placeholders::_1, its_id, its_timer)); - } else { - VSOMEIP_ERROR << __func__ << ": timer creation: " << ec.message(); - } - std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_); - security_update_timers_[its_id] = its_timer; - } - - // trigger all currently connected clients to update the security policy - uint32_t sent_counter(0); - uint32_t its_tranche = - uint32_t(its_clients_to_inform.size() >= 10 ? (its_clients_to_inform.size() / 10) : 1); - VSOMEIP_INFO << __func__ << ": Informing [" << std::dec << its_clients_to_inform.size() - << "] currently connected clients about policy update for UID: " - << std::dec << _uid << " with update ID: 0x" << std::hex << its_id; - for (auto its_client : its_clients_to_inform) { - if (!stub_->send_update_security_policy_request(its_client, its_id, _uid, _payload)) { - VSOMEIP_INFO << __func__ << ": Couldn't send update security policy " - << "request to client 0x" << std::hex << std::setw(4) - << std::setfill('0') << its_client << " policy UID: " - << std::hex << std::setw(4) << std::setfill('0') << _uid << " GID: " - << std::hex << std::setw(4) << std::setfill('0') << _gid - << " with update ID: 0x" << std::hex << its_id - << " as client already disconnected"; - // remove client from expected answer list - pending_security_update_remove(its_id, its_client); - } - sent_counter++; - // Prevent burst - if (sent_counter % its_tranche == 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - } - } else { - // if routing manager has no client call the handler directly - _handler(security_update_state_e::SU_SUCCESS); - } - return ret; -} - -bool routing_manager_impl::remove_security_policy_configuration( - uint32_t _uid, uint32_t _gid, const security_update_handler_t& _handler) { - bool ret(true); - - // remove security policy from configuration (only if there was a updateACL call before) - if (stub_->is_policy_cached(_uid)) { - if (!security::get()->remove_security_policy(_uid, _gid)) { - _handler(security_update_state_e::SU_UNKNOWN_USER_ID); - ret = false; - } else { - // remove policy from cache to prevent sending it to registering clients - stub_->policy_cache_remove(_uid); - - // add handler - pending_security_update_id_t its_id; - - // determine currently connected clients - std::unordered_set<client_t> its_clients_to_inform = ep_mgr_impl_->get_connected_clients(); - - if (!its_clients_to_inform.empty()) { - its_id = pending_security_update_add(its_clients_to_inform); - { - std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_); - security_update_handlers_[its_id] = _handler; - } - - { - std::shared_ptr<boost::asio::steady_timer> its_timer = - std::make_shared<boost::asio::steady_timer>(io_); - boost::system::error_code ec; - its_timer->expires_from_now(std::chrono::milliseconds(3000), ec); - if (!ec) { - its_timer->async_wait( - std::bind( - &routing_manager_impl::on_security_update_timeout, - std::static_pointer_cast<routing_manager_impl>( - shared_from_this()), - std::placeholders::_1, its_id, its_timer)); - } else { - VSOMEIP_ERROR << __func__ << ": timer creation: " << ec.message(); - } - std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_); - security_update_timers_[its_id] = its_timer; - } - - // trigger all clients to remove the security policy - uint32_t sent_counter(0); - uint32_t its_tranche = - uint32_t(its_clients_to_inform.size() >= 10 ? (its_clients_to_inform.size() / 10) : 1); - VSOMEIP_INFO << __func__ << ": Informing [" << std::dec << its_clients_to_inform.size() - << "] currently connected clients about policy removal for UID: " - << std::dec << _uid << " with update ID: " << its_id; - for (auto its_client : its_clients_to_inform) { - if (!stub_->send_remove_security_policy_request(its_client, its_id, _uid, _gid)) { - VSOMEIP_INFO << __func__ << ": Couldn't send remove security policy " - << "request to client 0x" << std::hex << std::setw(4) - << std::setfill('0') << its_client << " policy UID: " - << std::hex << std::setw(4) << std::setfill('0') << _uid << " GID: " - << std::hex << std::setw(4) << std::setfill('0') << _gid - << " with update ID: 0x" << std::hex << its_id - << " as client already disconnected"; - // remove client from expected answer list - pending_security_update_remove(its_id, its_client); - } - sent_counter++; - // Prevent burst - if (sent_counter % its_tranche == 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - } - } else { - // if routing manager has no client call the handler directly - _handler(security_update_state_e::SU_SUCCESS); - } - } - } - else { - _handler(security_update_state_e::SU_UNKNOWN_USER_ID); - ret = false; - } - return ret; -} - -pending_security_update_id_t routing_manager_impl::pending_security_update_add( - const std::unordered_set<client_t>& _clients) { - std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); - if (++pending_security_update_id_ == 0) { - pending_security_update_id_++; - } - pending_security_updates_[pending_security_update_id_] = _clients; - return pending_security_update_id_; -} - -std::unordered_set<client_t> routing_manager_impl::pending_security_update_get( - pending_security_update_id_t _id) { - std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); - std::unordered_set<client_t> its_missing_clients; - auto found_si = pending_security_updates_.find(_id); - if (found_si != pending_security_updates_.end()) { - its_missing_clients = pending_security_updates_[_id]; - } - return its_missing_clients; -} - -bool routing_manager_impl::pending_security_update_remove( - pending_security_update_id_t _id, client_t _client) { - std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); - auto found_si = pending_security_updates_.find(_id); - if (found_si != pending_security_updates_.end()) { - if (found_si->second.erase(_client)) { - return true; - } - } - return false; -} - -bool routing_manager_impl::is_pending_security_update_finished( - pending_security_update_id_t _id) { - std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); - bool ret(false); - auto found_si = pending_security_updates_.find(_id); - if (found_si != pending_security_updates_.end()) { - if (!found_si->second.size()) { - ret = true; - } - } - if (ret) { - pending_security_updates_.erase(_id); - } - return ret; -} - -void routing_manager_impl::on_security_update_response( - pending_security_update_id_t _id, client_t _client) { - if (pending_security_update_remove(_id, _client)) { - if (is_pending_security_update_finished(_id)) { - // cancel timeout timer - { - std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_); - auto found_timer = security_update_timers_.find(_id); - if (found_timer != security_update_timers_.end()) { - boost::system::error_code ec; - found_timer->second->cancel(ec); - security_update_timers_.erase(found_timer); - } else { - VSOMEIP_WARNING << __func__ << ": Received all responses " - "for security update/removal ID: 0x" - << std::hex << _id << " but timeout already happened"; - } - } - - // call handler - { - std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_); - auto found_handler = security_update_handlers_.find(_id); - if (found_handler != security_update_handlers_.end()) { - found_handler->second(security_update_state_e::SU_SUCCESS); - security_update_handlers_.erase(found_handler); - VSOMEIP_INFO << __func__ << ": Received all responses for " - "security update/removal ID: 0x" << std::hex << _id; - } else { - VSOMEIP_WARNING << __func__ << ": Received all responses " - "for security update/removal ID: 0x" - << std::hex << _id << " but didn't find handler"; - } - } - } - } -} - void routing_manager_impl::print_stub_status() const { stub_->print_endpoint_status(); } @@ -4557,4 +4303,132 @@ routing_manager_impl::send_unsubscription(client_t _offering_client, } } +bool +routing_manager_impl::update_security_policy_configuration( + uint32_t _uid, uint32_t _gid, + const std::shared_ptr<policy> &_policy, + const std::shared_ptr<payload> &_payload, + const security_update_handler_t &_handler) { + + if (stub_) + return stub_->update_security_policy_configuration(_uid, _gid, + _policy, _payload, _handler); + + return (false); +} + +bool +routing_manager_impl::remove_security_policy_configuration( + uint32_t _uid, uint32_t _gid, + const security_update_handler_t &_handler) { + + if (stub_) + return stub_->remove_security_policy_configuration(_uid, _gid, + _handler); + + return (false); +} + +bool routing_manager_impl::insert_event_statistics(service_t _service, instance_t _instance, + method_t _method, length_t _length) { + + static uint32_t its_max_messages = configuration_->get_statistics_max_messages(); + std::lock_guard<std::mutex> its_lock(message_statistics_mutex_); + const auto its_tuple = std::make_tuple(_service, _instance, _method); + const auto its_main_s = message_statistics_.find(its_tuple); + if (its_main_s != message_statistics_.end()) { + // increase counter and calculate moving avergae for payload length + its_main_s->second.avg_length_ = + (its_main_s->second.avg_length_ * its_main_s->second.counter_ + _length) / + (its_main_s->second.counter_ + 1); + its_main_s->second.counter_++; + + if (its_tuple == message_to_discard_) { + // check list for entry with least counter value + uint32_t its_min_count(0xFFFFFFFF); + auto its_tuple_to_discard = std::make_tuple(0xFFFF, 0xFFFF, 0xFFFF); + for (const auto it : message_statistics_) { + if (it.second.counter_ < its_min_count) { + its_min_count = it.second.counter_; + its_tuple_to_discard = it.first; + } + } + if (its_min_count != 0xFFFF + && its_min_count < its_main_s->second.counter_) { + // update message to discard with current message + message_to_discard_ = its_tuple; + } + } + } else { + if (message_statistics_.size() < its_max_messages) { + message_statistics_[its_tuple] = {1, _length}; + message_to_discard_ = its_tuple; + } else { + // no slot empty + const auto it = message_statistics_.find(message_to_discard_); + if (it != message_statistics_.end() + && it->second.counter_ == 1) { + message_statistics_.erase(message_to_discard_); + message_statistics_[its_tuple] = {1, _length}; + message_to_discard_ = its_tuple; + } else { + // ignore message + ignored_statistics_counter_++; + return false; + } + } + } + return true; +} + +void routing_manager_impl::statistics_log_timer_cbk(boost::system::error_code const & _error) { + if (!_error) { + static uint32_t its_interval = configuration_->get_statistics_interval(); + its_interval = its_interval >= 1000 ? its_interval : 1000; + static uint32_t its_min_freq = configuration_->get_statistics_min_freq(); + std::stringstream its_log; + { + std::lock_guard<std::mutex> its_lock(message_statistics_mutex_); + for (const auto s : message_statistics_) { + if (s.second.counter_ / (its_interval / 1000) >= its_min_freq) { + uint16_t its_subscribed(0); + std::shared_ptr<event> its_event = find_event(std::get<0>(s.first), std::get<1>(s.first), std::get<2>(s.first)); + if (its_event) { + if (!its_event->is_provided()) { + its_subscribed = static_cast<std::uint16_t>(its_event->get_subscribers().size()); + } + } + its_log << std::hex << std::setw(4) << std::setfill('0') + << std::get<0>(s.first) << "." + << std::get<1>(s.first) << "." + << std::get<2>(s.first) << ": #=" + << std::dec << s.second.counter_ << " L=" + << s.second.avg_length_ << " S=" + << std::dec << its_subscribed << ", "; + } + } + + if (ignored_statistics_counter_) { + its_log << std::dec << " #ignored: " << ignored_statistics_counter_; + } + + message_statistics_.clear(); + message_to_discard_ = std::make_tuple(0x00, 0x00, 0x00); + ignored_statistics_counter_ = 0; + } + + if (its_log.str().length() > 0) { + VSOMEIP_INFO << "Received events statistics: [" << its_log.str() << "]"; + } + + { + std::lock_guard<std::mutex> its_lock(statistics_log_timer_mutex_); + statistics_log_timer_.expires_from_now(std::chrono::milliseconds(its_interval)); + statistics_log_timer_.async_wait( + std::bind(&routing_manager_impl::statistics_log_timer_cbk, + this, std::placeholders::_1)); + } + } +} + } // namespace vsomeip_v3 diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index 3b95a2d..410559b 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -125,7 +125,7 @@ void routing_manager_proxy::stop() { sender_ = nullptr; } - for (auto client: ep_mgr_->get_connected_clients()) { + for (const auto client : ep_mgr_->get_connected_clients()) { if (client != VSOMEIP_ROUTING_CLIENT) { remove_local(client, true); } @@ -202,12 +202,17 @@ void routing_manager_proxy::stop_offer_service(client_t _client, (void)_client; - routing_manager_base::stop_offer_service(_client, _service, _instance, _major, _minor); - clear_remote_subscriber_count(_service, _instance); + { + // Hold the mutex to ensure no placeholder event is created inbetween. + std::lock_guard<std::mutex> its_lock(stop_mutex_); + + routing_manager_base::stop_offer_service(_client, _service, _instance, _major, _minor); + clear_remote_subscriber_count(_service, _instance); - // Reliable/Unreliable unimportant as routing_proxy does not - // create server endpoints which needs to be freed - clear_service_info(_service, _instance, false); + // Note: The last argument does not matter here as a proxy + // does not manage endpoints to the external network. + clear_service_info(_service, _instance, false); + } { std::lock_guard<std::mutex> its_lock(state_mutex_); @@ -876,6 +881,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, client_t its_subscriber; remote_subscription_id_t its_subscription_id(PENDING_SUBSCRIPTION_ID); std::uint32_t its_remote_subscriber_count(0); + bool is_internal_policy_update(false); std::uint32_t its_sender_uid = std::get<0>(_credentials); std::uint32_t its_sender_gid = std::get<1>(_credentials); @@ -1389,6 +1395,9 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, << its_client << ")"; break; } + case VSOMEIP_UPDATE_SECURITY_POLICY_INT: + is_internal_policy_update = true; + /* Fallthrough */ case VSOMEIP_UPDATE_SECURITY_POLICY: { if (_size < VSOMEIP_COMMAND_HEADER_SIZE + sizeof(pending_security_update_id_t) || _size - VSOMEIP_COMMAND_HEADER_SIZE != its_length) { @@ -1397,23 +1406,33 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, } if (!its_security->is_enabled() || message_from_routing) { pending_security_update_id_t its_update_id(0); - uint32_t its_uid(0); - uint32_t its_gid(0); std::memcpy(&its_update_id, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(pending_security_update_id_t)); std::shared_ptr<policy> its_policy(std::make_shared<policy>()); - const byte_t* buffer_ptr = _data + (VSOMEIP_COMMAND_PAYLOAD_POS + + const byte_t *its_policy_data = _data + (VSOMEIP_COMMAND_PAYLOAD_POS + sizeof(pending_security_update_id_t)); - uint32_t its_size = uint32_t(_size - (VSOMEIP_COMMAND_PAYLOAD_POS + uint32_t its_policy_size = uint32_t(_size - (VSOMEIP_COMMAND_PAYLOAD_POS + sizeof(pending_security_update_id_t))); - its_security->parse_policy(buffer_ptr, its_size, its_uid, its_gid, its_policy); - if (its_security->is_policy_update_allowed(its_uid, its_policy)) { - its_security->update_security_policy(its_uid, its_gid, its_policy); - send_update_security_policy_response(its_update_id); + bool is_valid = its_policy->deserialize(its_policy_data, its_policy_size); + if (is_valid) { + uint32_t its_uid; + uint32_t its_gid; + is_valid = its_policy->get_uid_gid(its_uid, its_gid); + if (is_valid) { + if (is_internal_policy_update + || its_security->is_policy_update_allowed(its_uid, its_policy)) { + its_security->update_security_policy(its_uid, its_gid, its_policy); + send_update_security_policy_response(its_update_id); + } + } else { + VSOMEIP_ERROR << "vSomeIP Security: Policy has no valid uid/gid!"; + } + } else { + VSOMEIP_ERROR << "vSomeIP Security: Policy deserialization failed!"; } } else { VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() @@ -2350,21 +2369,29 @@ bool routing_manager_proxy::create_placeholder_event_and_subscribe( service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _notifier, client_t _client) { + std::lock_guard<std::mutex> its_lock(stop_mutex_); + bool is_inserted(false); - // we received a event which was not yet requested/offered - // create a placeholder field until someone requests/offers this event with - // full information like eventgroup, field or not etc. - std::set<eventgroup_t> its_eventgroups({ _eventgroup }); - // routing_manager_proxy: Always register with own client id and shadow = false - routing_manager_base::register_event(host_->get_client(), - _service, _instance, _notifier, - its_eventgroups, event_type_e::ET_UNKNOWN, reliability_type_e::RT_UNKNOWN, - std::chrono::milliseconds::zero(), false, true, nullptr, false, false, - true); - std::shared_ptr<event> its_event = find_event(_service, _instance, _notifier); - if (its_event) { - is_inserted = its_event->add_subscriber(_eventgroup, _client, false); + + if (find_service(_service, _instance)) { + // We received an event for an existing service which was not yet + // requested/offered. Create a placeholder field until someone + // requests/offers this event with full information like eventgroup, + // field/event, etc. + std::set<eventgroup_t> its_eventgroups({ _eventgroup }); + // routing_manager_proxy: Always register with own client id and shadow = false + routing_manager_base::register_event(host_->get_client(), + _service, _instance, _notifier, + its_eventgroups, event_type_e::ET_UNKNOWN, reliability_type_e::RT_UNKNOWN, + std::chrono::milliseconds::zero(), false, true, nullptr, false, false, + true); + + std::shared_ptr<event> its_event = find_event(_service, _instance, _notifier); + if (its_event) { + is_inserted = its_event->add_subscriber(_eventgroup, _client, false); + } } + return is_inserted; } @@ -2561,7 +2588,8 @@ void routing_manager_proxy::on_update_security_credentials(const byte_t *_data, uint32_t i = 0; while ( (i + sizeof(uint32_t) + sizeof(uint32_t)) <= _size) { std::shared_ptr<policy> its_policy(std::make_shared<policy>()); - ranges_t its_uid_ranges, its_gid_ranges; + + boost::icl::interval_set<uint32_t> its_gid_set; uint32_t its_uid, its_gid; std::memcpy(&its_uid, &_data[i], sizeof(uint32_t)); @@ -2569,11 +2597,13 @@ void routing_manager_proxy::on_update_security_credentials(const byte_t *_data, std::memcpy(&its_gid, &_data[i], sizeof(uint32_t)); i += uint32_t(sizeof(uint32_t)); - its_uid_ranges.insert(std::make_pair(its_uid, its_uid)); - its_gid_ranges.insert(std::make_pair(its_gid, its_gid)); + its_gid_set.insert(its_gid); + its_policy->credentials_ += std::make_pair( + boost::icl::interval<uid_t>::closed(its_uid, its_uid), its_gid_set); its_policy->allow_who_ = true; - its_policy->ids_.insert(std::make_pair(its_uid_ranges, its_gid_ranges)); + its_policy->allow_what_ = true; + its_security->add_security_credentials(its_uid, its_gid, its_policy, get_client()); } } diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp index d7a1dd2..72668ea 100644 --- a/implementation/routing/src/routing_manager_stub.cpp +++ b/implementation/routing/src/routing_manager_stub.cpp @@ -47,7 +47,8 @@ routing_manager_stub::routing_manager_stub( client_registration_running_(false), max_local_message_size_(configuration_->get_max_message_size_local()), configured_watchdog_timeout_(configuration_->get_watchdog_timeout()), - pinged_clients_timer_(io_) { + pinged_clients_timer_(io_), + pending_security_update_id_(0) { } routing_manager_stub::~routing_manager_stub() { @@ -760,7 +761,7 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, std::memcpy(&its_pending_security_update_id, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(pending_security_update_id_t)); - host_->on_security_update_response(its_pending_security_update_id ,its_client); + on_security_update_response(its_pending_security_update_id ,its_client); break; } case VSOMEIP_REMOVE_SECURITY_POLICY_RESPONSE: { @@ -773,7 +774,7 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, std::memcpy(&its_pending_security_update_id, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(pending_security_update_id_t)); - host_->on_security_update_response(its_pending_security_update_id ,its_client); + on_security_update_response(its_pending_security_update_id ,its_client); break; } } @@ -787,9 +788,20 @@ void routing_manager_stub::on_register_application(client_t _client) { VSOMEIP_WARNING << "Reregistering application: " << std::hex << _client << ". Last registration might have been taken too long."; } else { - (void)host_->find_or_create_local(_client); - std::lock_guard<std::mutex> its_lock(routing_info_mutex_); - routing_info_[_client].first = 0; + endpoint = host_->find_or_create_local(_client); + { + std::lock_guard<std::mutex> its_lock(routing_info_mutex_); + routing_info_[_client].first = 0; + } + + std::pair<uid_t, gid_t> its_uid_gid; + std::set<std::shared_ptr<policy> > its_policies; + + security::get()->get_client_to_uid_gid_mapping(_client, its_uid_gid); + get_requester_policies(its_uid_gid.first, its_uid_gid.second, its_policies); + + if (!its_policies.empty()) + send_requester_policies({ _client }, its_policies); } } @@ -2153,4 +2165,408 @@ bool routing_manager_stub::send_remove_security_policy_request( client_t _client } } +bool +routing_manager_stub::add_requester_policies(uid_t _uid, gid_t _gid, + const std::set<std::shared_ptr<policy> > &_policies) { + + std::lock_guard<std::mutex> its_lock(requester_policies_mutex_); + auto found_uid = requester_policies_.find(_uid); + if (found_uid != requester_policies_.end()) { + auto found_gid = found_uid->second.find(_gid); + if (found_gid != found_uid->second.end()) { + found_gid->second.insert(_policies.begin(), _policies.end()); + } else { + found_uid->second.insert(std::make_pair(_gid, _policies)); + } + } else { + requester_policies_[_uid][_gid] = _policies; + } + + // Check whether clients with uid/gid are already registered. + // If yes, update their policy + std::unordered_set<client_t> its_clients; + security::get()->get_clients(_uid, _gid, its_clients); + + if (!its_clients.empty()) + return send_requester_policies(its_clients, _policies); + + return (true); +} + +void +routing_manager_stub::remove_requester_policies(uid_t _uid, gid_t _gid) { + + std::lock_guard<std::mutex> its_lock(requester_policies_mutex_); + auto found_uid = requester_policies_.find(_uid); + if (found_uid != requester_policies_.end()) { + found_uid->second.erase(_gid); + if (found_uid->second.empty()) + requester_policies_.erase(_uid); + } +} + +void +routing_manager_stub::get_requester_policies(uid_t _uid, gid_t _gid, + std::set<std::shared_ptr<policy> > &_policies) const { + + std::lock_guard<std::mutex> its_lock(requester_policies_mutex_); + auto found_uid = requester_policies_.find(_uid); + if (found_uid != requester_policies_.end()) { + auto found_gid = found_uid->second.find(_gid); + if (found_gid != found_uid->second.end()) + _policies = found_gid->second; + } +} + +void +routing_manager_stub::add_pending_security_update_handler( + pending_security_update_id_t _id, security_update_handler_t _handler) { + + std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_); + security_update_handlers_[_id] = _handler; +} + +void +routing_manager_stub::add_pending_security_update_timer( + pending_security_update_id_t _id) { + + std::shared_ptr<boost::asio::steady_timer> its_timer + = std::make_shared<boost::asio::steady_timer>(io_); + + boost::system::error_code ec; + its_timer->expires_from_now(std::chrono::milliseconds(3000), ec); + if (!ec) { + its_timer->async_wait( + std::bind( + &routing_manager_stub::on_security_update_timeout, + shared_from_this(), + std::placeholders::_1, _id, its_timer)); + } else { + VSOMEIP_ERROR << __func__ + << "[" << std::dec << _id << "]: timer creation: " + << ec.message(); + } + std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_); + security_update_timers_[_id] = its_timer; +} + +bool +routing_manager_stub::send_requester_policies(const std::unordered_set<client_t> &_clients, + const std::set<std::shared_ptr<policy> > &_policies) { + + pending_security_update_id_t its_policy_id; + + // serialize the policies and send them... + for (const auto p : _policies) { + std::vector<byte_t> its_policy_data; + if (p->serialize(its_policy_data)) { + std::vector<byte_t> its_message; + its_message.push_back(VSOMEIP_UPDATE_SECURITY_POLICY_INT); + its_message.push_back(0); + its_message.push_back(0); + + uint32_t its_policy_size = static_cast<uint32_t>(its_policy_data.size() + sizeof(uint32_t)); + its_message.push_back(VSOMEIP_LONG_BYTE0(its_policy_size)); + its_message.push_back(VSOMEIP_LONG_BYTE1(its_policy_size)); + its_message.push_back(VSOMEIP_LONG_BYTE2(its_policy_size)); + its_message.push_back(VSOMEIP_LONG_BYTE3(its_policy_size)); + + its_policy_id = pending_security_update_add(_clients); + its_message.push_back(VSOMEIP_LONG_BYTE0(its_policy_id)); + its_message.push_back(VSOMEIP_LONG_BYTE1(its_policy_id)); + its_message.push_back(VSOMEIP_LONG_BYTE2(its_policy_id)); + its_message.push_back(VSOMEIP_LONG_BYTE3(its_policy_id)); + + its_message.insert(its_message.end(), its_policy_data.begin(), its_policy_data.end()); + + for (const auto c : _clients) { + std::shared_ptr<endpoint> its_endpoint = host_->find_local(c); + if (its_endpoint) + its_endpoint->send(&its_message[0], static_cast<uint32_t>(its_message.size())); + } + } + } + + return (true); +} + +void routing_manager_stub::on_security_update_timeout( + const boost::system::error_code& _error, + pending_security_update_id_t _id, + std::shared_ptr<boost::asio::steady_timer> _timer) { + (void)_timer; + if (_error) { + // timer was cancelled + return; + } + security_update_state_e its_state = security_update_state_e::SU_UNKNOWN_USER_ID; + std::unordered_set<client_t> its_missing_clients = pending_security_update_get(_id); + { + // erase timer + std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_); + security_update_timers_.erase(_id); + } + { + // print missing responses and check if some clients did not respond because they already disconnected + if (!its_missing_clients.empty()) { + for (auto its_client : its_missing_clients) { + VSOMEIP_INFO << __func__ << ": Client 0x" << std::hex << its_client + << " did not respond to the policy update / removal with ID: 0x" << std::hex << _id; + if (!host_->find_local(its_client)) { + VSOMEIP_INFO << __func__ << ": Client 0x" << std::hex << its_client + << " is not connected anymore, do not expect answer for policy update / removal with ID: 0x" + << std::hex << _id; + pending_security_update_remove(_id, its_client); + } + } + } + + its_missing_clients = pending_security_update_get(_id); + if (its_missing_clients.empty()) { + VSOMEIP_INFO << __func__ << ": Received all responses for " + "security update/removal ID: 0x" << std::hex << _id; + its_state = security_update_state_e::SU_SUCCESS; + } + { + // erase pending security update + std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); + pending_security_updates_.erase(_id); + } + + // call handler with error on timeout or with SUCCESS if missing clients are not connected + std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_); + const auto found_handler = security_update_handlers_.find(_id); + if (found_handler != security_update_handlers_.end()) { + found_handler->second(its_state); + security_update_handlers_.erase(found_handler); + } else { + VSOMEIP_WARNING << __func__ << ": Callback not found for security update / removal with ID: 0x" + << std::hex << _id; + } + } +} + +bool routing_manager_stub::update_security_policy_configuration( + uint32_t _uid, uint32_t _gid, + const std::shared_ptr<policy> &_policy, + const std::shared_ptr<payload> &_payload, + const security_update_handler_t &_handler) { + + bool ret(true); + + // cache security policy payload for later distribution to new registering clients + policy_cache_add(_uid, _payload); + + // update security policy from configuration + security::get()->update_security_policy(_uid, _gid, _policy); + + // Build requester policies for the services offered by the new policy + std::set<std::shared_ptr<policy> > its_requesters; + security::get()->get_requester_policies(_policy, its_requesters); + + // and add them to the requester policy cache + add_requester_policies(_uid, _gid, its_requesters); + + // determine currently connected clients + std::unordered_set<client_t> its_clients_to_inform; + auto its_epm = host_->get_endpoint_manager(); + if (its_epm) + its_clients_to_inform = its_epm->get_connected_clients(); + + // add handler + pending_security_update_id_t its_id; + if (!its_clients_to_inform.empty()) { + its_id = pending_security_update_add(its_clients_to_inform); + + add_pending_security_update_handler(its_id, _handler); + add_pending_security_update_timer(its_id); + + // trigger all currently connected clients to update the security policy + uint32_t sent_counter(0); + uint32_t its_tranche = + uint32_t(its_clients_to_inform.size() >= 10 ? (its_clients_to_inform.size() / 10) : 1); + VSOMEIP_INFO << __func__ << ": Informing [" << std::dec << its_clients_to_inform.size() + << "] currently connected clients about policy update for UID: " + << std::dec << _uid << " with update ID: 0x" << std::hex << its_id; + for (auto its_client : its_clients_to_inform) { + if (!send_update_security_policy_request(its_client, its_id, _uid, _payload)) { + VSOMEIP_INFO << __func__ << ": Couldn't send update security policy " + << "request to client 0x" << std::hex << std::setw(4) + << std::setfill('0') << its_client << " policy UID: " + << std::hex << std::setw(4) << std::setfill('0') << _uid << " GID: " + << std::hex << std::setw(4) << std::setfill('0') << _gid + << " with update ID: 0x" << std::hex << its_id + << " as client already disconnected"; + // remove client from expected answer list + pending_security_update_remove(its_id, its_client); + } + sent_counter++; + // Prevent burst + if (sent_counter % its_tranche == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + } else { + // if routing manager has no client call the handler directly + _handler(security_update_state_e::SU_SUCCESS); + } + + return ret; +} + +bool routing_manager_stub::remove_security_policy_configuration( + uint32_t _uid, uint32_t _gid, const security_update_handler_t &_handler) { + + bool ret(true); + + // remove security policy from configuration (only if there was a updateACL call before) + if (is_policy_cached(_uid)) { + if (!security::get()->remove_security_policy(_uid, _gid)) { + _handler(security_update_state_e::SU_UNKNOWN_USER_ID); + ret = false; + } else { + // remove policy from cache to prevent sending it to registering clients + policy_cache_remove(_uid); + + // add handler + pending_security_update_id_t its_id; + + // determine currently connected clients + std::unordered_set<client_t> its_clients_to_inform; + auto its_epm = host_->get_endpoint_manager(); + if (its_epm) + its_clients_to_inform = its_epm->get_connected_clients(); + + if (!its_clients_to_inform.empty()) { + its_id = pending_security_update_add(its_clients_to_inform); + + add_pending_security_update_handler(its_id, _handler); + add_pending_security_update_timer(its_id); + + // trigger all clients to remove the security policy + uint32_t sent_counter(0); + uint32_t its_tranche = + uint32_t(its_clients_to_inform.size() >= 10 ? (its_clients_to_inform.size() / 10) : 1); + VSOMEIP_INFO << __func__ << ": Informing [" << std::dec << its_clients_to_inform.size() + << "] currently connected clients about policy removal for UID: " + << std::dec << _uid << " with update ID: " << its_id; + for (auto its_client : its_clients_to_inform) { + if (!send_remove_security_policy_request(its_client, its_id, _uid, _gid)) { + VSOMEIP_INFO << __func__ << ": Couldn't send remove security policy " + << "request to client 0x" << std::hex << std::setw(4) + << std::setfill('0') << its_client << " policy UID: " + << std::hex << std::setw(4) << std::setfill('0') << _uid << " GID: " + << std::hex << std::setw(4) << std::setfill('0') << _gid + << " with update ID: 0x" << std::hex << its_id + << " as client already disconnected"; + // remove client from expected answer list + pending_security_update_remove(its_id, its_client); + } + sent_counter++; + // Prevent burst + if (sent_counter % its_tranche == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + } else { + // if routing manager has no client call the handler directly + _handler(security_update_state_e::SU_SUCCESS); + } + } + } + else { + _handler(security_update_state_e::SU_UNKNOWN_USER_ID); + ret = false; + } + return ret; +} + +pending_security_update_id_t routing_manager_stub::pending_security_update_add( + const std::unordered_set<client_t>& _clients) { + std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); + if (++pending_security_update_id_ == 0) { + pending_security_update_id_++; + } + pending_security_updates_[pending_security_update_id_] = _clients; + + return pending_security_update_id_; +} + +std::unordered_set<client_t> routing_manager_stub::pending_security_update_get( + pending_security_update_id_t _id) { + std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); + std::unordered_set<client_t> its_missing_clients; + auto found_si = pending_security_updates_.find(_id); + if (found_si != pending_security_updates_.end()) { + its_missing_clients = pending_security_updates_[_id]; + } + return its_missing_clients; +} + +bool routing_manager_stub::pending_security_update_remove( + pending_security_update_id_t _id, client_t _client) { + std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); + auto found_si = pending_security_updates_.find(_id); + if (found_si != pending_security_updates_.end()) { + if (found_si->second.erase(_client)) { + return true; + } + } + return false; +} + +bool routing_manager_stub::is_pending_security_update_finished( + pending_security_update_id_t _id) { + std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); + bool ret(false); + auto found_si = pending_security_updates_.find(_id); + if (found_si != pending_security_updates_.end()) { + if (!found_si->second.size()) { + ret = true; + } + } + if (ret) { + pending_security_updates_.erase(_id); + } + return ret; +} + +void routing_manager_stub::on_security_update_response( + pending_security_update_id_t _id, client_t _client) { + if (pending_security_update_remove(_id, _client)) { + if (is_pending_security_update_finished(_id)) { + // cancel timeout timer + { + std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_); + auto found_timer = security_update_timers_.find(_id); + if (found_timer != security_update_timers_.end()) { + boost::system::error_code ec; + found_timer->second->cancel(ec); + security_update_timers_.erase(found_timer); + } else { + VSOMEIP_WARNING << __func__ << ": Received all responses " + "for security update/removal ID: 0x" + << std::hex << _id << " but timeout already happened"; + } + } + + // call handler + { + std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_); + auto found_handler = security_update_handlers_.find(_id); + if (found_handler != security_update_handlers_.end()) { + found_handler->second(security_update_state_e::SU_SUCCESS); + security_update_handlers_.erase(found_handler); + VSOMEIP_INFO << __func__ << ": Received all responses for " + "security update/removal ID: 0x" << std::hex << _id; + } else { + VSOMEIP_WARNING << __func__ << ": Received all responses " + "for security update/removal ID: 0x" + << std::hex << _id << " but didn't find handler"; + } + } + } + } +} + } // namespace vsomeip_v3 |