diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_impl.cpp | 1012 |
1 files changed, 864 insertions, 148 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index faf5282..f8dbd14 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -51,6 +51,9 @@ #include "../../utility/include/byteorder.hpp" #include "../../utility/include/utility.hpp" #include "../../plugin/include/plugin_manager.hpp" +#ifdef USE_DLT +#include "../../tracing/include/connector_impl.hpp" +#endif #include "../../e2e_protection/include/buffer/buffer.hpp" #include "../../e2e_protection/include/e2exf/config.hpp" @@ -75,7 +78,10 @@ routing_manager_impl::routing_manager_impl(routing_manager_host *_host) : watchdog_timer_(_host->get_io()), #endif status_log_timer_(_host->get_io()), - memory_log_timer_(_host->get_io()) + memory_log_timer_(_host->get_io()), + pending_remote_offer_id_(0), + last_resume_(std::chrono::steady_clock::now().min()), + pending_security_update_id_(0) { } @@ -90,6 +96,16 @@ client_t routing_manager_impl::get_client() const { return routing_manager_base::get_client(); } +std::set<client_t> routing_manager_impl::find_local_clients(service_t _service, instance_t _instance) { + return routing_manager_base::find_local_clients(_service, _instance); +} + +bool routing_manager_impl::is_subscribe_to_any_event_allowed(client_t _client, + service_t _service, instance_t _instance, eventgroup_t _eventgroup) { + return routing_manager_base::is_subscribe_to_any_event_allowed(_client, + _service, _instance, _eventgroup); +} + void routing_manager_impl::init() { routing_manager_base::init(); @@ -106,7 +122,7 @@ void routing_manager_impl::init() { plugin_type_e::SD_RUNTIME_PLUGIN, VSOMEIP_SD_LIBRARY); if (its_plugin) { VSOMEIP_INFO << "Service Discovery module loaded."; - discovery_ = std::dynamic_pointer_cast<sd::runtime>(its_plugin)->create_service_discovery(this); + discovery_ = std::dynamic_pointer_cast<sd::runtime>(its_plugin)->create_service_discovery(this, configuration_); discovery_->init(); } else { VSOMEIP_ERROR << "Service Discovery module could not be loaded!"; @@ -116,11 +132,11 @@ void routing_manager_impl::init() { if( configuration_->is_e2e_enabled()) { VSOMEIP_INFO << "E2E protection enabled."; - std::map<e2exf::data_identifier, std::shared_ptr<cfg::e2e>> its_e2e_configuration = configuration_->get_e2e_configuration(); + std::map<e2exf::data_identifier_t, std::shared_ptr<cfg::e2e>> its_e2e_configuration = configuration_->get_e2e_configuration(); for (auto &identifier : its_e2e_configuration) { auto its_cfg = identifier.second; if(its_cfg->profile == "CRC8") { - e2exf::data_identifier its_data_identifier = {its_cfg->service_id, its_cfg->event_id}; + e2exf::data_identifier_t its_data_identifier = {its_cfg->service_id, its_cfg->event_id}; e2e::profile01::profile_config its_profile_config = e2e::profile01::profile_config(its_cfg->crc_offset, its_cfg->data_id, (e2e::profile01::p01_data_id_mode) its_cfg->data_id_mode, its_cfg->data_length, its_cfg->counter_offset, its_cfg->data_id_nibble_offset); if ((its_cfg->variant == "protector") || (its_cfg->variant == "both")) { @@ -130,7 +146,7 @@ void routing_manager_impl::init() { custom_checkers[its_data_identifier] = std::make_shared<e2e::profile01::profile_01_checker>(its_profile_config); } } else if(its_cfg->profile == "CRC32") { - e2exf::data_identifier its_data_identifier = {its_cfg->service_id, its_cfg->event_id}; + e2exf::data_identifier_t its_data_identifier = {its_cfg->service_id, its_cfg->event_id}; e2e::profile_custom::profile_config its_profile_config = e2e::profile_custom::profile_config(its_cfg->crc_offset); if ((its_cfg->variant == "protector") || (its_cfg->variant == "both")) { @@ -237,7 +253,7 @@ void routing_manager_impl::stop() { for (auto client: get_connected_clients()) { if (client != VSOMEIP_ROUTING_CLIENT) { - remove_local(client); + remove_local(client, true); } } } @@ -296,22 +312,35 @@ void routing_manager_impl::stop_offer_service(client_t _client, << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << ":" << std::dec << int(_major) << "." << _minor << "]"; - + bool is_local(false); { - std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_); - for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) { - if (it->first == _service && it->second == _instance) { - it = pending_sd_offers_.erase(it); - break; - } else { - ++it; + std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance); + is_local = (its_info && its_info->is_local()); + } + if (is_local) { + { + std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_); + for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) { + if (it->first == _service && it->second == _instance) { + it = pending_sd_offers_.erase(it); + break; + } else { + ++it; + } } } - } - on_stop_offer_service(_client, _service, _instance, _major, _minor); - stub_->on_stop_offer_service(_client, _service, _instance, _major, _minor); - on_availability(_service, _instance, false, _major, _minor); + on_stop_offer_service(_client, _service, _instance, _major, _minor); + stub_->on_stop_offer_service(_client, _service, _instance, _major, _minor); + on_availability(_service, _instance, false, _major, _minor); + } else { + VSOMEIP_WARNING << __func__ << " received STOP_OFFER(" + << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance + << ":" << std::dec << int(_major) << "." << _minor << "] " + << "for remote service --> ignore"; + } } void routing_manager_impl::request_service(client_t _client, service_t _service, @@ -435,6 +464,8 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, << std::dec << (uint16_t)_major << "]"; const client_t its_local_client = find_local_client(_service, _instance); if (get_client() == its_local_client) { + routing_manager_base::set_incoming_subscription_state(_client, _service, _instance, + _eventgroup, _event, subscription_state_e::IS_SUBSCRIBING); auto self = shared_from_this(); host_->on_subscription(_service, _instance, _eventgroup, _client, true, [this, self, _client, _service, _instance, _eventgroup, @@ -452,6 +483,8 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, } routing_manager_base::subscribe(_client, _service, _instance, _eventgroup, _major, _event, _subscription_type); send_pending_notify_ones(_service, _instance, _eventgroup, _client); + routing_manager_base::erase_incoming_subscription_state(_client, _service, _instance, + _eventgroup, _event); }); } else { if (discovery_) { @@ -581,8 +614,8 @@ bool routing_manager_impl::send(client_t _client, } bool routing_manager_impl::send(client_t _client, const byte_t *_data, - length_t _size, instance_t _instance, - bool _flush, bool _reliable, bool _is_valid_crc) { + length_t _size, instance_t _instance, bool _flush, bool _reliable, + client_t _bound_client, bool _is_valid_crc, bool _sent_from_remote) { bool is_sent(false); if (_size > VSOMEIP_MESSAGE_TYPE_POS) { std::shared_ptr<endpoint> its_target; @@ -616,12 +649,12 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); - tc::trace_header its_header; + trace::header its_header; if (its_header.prepare(its_target, true, _instance)) tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, its_data_size); #endif - deliver_message(_data, _size, _instance, _reliable, _is_valid_crc); + deliver_message(_data, _size, _instance, _reliable, _bound_client, _is_valid_crc, _sent_from_remote); return true; } its_target = find_local(_client); @@ -634,7 +667,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); - tc::trace_header its_header; + trace::header its_header; if (its_header.prepare(its_target, true, _instance)) tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, its_data_size); @@ -648,7 +681,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, || (find_local_client(its_service, _instance) == host_->get_client() && is_request)) { // TODO: find out how to handle session id here - is_sent = deliver_message(_data, _size, _instance, _reliable, _is_valid_crc); + is_sent = deliver_message(_data, _size, _instance, _reliable, _bound_client, _is_valid_crc, _sent_from_remote); } else { e2e_buffer outputBuffer; if( configuration_->is_e2e_enabled()) { @@ -675,7 +708,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); - tc::trace_header its_header; + trace::header its_header; if (its_header.prepare(its_target, true, _instance)) tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, its_data_size); @@ -754,7 +787,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); - tc::trace_header its_header; + trace::header its_header; if (its_header.prepare(nullptr, true, _instance)) tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, its_data_size); @@ -786,7 +819,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); - tc::trace_header its_header; + trace::header its_header; if (its_header.prepare(its_target, true, _instance)) tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, its_data_size); @@ -869,7 +902,7 @@ bool routing_manager_impl::send_to( const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); - tc::trace_header its_header; + trace::header its_header; if (its_header.prepare(its_endpoint, true, _instance)) tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, its_data_size); @@ -892,7 +925,7 @@ bool routing_manager_impl::send_to( const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); - tc::trace_header its_header; + trace::header its_header; if (its_header.prepare(its_endpoint, true, 0x0)) tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, its_data_size); @@ -911,8 +944,10 @@ void routing_manager_impl::register_event( bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) { auto its_event = find_event(_service, _instance, _event); bool is_first(false); - if (its_event && !its_event->has_ref(_client, _is_provided)) { - is_first = true; + if (its_event) { + if (!its_event->has_ref(_client, _is_provided)) { + is_first = true; + } } else { is_first = true; } @@ -950,10 +985,10 @@ void routing_manager_impl::unregister_shadow_event(client_t _client, void routing_manager_impl::notify_one(service_t _service, instance_t _instance, event_t _event, std::shared_ptr<payload> _payload, client_t _client, - bool _force, bool _flush) { + bool _force, bool _flush, bool _remote_subscriber) { if (find_local(_client)) { routing_manager_base::notify_one(_service, _instance, _event, _payload, - _client, _force, _flush); + _client, _force, _flush, _remote_subscriber); } else { std::shared_ptr<event> its_event = find_event(_service, _instance, _event); if (its_event) { @@ -1034,6 +1069,113 @@ void routing_manager_impl::release_port(uint16_t _port, bool _reliable) { used_client_ports_[_reliable].erase(_port); } +bool routing_manager_impl::offer_service_remotely(service_t _service, + instance_t _instance, + std::uint16_t _port, + bool _reliable, + bool _magic_cookies_enabled) { + bool ret = true; + + if(!is_available(_service, _instance, ANY_MAJOR)) { + VSOMEIP_ERROR << __func__ << ": Service [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance + << "] is not offered locally! Won't offer it remotely."; + ret = false; + } else { + // update service info in configuration + if (!configuration_->remote_offer_info_add(_service, _instance, _port, + _reliable, _magic_cookies_enabled)) { + ret = false; + } else { + // trigger event registration again to create shadow events + const client_t its_offering_client = find_local_client(_service, _instance); + if (its_offering_client == VSOMEIP_ROUTING_CLIENT) { + VSOMEIP_ERROR << __func__ << " didn't find offering client for service [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance + << "]"; + ret = false; + } else { + if (!stub_->send_provided_event_resend_request(its_offering_client, + pending_remote_offer_add(_service, _instance))) { + VSOMEIP_ERROR << __func__ << ": Couldn't send event resend" + << "request to client 0x" << std::hex << std::setw(4) + << std::setfill('0') << its_offering_client << " providing service [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance + << "]"; + + ret = false; + } + } + } + } + return ret; +} + +bool routing_manager_impl::stop_offer_service_remotely(service_t _service, + instance_t _instance, + std::uint16_t _port, + bool _reliable, + bool _magic_cookies_enabled) { + bool ret = true; + bool service_still_offered_remote(false); + // update service configuration + if (!configuration_->remote_offer_info_remove(_service, _instance, _port, + _reliable, _magic_cookies_enabled, &service_still_offered_remote)) { + VSOMEIP_ERROR << __func__ << " couldn't remove remote offer info for service [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance + << "] from configuration"; + ret = false; + } + std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance); + std::shared_ptr<endpoint> its_server_endpoint; + if (its_info) { + its_server_endpoint = its_info->get_endpoint(_reliable); + } + // don't deregister events if the service is still offered remotely + if (!service_still_offered_remote) { + const client_t its_offering_client = find_local_client(_service, _instance); + major_version_t its_major(0); + minor_version_t its_minor(0); + if (its_info) { + its_major = its_info->get_major(); + its_minor = its_info->get_minor(); + } + // unset payload and clear subcribers + routing_manager_base::stop_offer_service(its_offering_client, + _service, _instance, its_major, its_minor); + // unregister events + for (const event_t its_event_id : find_events(_service, _instance)) { + unregister_shadow_event(its_offering_client, _service, _instance, + its_event_id, true); + } + clear_targets_and_pending_sub_from_eventgroups(_service, _instance); + clear_remote_subscriber(_service, _instance); + + if (discovery_ && its_info) { + discovery_->stop_offer_service(_service, _instance, its_info); + its_info->set_endpoint(std::shared_ptr<endpoint>(), _reliable); + } + } else { + // service is still partly offered + if (discovery_ && its_info) { + std::shared_ptr<serviceinfo> its_copied_info = + std::make_shared<serviceinfo>(*its_info); + its_info->set_endpoint(std::shared_ptr<endpoint>(), _reliable); + // ensure to not send StopOffer for endpoint on which the service is + // still offered + its_copied_info->set_endpoint(std::shared_ptr<endpoint>(), !_reliable); + discovery_->stop_offer_service(_service, _instance, its_copied_info); + } + } + + cleanup_server_endpoint(_service, its_server_endpoint); + return ret; +} + void routing_manager_impl::on_message(const byte_t *_data, length_t _size, endpoint *_receiver, const boost::asio::ip::address &_destination, client_t _bound_client, @@ -1119,6 +1261,9 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, client_t requester = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]); + its_method = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_METHOD_POS_MIN], + _data[VSOMEIP_METHOD_POS_MAX]); if (!configuration_->is_offered_remote(its_service, its_instance)) { VSOMEIP_WARNING << std::hex << "Security: Received a remote request " << "for service/instance " << its_service << "/" << its_instance @@ -1131,11 +1276,13 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, << " which is already used locally ~> Skip message!"; return; } - if (!configuration_->is_client_allowed(requester, its_service, its_instance)) { - VSOMEIP_WARNING << std::hex << "Security: Received a remote request " - << "from client 0x" << requester << " for service/instance " + if (!configuration_->is_remote_client_allowed()) { + // check if policy allows remote requests. + VSOMEIP_WARNING << "routing_manager_impl::on_message: " + << std::hex << "Security: Remote client with client ID 0x" << requester + << " is not allowed to communicate with service/instance/method " << its_service << "/" << its_instance - << " which violates the security policy ~> Skip message!"; + << "/" << its_method; return; } } @@ -1167,7 +1314,8 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, #ifdef USE_DLT is_forwarded = #endif - on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(), its_is_crc_valid); + on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(), + _bound_client, its_is_crc_valid, true); } } @@ -1177,14 +1325,14 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); - tc::trace_header its_header; + trace::header its_header; const boost::asio::ip::address_v4 its_remote_address = _remote_address.is_v4() ? _remote_address.to_v4() : boost::asio::ip::address_v4::from_string("6.6.6.6"); - tc::protocol_e its_protocol = - _receiver->is_local() ? tc::protocol_e::local : - _receiver->is_reliable() ? tc::protocol_e::tcp : - tc::protocol_e::udp; + trace::protocol_e its_protocol = + _receiver->is_local() ? trace::protocol_e::local : + _receiver->is_reliable() ? trace::protocol_e::tcp : + trace::protocol_e::udp; its_header.prepare(its_remote_address, _remote_port, its_protocol, false, its_instance); tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, @@ -1196,7 +1344,9 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, bool routing_manager_impl::on_message( service_t _service, instance_t _instance, const byte_t *_data, length_t _size, - bool _reliable, bool _is_valid_crc) { + bool _reliable, client_t _bound_client, + bool _is_valid_crc, + bool _is_from_remote) { #if 0 std::stringstream msg; msg << "rmi::on_message(" @@ -1219,11 +1369,13 @@ bool routing_manager_impl::on_message( if (its_client == VSOMEIP_ROUTING_CLIENT && utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) { - is_forwarded = deliver_notification(_service, _instance, _data, _size, _reliable, _is_valid_crc); + is_forwarded = deliver_notification(_service, _instance, _data, _size, + _reliable, _bound_client, _is_valid_crc, _is_from_remote); } else if (its_client == host_->get_client()) { - deliver_message(_data, _size, _instance, _reliable, _is_valid_crc); + deliver_message(_data, _size, _instance, + _reliable, _bound_client, _is_valid_crc, _is_from_remote); } else { - send(its_client, _data, _size, _instance, true, _reliable, _is_valid_crc); //send to proxy + send(its_client, _data, _size, _instance, true, _reliable, _bound_client, _is_valid_crc, _is_from_remote); //send to proxy } return is_forwarded; } @@ -1244,7 +1396,7 @@ void routing_manager_impl::on_notification(client_t _client, its_length); if (_notify_one) { - notify_one(_service, _instance, its_event->get_event(), its_payload, _client, true, true); + notify_one(_service, _instance, its_event->get_event(), its_payload, _client, true, true, false); } else { if (its_event->is_field()) { if (its_event->is_set()) { @@ -1311,6 +1463,13 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) { bool reliable_; std::shared_ptr<endpoint> endpoint_; }; + + // Set to state CONNECTED as connection is not yet fully established in remote side POV + // but endpoint is ready to send / receive. Set to ESTABLISHED after timer expires + // to prevent inserting subscriptions twice or send out subscription before remote side + // is finished with TCP 3 way handshake + _endpoint->set_connected(true); + std::forward_list<struct service_info> services_to_report_; { std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); @@ -1324,7 +1483,7 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) { if (found_endpoint->second.get() == _endpoint.get()) { std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first)); if (!its_info) { - _endpoint->set_connected(true); + _endpoint->set_established(true); return; } services_to_report_.push_front( @@ -1340,7 +1499,7 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) { if (found_endpoint->second.get() == _endpoint.get()) { std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first)); if (!its_info) { - _endpoint->set_connected(true); + _endpoint->set_established(true); return; } services_to_report_.push_front( @@ -1356,6 +1515,7 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) { } } } + for (const auto &s : services_to_report_) { on_availability(s.service_id_, s.instance_id_, true, s.major_, s.minor_); if (s.reliable_) { @@ -1379,7 +1539,7 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) { } } if (services_to_report_.empty()) { - _endpoint->set_connected(true); + _endpoint->set_established(true); } } @@ -1488,7 +1648,6 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se if (discovery_) { if (its_info) { if (its_info->get_major() == _major && its_info->get_minor() == _minor) { - its_info->set_ttl(0); discovery_->stop_offer_service(_service, _instance, its_info); } } @@ -1498,59 +1657,21 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se // Cleanup reliable & unreliable server endpoints hold before if (its_info) { - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - std::shared_ptr<endpoint> its_empty_endpoint; - bool reliable = true; - - // Loop over reliable/unreliable and cleanup if needed - for (uint8_t i = 0; i < 2; ++i) { - std::shared_ptr<endpoint> its_endpoint; - if (reliable) { - its_endpoint = its_reliable_endpoint; - } else { - its_endpoint = its_unreliable_endpoint; - } - if (!its_endpoint) { - reliable = !reliable; - continue; - } - - // Check whether any service still uses this endpoint - its_endpoint->decrement_use_count(); - bool isLastService = (its_endpoint->get_use_count() == 0); - - // Clear service_instances_ - if (1 >= service_instances_[_service].size()) { - service_instances_.erase(_service); - } else { - service_instances_[_service].erase(its_endpoint.get()); - } - - // Clear server endpoint if no service remains using it - if (isLastService) { - uint16_t port = its_endpoint->get_local_port(); - if (server_endpoints_.find(port) != server_endpoints_.end()) { - server_endpoints_[port].erase(reliable); - if (server_endpoints_[port].find(!reliable) == server_endpoints_[port].end()) { - server_endpoints_.erase(port); - } - } - - // Stop endpoint (close socket) to release its async_handlers! - its_endpoint->stop(); - } - + if (its_unreliable_endpoint) { + cleanup_server_endpoint(_service, its_unreliable_endpoint); // Clear service info and service group - clear_service_info(_service, _instance, reliable); - - // Invert reliable flag and loop again - reliable = !reliable; + clear_service_info(_service, _instance, false); + } + if (its_reliable_endpoint) { + cleanup_server_endpoint(_service, its_reliable_endpoint); + // Clear service info and service group + clear_service_info(_service, _instance, true); } } } bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, - instance_t _instance, bool _reliable, bool _is_valid_crc) { + instance_t _instance, bool _reliable, client_t _bound_client, bool _is_valid_crc, bool _is_from_remote) { bool is_delivered(false); auto a_deserializer = get_deserializer(); @@ -1563,6 +1684,109 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, its_message->set_instance(_instance); its_message->set_reliable(_reliable); its_message->set_is_valid_crc(_is_valid_crc); + + if (!_is_from_remote) { + if (utility::is_notification(its_message->get_message_type())) { + if (!is_response_allowed(_bound_client, its_message->get_service(), + its_message->get_instance(), its_message->get_method())) { + VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() + << " : routing_manager_impl::deliver_message: " + << std::hex << " received a notification from client 0x" << _bound_client + << " which does not offer service/instance/event " + << its_message->get_service() << "/" << its_message->get_instance() + << "/" << its_message->get_method() + << " ~> Skip message!"; + return false; + } else { + if (!configuration_->is_client_allowed(get_client(), its_message->get_service(), + its_message->get_instance(), its_message->get_method())) { + VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() + << " : routing_manager_impl::deliver_message: " + << " isn't allowed to receive a notification from service/instance/event " + << its_message->get_service() << "/" << its_message->get_instance() + << "/" << its_message->get_method() + << " respectively from client 0x" << _bound_client + << " ~> Skip message!"; + return false; + } + } + } else if (utility::is_request(its_message->get_message_type())) { + if (configuration_->is_security_enabled() + && its_message->get_client() != _bound_client) { + VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() + << " : routing_manager_impl::deliver_message:" + << " received a request from client 0x" << std::setw(4) << std::setfill('0') + << its_message->get_client() << " to service/instance/method " + << its_message->get_service() << "/" << its_message->get_instance() + << "/" << its_message->get_method() << " which doesn't match the bound client 0x" + << std::setw(4) << std::setfill('0') << _bound_client + << " ~> skip message!"; + return false; + } + + if (!configuration_->is_client_allowed(its_message->get_client(), + its_message->get_service(), its_message->get_instance(), its_message->get_method())) { + VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() + << " : routing_manager_impl::deliver_message: " + << " isn't allowed to send a request to service/instance/method " + << its_message->get_service() << "/" << its_message->get_instance() + << "/" << its_message->get_method() + << " ~> Skip message!"; + return false; + } + } else { // response + if (!is_response_allowed(_bound_client, its_message->get_service(), + its_message->get_instance(), its_message->get_method())) { + VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() + << " : routing_manager_impl::deliver_message: " + << " received a response from client 0x" << _bound_client + << " which does not offer service/instance/method " + << its_message->get_service() << "/" << its_message->get_instance() + << "/" << its_message->get_method() + << " ~> Skip message!"; + return false; + } else { + if (!configuration_->is_client_allowed(get_client(), its_message->get_service(), + its_message->get_instance(), its_message->get_method())) { + VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() + << " : routing_manager_impl::deliver_message: " + << " isn't allowed to receive a response from service/instance/method " + << its_message->get_service() << "/" << its_message->get_instance() + << "/" << its_message->get_method() + << " respectively from client 0x" << _bound_client + << " ~> Skip message!"; + return false; + } + } + } + } else { + if (!configuration_->is_remote_client_allowed()) { + // if the message is from remote, check if + // policy allows remote requests. + VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() + << " : routing_manager_impl::deliver_message: " + << std::hex << "Remote clients are not allowed" + << " to communicate with service/instance/method " + << its_message->get_service() << "/" << its_message->get_instance() + << "/" << its_message->get_method() + << " respectively with client 0x" << get_client() + << " ~> Skip message!"; + return false; + } else if (utility::is_notification(its_message->get_message_type())) { + if (!configuration_->is_client_allowed(get_client(), its_message->get_service(), + its_message->get_instance(), its_message->get_method())) { + VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client() + << " : routing_manager_impl::deliver_message: " + << " isn't allowed to receive a notification from service/instance/event " + << its_message->get_service() << "/" << its_message->get_instance() + << "/" << its_message->get_method() + << " respectively from remote client" + << " ~> Skip message!"; + return false; + } + } + } + host_->on_message(std::move(its_message)); is_delivered = true; } else { @@ -1575,7 +1799,8 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, bool routing_manager_impl::deliver_notification( service_t _service, instance_t _instance, const byte_t *_data, length_t _length, - bool _reliable, bool _is_valid_crc) { + bool _reliable, client_t _bound_client, + bool _is_valid_crc, bool _is_from_remote) { method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); @@ -1605,6 +1830,10 @@ bool routing_manager_impl::deliver_notification( } } const uint32_t its_length(utility::get_payload_size(_data, _length)); + if (its_length != _length - VSOMEIP_FULL_HEADER_SIZE) { + VSOMEIP_ERROR << "Message length mismatch, dropping message!"; + return false; + } std::shared_ptr<payload> its_payload = runtime::get()->create_payload(&_data[VSOMEIP_PAYLOAD_POS], its_length); @@ -1616,7 +1845,7 @@ bool routing_manager_impl::deliver_notification( for (const auto its_local_client : its_event->get_subscribers()) { if (its_local_client == host_->get_client()) { - deliver_message(_data, _length, _instance, _reliable, _is_valid_crc); + deliver_message(_data, _length, _instance, _reliable, _bound_client, _is_valid_crc, _is_from_remote); } else { std::shared_ptr<endpoint> its_local_target = find_local(its_local_client); if (its_local_target) { @@ -1809,9 +2038,12 @@ std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint( configuration_->get_max_message_size_reliable( _address.to_string(), _remote_port), configuration_->get_buffer_shrink_threshold(), + // send timeout after 2/3 of configured ttl, warning after 1/3 std::chrono::milliseconds(configuration_->get_sd_ttl() * 666), configuration_->get_endpoint_queue_limit( - _address.to_string(), _remote_port)); + _address.to_string(), _remote_port), + configuration_->get_max_tcp_restart_aborts(), + configuration_->get_max_tcp_connect_time()); if (configuration_->has_enabled_magic_cookies(_address.to_string(), _remote_port)) { @@ -1827,7 +2059,8 @@ std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint( _local_port), boost::asio::ip::udp::endpoint(_address, _remote_port), io_, configuration_->get_endpoint_queue_limit( - _address.to_string(), _remote_port)); + _address.to_string(), _remote_port), + configuration_->get_udp_receive_buffer_size()); } } catch (...) { host_->on_error(error_code_e::CLIENT_ENDPOINT_CREATION_FAILED); @@ -1850,6 +2083,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint( configuration_->get_max_message_size_reliable( its_unicast.to_string(), _port), configuration_->get_buffer_shrink_threshold(), + // send timeout after 2/3 of configured ttl, warning after 1/3 std::chrono::milliseconds(configuration_->get_sd_ttl() * 666), configuration_->get_endpoint_queue_limit( its_unicast.to_string(), _port)); @@ -1872,7 +2106,8 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint( #endif boost::asio::ip::udp::endpoint ep(its_unicast, _port); its_endpoint = std::make_shared<udp_server_endpoint_impl>( - shared_from_this(), ep, io_, its_limit); + shared_from_this(), ep, io_, its_limit, + configuration_->get_udp_receive_buffer_size()); } } else { @@ -1917,7 +2152,7 @@ std::shared_ptr<endpoint> routing_manager_impl::find_or_create_server_endpoint( return (its_endpoint); } -void routing_manager_impl::remove_local(client_t _client) { +void routing_manager_impl::remove_local(client_t _client, bool _remove_uid) { auto clients_subscriptions = get_subscriptions(_client); { std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); @@ -1925,7 +2160,7 @@ void routing_manager_impl::remove_local(client_t _client) { remote_subscription_state_.erase(std::tuple_cat(s, std::make_tuple(_client))); } } - routing_manager_base::remove_local(_client, clients_subscriptions); + routing_manager_base::remove_local(_client, clients_subscriptions, _remove_uid); std::forward_list<std::pair<service_t, instance_t>> services_to_release_; { @@ -2300,7 +2535,7 @@ void routing_manager_impl::add_routing_info( its_info->get_minor()); if (discovery_) { std::shared_ptr<endpoint> ep = its_info->get_endpoint(true); - if (ep && ep->is_connected()) { + if (ep && ep->is_established()) { discovery_->on_endpoint_connected( _service, _instance, ep); @@ -2361,7 +2596,7 @@ void routing_manager_impl::add_routing_info( } if (discovery_) { std::shared_ptr<endpoint> ep = its_info->get_endpoint(false); - if (ep && ep->is_connected()) { + if (ep && ep->is_established()) { discovery_->on_endpoint_connected(_service, _instance, ep); } } @@ -2391,7 +2626,7 @@ void routing_manager_impl::add_routing_info( its_info->get_minor()); if (discovery_) { std::shared_ptr<endpoint> ep = its_info->get_endpoint(false); - if (ep && ep->is_connected()) { + if (ep && ep->is_established()) { discovery_->on_endpoint_connected( _service, _instance, ep); @@ -2417,33 +2652,12 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst on_availability(_service, _instance, false, its_info->get_major(), its_info->get_minor()); stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), its_info->get_minor()); // Implicit unsubscribe - { - std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); - auto found_service = eventgroups_.find(_service); - if (found_service != eventgroups_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - for (auto &its_eventgroup : found_instance->second) { - its_eventgroup.second->clear_targets(); - its_eventgroup.second->clear_pending_subscriptions(); - } - } - } - } + clear_targets_and_pending_sub_from_eventgroups(_service, _instance); clear_identified_clients( _service, _instance); clear_identifying_clients( _service, _instance); - { - std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); - auto found_service = remote_subscribers_.find(_service); - if (found_service != remote_subscribers_.end()) { - if (found_service->second.erase(_instance) > 0 && - !found_service->second.size()) { - remote_subscribers_.erase(found_service); - } - } - } + clear_remote_subscriber(_service, _instance); if (_has_reliable) { clear_client_endpoints(_service, _instance, true); @@ -2509,11 +2723,8 @@ void routing_manager_impl::update_routing_info(std::chrono::milliseconds _elapse void routing_manager_impl::expire_services(const boost::asio::ip::address &_address) { std::map<service_t, std::vector<instance_t> > its_expired_offers; - for (auto &s : get_services()) { + for (auto &s : get_services_remote()) { for (auto &i : s.second) { - if (find_local_client(s.first, i.first) != VSOMEIP_ROUTING_CLIENT) { - continue; //don't expire local services - } bool is_gone(false); boost::asio::ip::address its_address; std::shared_ptr<client_endpoint> its_client_endpoint = @@ -2919,7 +3130,14 @@ void routing_manager_impl::on_subscribe_ack(client_t _client, if (specific_endpoint_client) { if (_client == get_client()) { host_->on_subscription_error(_service, _instance, _eventgroup, 0x0 /*OK*/); - host_->on_subscription_status(_service, _instance, _eventgroup, _event, 0x0 /*OK*/); + if (_event == ANY_EVENT) { + for (const auto &its_event : its_eventgroup->get_events()) + host_->on_subscription_status(_service, _instance, + _eventgroup, its_event->get_event(), 0x0 /*OK*/); + } else { + host_->on_subscription_status(_service, _instance, + _eventgroup, _event, 0x0 /*OK*/); + } } else { stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event); @@ -3068,7 +3286,7 @@ bool routing_manager_impl::deliver_specific_endpoint_message(service_t _service, _receiver->is_reliable(), VSOMEIP_SEND); } } else { - deliver_message(_data, _size, _instance, _receiver->is_reliable()); + deliver_message(_data, _size, _instance, _receiver->is_reliable(), VSOMEIP_ROUTING_CLIENT, true, true); } return true; } @@ -3484,7 +3702,7 @@ void routing_manager_impl::clear_remote_subscriber( } std::chrono::steady_clock::time_point -routing_manager_impl::expire_subscriptions() { +routing_manager_impl::expire_subscriptions(bool _force) { struct subscriptions_info { service_t service_id_; instance_t instance_id_; @@ -3507,10 +3725,14 @@ routing_manager_impl::expire_subscriptions() { for (auto &its_eventgroup : its_instance.second) { std::set<std::shared_ptr<endpoint_definition>> its_expired_endpoints; for (auto &its_target : its_eventgroup.second->get_targets()) { - if (its_target.expiration_ < now) { + if (_force) { its_expired_endpoints.insert(its_target.endpoint_); - } else if (its_target.expiration_ < next_expiration) { - next_expiration = its_target.expiration_; + } else { + if (its_target.expiration_ < now) { + its_expired_endpoints.insert(its_target.endpoint_); + } else if (its_target.expiration_ < next_expiration) { + next_expiration = its_target.expiration_; + } } } @@ -3576,7 +3798,8 @@ routing_manager_impl::expire_subscriptions() { << std::hex << std::setfill('0') << std::setw(4) << s.eventgroup_id_ << "] from " << s.invalid_endpoint_->get_address() << ":" << std::dec << s.invalid_endpoint_->get_port() - << "(" << std::hex << std::setfill('0') << std::setw(4) << s.client_ << ")"; + << "(" << std::hex << std::setfill('0') << std::setw(4) << s.client_ << ") " + << _force; } } return next_expiration; @@ -3593,8 +3816,18 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const if (discovery_) { is_diag_mode = discovery_->get_diagnosis_mode(); } + std::stringstream its_last_resume; + { + std::lock_guard<std::mutex> its_lock(last_resume_mutex_); + if (last_resume_ != std::chrono::steady_clock::time_point::min()) { + its_last_resume << " | " << std::dec + << std::chrono::duration_cast<std::chrono::seconds>( + std::chrono::steady_clock::now() - last_resume_).count() << "s"; + } + } VSOMEIP_INFO << "vSomeIP " << VSOMEIP_VERSION << " | (" - << ((is_diag_mode == true) ? "diagnosis)" : "default)"); + << ((is_diag_mode == true) ? "diagnosis)" : "default)") + << its_last_resume.str(); { std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_); version_log_timer_.expires_from_now( @@ -3838,6 +4071,8 @@ void routing_manager_impl::register_client_error_handler(client_t _client, } void routing_manager_impl::handle_client_error(client_t _client) { + VSOMEIP_INFO << "Client 0x" << std::hex << get_client() + << " handles a client error(" << std::hex << _client << ")"; if (stub_) stub_->update_registration(_client, registration_type_e::DEREGISTER_ON_ERROR); @@ -4023,14 +4258,26 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { // stop processing of incoming SD messages discovery_->stop(); + // remove all remote subscriptions to remotely offered services on this node + expire_subscriptions(true); + // send StopOffer messages for remotely offered services on this node for (const auto &its_service : get_offered_services()) { for (const auto &its_instance : its_service.second) { - its_instance.second->set_ttl(0); + if (its_instance.second->get_endpoint(true) || its_instance.second->get_endpoint(false)) { + const client_t its_client(find_local_client(its_service.first, its_instance.first)); + VSOMEIP_WARNING << "service " + << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance.first << " still offered by " + << std::hex << std::setw(4) << std::setfill('0') << its_client; + } discovery_->stop_offer_service(its_service.first, its_instance.first, its_instance.second); } } - + { + std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); + remote_subscription_state_.clear(); + } // mark all external services as offline services_t its_remote_services; { @@ -4065,6 +4312,10 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { { VSOMEIP_INFO << "Set routing to resume mode, diagnosis mode was " << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); + { + std::lock_guard<std::mutex> its_lock(last_resume_mutex_); + last_resume_ = std::chrono::steady_clock::now(); + } // Reset relevant in service info for (const auto &its_service : get_offered_services()) { @@ -4076,6 +4327,10 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { // Switch SD back to normal operation discovery_->set_diagnosis_mode(false); + if (routing_state_handler_) { + routing_state_handler_(_routing_state); + } + // start processing of SD messages (incoming remote offers should lead to new subscribe messages) discovery_->start(); @@ -4098,7 +4353,6 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { for (const auto &its_instance : its_service.second) { if (host_->get_configuration()->is_someip( its_service.first, its_instance.first)) { - its_instance.second->set_ttl(0); discovery_->stop_offer_service( its_service.first, its_instance.first, its_instance.second); } @@ -4192,6 +4446,9 @@ void routing_manager_impl::on_net_interface_or_route_state_changed( } void routing_manager_impl::start_ip_routing() { + if (routing_ready_handler_) { + routing_ready_handler_(); + } if (discovery_) { discovery_->start(); } else { @@ -4262,6 +4519,62 @@ routing_manager_impl::get_subscribed_eventgroups( return its_eventgroups; } +void routing_manager_impl::clear_targets_and_pending_sub_from_eventgroups( + service_t _service, instance_t _instance) { + std::vector<std::shared_ptr<event>> its_events; + { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); + auto found_service = eventgroups_.find(_service); + if (found_service != eventgroups_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + for (const auto &its_eventgroup : found_instance->second) { + // As the service is gone, all subscriptions to its events + // do no longer exist and the last received payload is no + // longer valid. + for (auto &its_event : its_eventgroup.second->get_events()) { + const auto its_subscribers = its_event->get_subscribers(); + for (const auto its_subscriber : its_subscribers) { + if (its_subscriber != get_client()) { + its_event->remove_subscriber( + its_eventgroup.first, its_subscriber); + } + + client_t its_client = is_specific_endpoint_client(its_subscriber, _service, _instance); + { + std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); + const auto its_tuple = + std::make_tuple(found_service->first, found_instance->first, + its_eventgroup.first, its_client); + remote_subscription_state_.erase(its_tuple); + } + } + its_events.push_back(its_event); + } + its_eventgroup.second->clear_targets(); + its_eventgroup.second->clear_pending_subscriptions(); + } + } + } + } + for (const auto& e : its_events) { + e->unset_payload(true); + } +} + +void routing_manager_impl::clear_remote_subscriber(service_t _service, + instance_t _instance) { + std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); + auto found_service = remote_subscribers_.find(_service); + if (found_service != remote_subscribers_.end()) { + if (found_service->second.erase(_instance) > 0 && + !found_service->second.size()) { + remote_subscribers_.erase(found_service); + } + } +} + + void routing_manager_impl::call_sd_endpoint_connected( const boost::system::error_code& _error, service_t _service, instance_t _instance, @@ -4271,7 +4584,7 @@ void routing_manager_impl::call_sd_endpoint_connected( if (_error) { return; } - _endpoint->set_connected(true); + _endpoint->set_established(true); if (discovery_) { discovery_->on_endpoint_connected(_service, _instance, _endpoint); @@ -4423,6 +4736,39 @@ void routing_manager_impl::send_initial_events( } } +void routing_manager_impl::register_offer_acceptance_handler( + vsomeip::offer_acceptance_handler_t _handler) const { + if (discovery_) { + discovery_->register_offer_acceptance_handler(_handler); + } +} + +void routing_manager_impl::register_reboot_notification_handler( + vsomeip::reboot_notification_handler_t _handler) const { + if (discovery_) { + discovery_->register_reboot_notification_handler(_handler); + } +} + +void routing_manager_impl::register_routing_ready_handler( + routing_ready_handler_t _handler) { + routing_ready_handler_ = _handler; +} + +void routing_manager_impl::register_routing_state_handler( + routing_state_handler_t _handler) { + routing_state_handler_ = _handler; +} + +void routing_manager_impl::offer_acceptance_enabled( + boost::asio::ip::address _address) { + boost::system::error_code ec; + VSOMEIP_INFO << "ipsec-plugin-mgu: expire subscriptions and services: " + << _address.to_string(ec); + expire_subscriptions(_address); + expire_services(_address); +} + void routing_manager_impl::memory_log_timer_cbk( boost::system::error_code const & _error) { if (_error) { @@ -4707,4 +5053,374 @@ void routing_manager_impl::send_subscription( } } +void routing_manager_impl::cleanup_server_endpoint( + service_t _service, const std::shared_ptr<endpoint>& _endpoint) { + if (_endpoint) { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + bool reliable = _endpoint->is_reliable(); + // Check whether any service still uses this endpoint + _endpoint->decrement_use_count(); + bool isLastService = (_endpoint->get_use_count() == 0); + + // Clear service_instances_ + if (1 >= service_instances_[_service].size()) { + service_instances_.erase(_service); + } else { + service_instances_[_service].erase(_endpoint.get()); + } + + // Clear server endpoint if no service remains using it + if (isLastService) { + const uint16_t port = _endpoint->get_local_port(); + if (server_endpoints_.find(port) != server_endpoints_.end()) { + server_endpoints_[port].erase(reliable); + if (server_endpoints_[port].find(!reliable) == server_endpoints_[port].end()) { + server_endpoints_.erase(port); + } + } + + // Stop endpoint (close socket) to release its async_handlers! + _endpoint->stop(); + } + } +} + +pending_remote_offer_id_t routing_manager_impl::pending_remote_offer_add( + service_t _service, instance_t _instance) { + std::lock_guard<std::mutex> its_lock(pending_remote_offers_mutex_); + if (++pending_remote_offer_id_ == 0) { + pending_remote_offer_id_++; + } + pending_remote_offers_[pending_remote_offer_id_] = std::make_pair(_service, + _instance); + return pending_remote_offer_id_; +} + +std::pair<service_t, instance_t> routing_manager_impl::pending_remote_offer_remove( + pending_remote_offer_id_t _id) { + std::lock_guard<std::mutex> its_lock(pending_remote_offers_mutex_); + std::pair<service_t, instance_t> ret = std::make_pair(ANY_SERVICE, + ANY_INSTANCE); + auto found_si = pending_remote_offers_.find(_id); + if (found_si != pending_remote_offers_.end()) { + ret = found_si->second; + pending_remote_offers_.erase(found_si); + } + return ret; +} + +void routing_manager_impl::on_resend_provided_events_response( + pending_remote_offer_id_t _id) { + const std::pair<service_t, instance_t> its_service = + pending_remote_offer_remove(_id); + if (its_service.first != ANY_SERVICE) { + // create server endpoint + std::shared_ptr<serviceinfo> its_info = find_service(its_service.first, + its_service.second); + if (its_info) { + its_info->set_ttl(DEFAULT_TTL); + init_service_info(its_service.first, its_service.second, true); + } + } +} + +void routing_manager_impl::on_security_update_timeout( + const boost::system::error_code& _error, + pending_security_update_id_t _id, + std::shared_ptr<boost::asio::steady_timer> _timer) { + (void)_timer; + if (_error) { + // timer was cancelled + return; + } + security_update_state_e its_state = security_update_state_e::SU_UNKNOWN_USER_ID; + std::unordered_set<client_t> its_missing_clients = pending_security_update_get(_id); + { + // erase timer + std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_); + security_update_timers_.erase(_id); + } + { + // print missing responses and check if some clients did not respond because they already disconnected + if (!its_missing_clients.empty()) { + for (auto its_client : its_missing_clients) { + VSOMEIP_INFO << __func__ << ": Client 0x" << std::hex << its_client + << " did not respond to the policy update / removal with ID: 0x" << std::hex << _id; + if (!find_local(its_client)) { + VSOMEIP_INFO << __func__ << ": Client 0x" << std::hex << its_client + << " is not connected anymore, do not expect answer for policy update / removal with ID: 0x" + << std::hex << _id; + pending_security_update_remove(_id, its_client); + } + } + } + + its_missing_clients = pending_security_update_get(_id); + if (its_missing_clients.empty()) { + VSOMEIP_INFO << __func__ << ": Received all responses for " + "security update/removal ID: 0x" << std::hex << _id; + its_state = security_update_state_e::SU_SUCCESS; + } + { + // erase pending security update + std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); + pending_security_updates_.erase(_id); + } + + // call handler with error on timeout or with SUCCESS if missing clients are not connected + std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_); + const auto found_handler = security_update_handlers_.find(_id); + if (found_handler != security_update_handlers_.end()) { + found_handler->second(its_state); + security_update_handlers_.erase(found_handler); + } else { + VSOMEIP_WARNING << __func__ << ": Callback not found for security update / removal with ID: 0x" + << std::hex << _id; + } + } +} + +bool routing_manager_impl::update_security_policy_configuration( + uint32_t _uid, uint32_t _gid, + ::std::shared_ptr<policy> _policy, std::shared_ptr<payload> _payload, security_update_handler_t _handler) { + bool ret(true); + // cache security policy payload for later distribution to new registering clients + stub_->policy_cache_add(_uid, _payload); + + // update security policy from configuration + configuration_->update_security_policy(_uid, _gid, _policy); + + // determine currently connected clients + std::unordered_set<client_t> its_clients_to_inform = get_connected_clients(); + + // add handler + pending_security_update_id_t its_id; + if (!its_clients_to_inform.empty()) { + its_id = pending_security_update_add(its_clients_to_inform); + { + std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_); + security_update_handlers_[its_id] = _handler; + } + + { + std::shared_ptr<boost::asio::steady_timer> its_timer = + std::make_shared<boost::asio::steady_timer>(io_); + boost::system::error_code ec; + its_timer->expires_from_now(std::chrono::milliseconds(3000), ec); + if (!ec) { + its_timer->async_wait( + std::bind( + &routing_manager_impl::on_security_update_timeout, + std::static_pointer_cast<routing_manager_impl>( + shared_from_this()), + std::placeholders::_1, its_id, its_timer)); + } else { + VSOMEIP_ERROR << __func__ << ": timer creation: " << ec.message(); + } + std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_); + security_update_timers_[its_id] = its_timer; + } + + // trigger all currently connected clients to update the security policy + uint32_t sent_counter(0); + uint32_t its_tranche = + uint32_t(its_clients_to_inform.size() >= 10 ? (its_clients_to_inform.size() / 10) : 1); + VSOMEIP_INFO << __func__ << ": Informing [" << std::dec << its_clients_to_inform.size() + << "] currently connected clients about policy update for UID: " + << std::dec << _uid << " with update ID: 0x" << std::hex << its_id; + for (auto its_client : its_clients_to_inform) { + if (!stub_->send_update_security_policy_request(its_client, its_id, _uid, _payload)) { + VSOMEIP_INFO << __func__ << ": Couldn't send update security policy " + << "request to client 0x" << std::hex << std::setw(4) + << std::setfill('0') << its_client << " policy UID: " + << std::hex << std::setw(4) << std::setfill('0') << _uid << " GID: " + << std::hex << std::setw(4) << std::setfill('0') << _gid + << " with update ID: 0x" << std::hex << its_id + << " as client already disconnected"; + // remove client from expected answer list + pending_security_update_remove(its_id, its_client); + } + sent_counter++; + // Prevent burst + if (sent_counter % its_tranche == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + } else { + // if routing manager has no client call the handler directly + _handler(security_update_state_e::SU_SUCCESS); + } + return ret; +} + +bool routing_manager_impl::remove_security_policy_configuration( + uint32_t _uid, uint32_t _gid, security_update_handler_t _handler) { + bool ret(true); + + // remove security policy from configuration (only if there was a updateACL call before) + if (stub_->is_policy_cached(_uid)) { + if (!configuration_->remove_security_policy(_uid, _gid)) { + _handler(vsomeip::security_update_state_e::SU_UNKNOWN_USER_ID); + ret = false; + } else { + // remove policy from cache to prevent sending it to registering clients + stub_->policy_cache_remove(_uid); + + // add handler + pending_security_update_id_t its_id; + + // determine currently connected clients + std::unordered_set<client_t> its_clients_to_inform = get_connected_clients(); + + if (!its_clients_to_inform.empty()) { + its_id = pending_security_update_add(its_clients_to_inform); + { + std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_); + security_update_handlers_[its_id] = _handler; + } + + { + std::shared_ptr<boost::asio::steady_timer> its_timer = + std::make_shared<boost::asio::steady_timer>(io_); + boost::system::error_code ec; + its_timer->expires_from_now(std::chrono::milliseconds(3000), ec); + if (!ec) { + its_timer->async_wait( + std::bind( + &routing_manager_impl::on_security_update_timeout, + std::static_pointer_cast<routing_manager_impl>( + shared_from_this()), + std::placeholders::_1, its_id, its_timer)); + } else { + VSOMEIP_ERROR << __func__ << ": timer creation: " << ec.message(); + } + std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_); + security_update_timers_[its_id] = its_timer; + } + + // trigger all clients to remove the security policy + uint32_t sent_counter(0); + uint32_t its_tranche = + uint32_t(its_clients_to_inform.size() >= 10 ? (its_clients_to_inform.size() / 10) : 1); + VSOMEIP_INFO << __func__ << ": Informing [" << std::dec << its_clients_to_inform.size() + << "] currently connected clients about policy removal for UID: " + << std::dec << _uid << " with update ID: " << its_id; + for (auto its_client : its_clients_to_inform) { + if (!stub_->send_remove_security_policy_request(its_client, its_id, _uid, _gid)) { + VSOMEIP_INFO << __func__ << ": Couldn't send remove security policy " + << "request to client 0x" << std::hex << std::setw(4) + << std::setfill('0') << its_client << " policy UID: " + << std::hex << std::setw(4) << std::setfill('0') << _uid << " GID: " + << std::hex << std::setw(4) << std::setfill('0') << _gid + << " with update ID: 0x" << std::hex << its_id + << " as client already disconnected"; + // remove client from expected answer list + pending_security_update_remove(its_id, its_client); + } + sent_counter++; + // Prevent burst + if (sent_counter % its_tranche == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + } else { + // if routing manager has no client call the handler directly + _handler(security_update_state_e::SU_SUCCESS); + } + } + } + else { + _handler(vsomeip::security_update_state_e::SU_UNKNOWN_USER_ID); + ret = false; + } + return ret; +} + +pending_security_update_id_t routing_manager_impl::pending_security_update_add( + std::unordered_set<client_t> _clients) { + std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); + if (++pending_security_update_id_ == 0) { + pending_security_update_id_++; + } + pending_security_updates_[pending_security_update_id_] = _clients; + return pending_security_update_id_; +} + +std::unordered_set<client_t> routing_manager_impl::pending_security_update_get( + pending_security_update_id_t _id) { + std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); + std::unordered_set<client_t> its_missing_clients; + auto found_si = pending_security_updates_.find(_id); + if (found_si != pending_security_updates_.end()) { + its_missing_clients = pending_security_updates_[_id]; + } + return its_missing_clients; +} + +bool routing_manager_impl::pending_security_update_remove( + pending_security_update_id_t _id, client_t _client) { + std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); + auto found_si = pending_security_updates_.find(_id); + if (found_si != pending_security_updates_.end()) { + if (found_si->second.erase(_client)) { + return true; + } + } + return false; +} + +bool routing_manager_impl::is_pending_security_update_finished( + pending_security_update_id_t _id) { + std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_); + bool ret(false); + auto found_si = pending_security_updates_.find(_id); + if (found_si != pending_security_updates_.end()) { + if (!found_si->second.size()) { + ret = true; + } + } + if (ret) { + pending_security_updates_.erase(_id); + } + return ret; +} + +void routing_manager_impl::on_security_update_response( + pending_security_update_id_t _id, client_t _client) { + if (pending_security_update_remove(_id, _client)) { + if (is_pending_security_update_finished(_id)) { + // cancel timeout timer + { + std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_); + auto found_timer = security_update_timers_.find(_id); + if (found_timer != security_update_timers_.end()) { + boost::system::error_code ec; + found_timer->second->cancel(ec); + security_update_timers_.erase(found_timer); + } else { + VSOMEIP_WARNING << __func__ << ": Received all responses " + "for security update/removal ID: 0x" + << std::hex << _id << " but timeout already happened"; + } + } + + // call handler + { + std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_); + auto found_handler = security_update_handlers_.find(_id); + if (found_handler != security_update_handlers_.end()) { + found_handler->second(security_update_state_e::SU_SUCCESS); + security_update_handlers_.erase(found_handler); + VSOMEIP_INFO << __func__ << ": Received all responses for " + "security update/removal ID: 0x" << std::hex << _id; + } else { + VSOMEIP_WARNING << __func__ << ": Received all responses " + "for security update/removal ID: 0x" + << std::hex << _id << " but didn't find handler"; + } + } + } + } +} + } // namespace vsomeip |