diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_impl.cpp | 465 |
1 files changed, 289 insertions, 176 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 7a27e4e..c1daec7 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -47,7 +47,8 @@ namespace vsomeip { routing_manager_impl::routing_manager_impl(routing_manager_host *_host) : routing_manager_base(_host), - version_log_timer_(_host->get_io()) + version_log_timer_(_host->get_io()), + if_state_running_(false) #ifndef WITHOUT_SYSTEMD , watchdog_timer_(_host->get_io()) #endif @@ -87,16 +88,22 @@ void routing_manager_impl::init() { discovery_ = (*its_runtime)->create_service_discovery(this); discovery_->init(); } - } else { - init_routing_info(); // Static routing } } void routing_manager_impl::start() { - stub_->start(); - if (discovery_) - discovery_->start(); +#ifndef WIN32 + netlink_connector_ = std::make_shared<netlink_connector>(host_->get_io(), + configuration_->get_unicast_address()); + netlink_connector_->register_net_if_changes_handler( + std::bind(&routing_manager_impl::on_net_if_state_changed, + this, std::placeholders::_1, std::placeholders::_2)); + netlink_connector_->start(); +#else + start_ip_routing(); +#endif + stub_->start(); host_->on_state(state_type_e::ST_REGISTERED); if (configuration_->log_version()) { @@ -115,6 +122,11 @@ void routing_manager_impl::start() { void routing_manager_impl::stop() { version_log_timer_.cancel(); +#ifndef WIN32 + if (netlink_connector_) { + netlink_connector_->stop(); + } +#endif #ifndef WITHOUT_SYSTEMD watchdog_timer_.cancel(); @@ -136,7 +148,7 @@ void routing_manager_impl::stop() { bool routing_manager_impl::offer_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor) { - VSOMEIP_DEBUG << "OFFER(" + VSOMEIP_INFO << "OFFER(" << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance @@ -146,7 +158,14 @@ bool routing_manager_impl::offer_service(client_t _client, service_t _service, return false; } - init_service_info(_service, _instance, true); + { + std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_); + if (if_state_running_) { + init_service_info(_service, _instance, true); + } else { + pending_sd_offers_.push_back(std::make_pair(_service, _instance)); + } + } { std::lock_guard<std::mutex> its_lock(events_mutex_); @@ -181,7 +200,7 @@ bool routing_manager_impl::offer_service(client_t _client, service_t _service, send_pending_subscriptions(_service, _instance, _major); } stub_->on_offer_service(_client, _service, _instance, _major, _minor); - host_->on_availability(_service, _instance, true, _major, _minor); + on_availability(_service, _instance, true, _major, _minor); return true; } @@ -189,23 +208,35 @@ void routing_manager_impl::stop_offer_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor) { - VSOMEIP_DEBUG << "STOP OFFER(" + VSOMEIP_INFO << "STOP OFFER(" << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << ":" << std::dec << int(_major) << "." << _minor << "]"; + { + std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_); + for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) { + if (it->first == _service && it->second == _instance) { + it = pending_sd_offers_.erase(it); + break; + } else { + ++it; + } + } + } + routing_manager_base::stop_offer_service(_client, _service, _instance, _major, _minor); on_stop_offer_service(_client, _service, _instance, _major, _minor); stub_->on_stop_offer_service(_client, _service, _instance, _major, _minor); - host_->on_availability(_service, _instance, false, _major, _minor); + on_availability(_service, _instance, false, _major, _minor); } void routing_manager_impl::request_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, bool _use_exclusive_proxy) { - VSOMEIP_DEBUG << "REQUEST(" + VSOMEIP_INFO << "REQUEST(" << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << ":" @@ -226,7 +257,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service, discovery_->request_service(_service, _instance, _major, _minor, DEFAULT_TTL); } else { - VSOMEIP_DEBUG << std::hex + VSOMEIP_INFO << std::hex << "Avoid trigger SD find-service message" << " for local service/instance/major/minor: " << _service << "/" << _instance << std::dec @@ -269,7 +300,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service, void routing_manager_impl::release_service(client_t _client, service_t _service, instance_t _instance) { - VSOMEIP_DEBUG << "RELEASE(" + VSOMEIP_INFO << "RELEASE(" << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; @@ -292,9 +323,11 @@ void routing_manager_impl::release_service(client_t _client, service_t _service, std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance)); if(its_info && !its_info->is_local()) { + unsubscribe_specific_client_at_sd(_service, _instance, _client); if(!its_info->get_requesters_size()) { if(discovery_) { discovery_->release_service(_service, _instance); + discovery_->unsubscribe_client(_service, _instance, VSOMEIP_ROUTING_CLIENT); } clear_client_endpoints(_service, _instance, true); clear_client_endpoints(_service, _instance, false); @@ -319,7 +352,7 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, subscription_type_e _subscription_type) { - VSOMEIP_DEBUG << "SUBSCRIBE(" + VSOMEIP_INFO << "SUBSCRIBE(" << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." @@ -339,6 +372,7 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup); } routing_manager_base::subscribe(_client, _service, _instance, _eventgroup, _major, _subscription_type); + send_pending_notify_ones(_service, _instance, _eventgroup, _client); } else { if (discovery_) { client_t subscriber = VSOMEIP_ROUTING_CLIENT; @@ -365,7 +399,7 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, } } if(identify) { - identify_for_subscribe(_client, _service, _instance, _major); + identify_for_subscribe(_client, _service, _instance, _major, _subscription_type); } } bool inserted = insert_subscription(_service, _instance, _eventgroup, _client); @@ -404,7 +438,7 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, void routing_manager_impl::unsubscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup) { - VSOMEIP_DEBUG << "UNSUBSCRIBE(" + VSOMEIP_INFO << "UNSUBSCRIBE(" << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." @@ -447,8 +481,14 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service, } } } - if( last_subscriber_removed ) + if (subscriber == VSOMEIP_ROUTING_CLIENT && last_subscriber_removed) { + // for normal subscribers only unsubscribe via SD if last + // subscriber was removed + discovery_->unsubscribe(_service, _instance, _eventgroup, subscriber); + } else if (subscriber != VSOMEIP_ROUTING_CLIENT) { + // for selective subscribers always unsubscribe at the SD discovery_->unsubscribe(_service, _instance, _eventgroup, subscriber); + } } else { stub_->send_unsubscribe(find_local(_service, _instance), _client, _service, _instance, _eventgroup, false); @@ -493,7 +533,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, } else if (is_notification && _client) { // Selective notifications! if (_client == get_client()) { #ifdef USE_DLT - uint16_t its_data_size + const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); tc::trace_header its_header; @@ -509,7 +549,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, if (its_target) { #ifdef USE_DLT - uint16_t its_data_size + const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); tc::trace_header its_header; @@ -545,7 +585,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, its_target = find_or_create_remote_client(its_service, _instance, _reliable, client); if (its_target) { #ifdef USE_DLT - uint16_t its_data_size + const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); tc::trace_header its_header; @@ -600,7 +640,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, } #ifdef USE_DLT if (has_sent) { - uint16_t its_data_size + const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); tc::trace_header its_header; @@ -618,7 +658,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, (sd_info_ ? sd_info_->get_endpoint(false) : nullptr) : its_info->get_endpoint(_reliable); if (its_target) { #ifdef USE_DLT - uint16_t its_data_size + const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); tc::trace_header its_header; @@ -671,7 +711,7 @@ bool routing_manager_impl::send_to( if (its_endpoint) { #ifdef USE_DLT - uint16_t its_data_size + const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); tc::trace_header its_header; @@ -691,7 +731,7 @@ bool routing_manager_impl::send_to(const std::shared_ptr<endpoint_definition> &_ if (its_endpoint) { #ifdef USE_DLT - uint16_t its_data_size + const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); tc::trace_header its_header; @@ -782,15 +822,23 @@ void routing_manager_impl::notify_one(service_t _service, instance_t _instance, } } -void routing_manager_impl::on_error(const byte_t *_data, length_t _length, endpoint *_receiver) { +void routing_manager_impl::on_availability(service_t _service, instance_t _instance, + bool _is_available, major_version_t _major, minor_version_t _minor) { + host_->on_availability(_service, _instance, _is_available, _major, _minor); +} + +void routing_manager_impl::on_error( + const byte_t *_data, length_t _length, endpoint *_receiver, + const boost::asio::ip::address &_remote_address, + std::uint16_t _remote_port) { instance_t its_instance = 0; if (_length >= VSOMEIP_SERVICE_POS_MAX) { service_t its_service = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); its_instance = find_instance(its_service, _receiver); } - send_error(return_code_e::E_MALFORMED_MESSAGE, _data, _length, - its_instance, _receiver->is_reliable(), _receiver); + send_error(return_code_e::E_MALFORMED_MESSAGE, _data, _length, its_instance, + _receiver->is_reliable(), _receiver, _remote_address, _remote_port); } void routing_manager_impl::release_port(uint16_t _port, bool _reliable) { @@ -800,13 +848,15 @@ void routing_manager_impl::release_port(uint16_t _port, bool _reliable) { void routing_manager_impl::on_message(const byte_t *_data, length_t _size, endpoint *_receiver, const boost::asio::ip::address &_destination, - client_t _bound_client) { + client_t _bound_client, + const boost::asio::ip::address &_remote_address, + std::uint16_t _remote_port) { #if 0 std::stringstream msg; msg << "rmi::on_message: "; for (uint32_t i = 0; i < _size; ++i) msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; - VSOMEIP_DEBUG << msg.str(); + VSOMEIP_INFO << msg.str(); #endif (void)_bound_client; service_t its_service; @@ -818,16 +868,15 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); if (discovery_ && its_method == sd::method) { - if (configuration_->get_sd_port() == _receiver->get_remote_port()) { - boost::asio::ip::address its_address; - if (_receiver->get_remote_address(its_address)) { - discovery_->on_message(_data, _size, its_address, _destination); + if (configuration_->get_sd_port() == _remote_port) { + if (!_remote_address.is_unspecified()) { + discovery_->on_message(_data, _size, _remote_address, _destination); } else { VSOMEIP_ERROR << "Ignored SD message from unknown address."; } } else { VSOMEIP_ERROR << "Ignored SD message from unknown port (" - << _receiver->get_remote_port() << ")"; + << _remote_port << ")"; } } } else { @@ -843,7 +892,8 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, if(!(_size >= VSOMEIP_MESSAGE_TYPE_POS && utility::is_request_no_return(_data[VSOMEIP_MESSAGE_TYPE_POS]))) { if (return_code != return_code_e::E_OK && return_code != return_code_e::E_NOT_OK) { send_error(return_code, _data, _size, its_instance, - _receiver->is_reliable(), _receiver); + _receiver->is_reliable(), _receiver, + _remote_address, _remote_port); return; } } else if(return_code != return_code_e::E_OK && return_code != return_code_e::E_NOT_OK) { @@ -893,13 +943,20 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, } } #ifdef USE_DLT - uint16_t its_data_size + const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); tc::trace_header its_header; - if (its_header.prepare(_receiver, false)) - tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, - _data, its_data_size); + const boost::asio::ip::address_v4 its_remote_address = + _remote_address.is_v4() ? _remote_address.to_v4() : + boost::asio::ip::address_v4::from_string("6.6.6.6"); + tc::protocol_e its_protocol = + _receiver->is_local() ? tc::protocol_e::local : + _receiver->is_reliable() ? tc::protocol_e::tcp : + tc::protocol_e::udp; + its_header.prepare(its_remote_address, _remote_port, its_protocol, false); + tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, + its_data_size); #endif } @@ -914,7 +971,7 @@ void routing_manager_impl::on_message( << _service << ", " << _instance << "): "; for (uint32_t i = 0; i < _size; ++i) msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; - VSOMEIP_DEBUG << msg.str(); + VSOMEIP_INFO << msg.str(); #endif client_t its_client; @@ -1031,8 +1088,7 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) { } } for (const auto &s : services_to_report_) { - host_->on_availability(s.service_id_, s.instance_id_, true, s.major_, - s.minor_); + on_availability(s.service_id_, s.instance_id_, true, s.major_, s.minor_); if (s.reliable_) { stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, s.service_id_, s.instance_id_, s.major_, s.minor_); @@ -1060,7 +1116,7 @@ void routing_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) { if(!its_info){ return; } - host_->on_availability(its_service.first, its_instance.first, + on_availability(its_service.first, its_instance.first, false, its_info->get_major(), its_info->get_minor()); } } @@ -1072,7 +1128,7 @@ void routing_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) { if(!its_info){ return; } - host_->on_availability(its_service.first, its_instance.first, + on_availability(its_service.first, its_instance.first, false, its_info->get_major(), its_info->get_minor()); } } @@ -1119,17 +1175,22 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se } } } + std::map<event_t, std::shared_ptr<event> > events; { - std::lock_guard<std::mutex> its_lock(events_mutex_); + std::unique_lock<std::mutex> its_lock(events_mutex_); auto its_events_service = events_.find(_service); if (its_events_service != events_.end()) { auto its_events_instance = its_events_service->second.find(_instance); if (its_events_instance != its_events_service->second.end()) { - for (auto &e : its_events_instance->second) - e.second->unset_payload(); + for (auto &e : its_events_instance->second) { + events[e.first] = e.second; + } } } } + for (auto &e : events) { + e.second->unset_payload(); + } { std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_); auto its_service = eventgroup_clients_.find(_service); @@ -1236,7 +1297,7 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, if (its_message) { its_message->set_instance(_instance); its_message->set_reliable(_reliable); - host_->on_message(its_message); + host_->on_message(std::move(its_message)); is_delivered = true; } else { VSOMEIP_ERROR << "Routing manager: deliver_message: " @@ -1305,18 +1366,25 @@ std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoin std::shared_ptr<endpoint> its_service_endpoint = find_server_endpoint(_port, _reliable); if (!its_service_endpoint) { - its_service_endpoint = create_server_endpoint(_port, _reliable, true); - - if (its_service_endpoint) { - sd_info_ = std::make_shared<serviceinfo>(ANY_MAJOR, ANY_MINOR, DEFAULT_TTL, - false); // false, because we do _not_ want to announce it... - sd_info_->set_endpoint(its_service_endpoint, _reliable); - its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE, - _address, _port); - its_service_endpoint->join(_address); - } else { - VSOMEIP_ERROR << "Service Discovery endpoint could not be created. " - "Please check your network configuration."; + try { + its_service_endpoint = create_server_endpoint(_port, _reliable, + true); + + if (its_service_endpoint) { + sd_info_ = std::make_shared<serviceinfo>(ANY_MAJOR, ANY_MINOR, + DEFAULT_TTL, false); // false, because we do _not_ want to announce it... + sd_info_->set_endpoint(its_service_endpoint, _reliable); + its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE, + _address, _port); + its_service_endpoint->join(_address); + } else { + VSOMEIP_ERROR<< "Service Discovery endpoint could not be created. " + "Please check your network configuration."; + } + } catch (const std::exception &e) { + host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED); + VSOMEIP_ERROR << "Service Discovery endpoint could not be created: " + << e.what(); } } return its_service_endpoint; @@ -1398,7 +1466,7 @@ void routing_manager_impl::init_service_info( if (ILLEGAL_PORT == its_reliable_port && ILLEGAL_PORT == its_unreliable_port) { - VSOMEIP_DEBUG << "Port configuration missing for [" + VSOMEIP_INFO << "Port configuration missing for [" << std::hex << _service << "." << _instance << "]. Service is internal."; } @@ -1427,7 +1495,8 @@ std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint( boost::asio::ip::tcp::endpoint(_address, _remote_port), io_, configuration_->get_message_size_reliable( - _address.to_string(), _remote_port)); + _address.to_string(), _remote_port), + configuration_->get_buffer_shrink_threshold()); if (configuration_->has_enabled_magic_cookies(_address.to_string(), _remote_port)) { @@ -1465,7 +1534,8 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint( shared_from_this(), boost::asio::ip::tcp::endpoint(its_unicast, _port), io_, configuration_->get_message_size_reliable( - its_unicast.to_string(), _port)); + its_unicast.to_string(), _port), + configuration_->get_buffer_shrink_threshold()); if (configuration_->has_enabled_magic_cookies( its_unicast.to_string(), _port) || configuration_->has_enabled_magic_cookies( @@ -1495,7 +1565,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint( server_endpoints_[_port][_reliable] = its_endpoint; its_endpoint->start(); } - } catch (std::exception &e) { + } catch (const std::exception &e) { host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED); VSOMEIP_ERROR << e.what(); } @@ -1887,8 +1957,8 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info( && (major_minor_pair.second <= _minor || _minor == DEFAULT_MINOR || major_minor_pair.second == ANY_MINOR)) { - host_->on_availability(_service, _instance, - true, its_info->get_major(), its_info->get_minor()); + on_availability(_service, _instance, + true, its_info->get_major(), its_info->get_minor()); if (!stub_->contained_in_routing_info( VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), @@ -1916,7 +1986,7 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info( = endpoint_definition::get(_unreliable_address, _unreliable_port, false); remote_service_info_[_service][_instance][false] = endpoint_def; if (!is_reliable_known) { - host_->on_availability(_service, _instance, true, _major, _minor); + on_availability(_service, _instance, true, _major, _minor); stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, _major, _minor); } } @@ -1931,7 +2001,7 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst if(!its_info) return; - host_->on_availability(_service, _instance, false, its_info->get_major(), its_info->get_minor()); + on_availability(_service, _instance, false, its_info->get_major(), its_info->get_minor()); stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), its_info->get_minor()); // Implicit unsubscribe { @@ -2022,7 +2092,7 @@ std::chrono::milliseconds routing_manager_impl::update_routing_info(std::chrono: for (auto &s : its_expired_offers) { for (auto &i : s.second) { - VSOMEIP_DEBUG << "update_routing_info: elapsed=" << _elapsed.count() + VSOMEIP_INFO << "update_routing_info: elapsed=" << _elapsed.count() << " : delete service/instance " << std::hex << s.first << "/" << i.first; del_routing_info(s.first, i.first, i.second.first, i.second.second); } @@ -2043,15 +2113,19 @@ void routing_manager_impl::expire_services(const boost::asio::ip::address &_addr } bool is_gone(false); boost::asio::ip::address its_address; - std::shared_ptr<endpoint> its_endpoint = i.second->get_endpoint(true); - if (its_endpoint) { - if (its_endpoint->get_remote_address(its_address)) { + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>( + i.second->get_endpoint(true)); + if (its_client_endpoint) { + if (its_client_endpoint->get_remote_address(its_address)) { is_gone = (its_address == _address); } } else { - its_endpoint = i.second->get_endpoint(false); - if (its_endpoint) { - if (its_endpoint->get_remote_address(its_address)) { + its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>( + i.second->get_endpoint(false)); + if (its_client_endpoint) { + if (its_client_endpoint->get_remote_address(its_address)) { is_gone = (its_address == _address); } } @@ -2070,7 +2144,7 @@ void routing_manager_impl::expire_services(const boost::asio::ip::address &_addr for (auto &s : its_expired_offers) { for (auto &i : s.second) { - VSOMEIP_DEBUG << "expire_services for address: " << _address.to_string() + VSOMEIP_INFO << "expire_services for address: " << _address.to_string() << " : delete service/instance " << std::hex << s.first << "/" << i.first; del_routing_info(s.first, i.first, i.second.first, i.second.second); } @@ -2175,13 +2249,13 @@ bool routing_manager_impl::on_subscribe_accepted(service_t _service, instance_t } if (client != VSOMEIP_ROUTING_CLIENT) { - VSOMEIP_DEBUG << "Subscription accepted: eventgroup=" << _eventgroup + VSOMEIP_INFO << "Subscription accepted: eventgroup=" << _eventgroup << " : target: " << _target->get_address().to_string() << ":" << std::dec <<_target->get_port() << (_target->is_reliable() ? " reliable" : " unreliable") << " from client: 0x" << std::hex << client << "."; } else { - VSOMEIP_DEBUG << "Subscription accepted: eventgroup: " << _eventgroup + VSOMEIP_INFO << "Subscription accepted: eventgroup: " << _eventgroup << " : target: " << _target->get_address().to_string() << ":" << std::dec <<_target->get_port() << (_target->is_reliable() ? " reliable" : " unreliable") @@ -2267,12 +2341,12 @@ void routing_manager_impl::on_unsubscribe(service_t _service, client_t its_client = find_client(_service, _instance, its_eventgroup, _target); if (its_client != VSOMEIP_ROUTING_CLIENT) { - VSOMEIP_DEBUG << "on_unsubscribe: target: " << _target->get_address().to_string() + VSOMEIP_INFO << "on_unsubscribe: target: " << _target->get_address().to_string() << ":" << std::dec <<_target->get_port() << (_target->is_reliable() ? " reliable" : " unreliable") << " from client: 0x" << std::hex << its_client; } else { - VSOMEIP_DEBUG << "on_unsubscribe: target: " << _target->get_address().to_string() + VSOMEIP_INFO << "on_unsubscribe: target: " << _target->get_address().to_string() << ":" << std::dec <<_target->get_port() << (_target->is_reliable() ? " reliable" : " unreliable"); } @@ -2609,7 +2683,9 @@ return_code_e routing_manager_impl::check_error(const byte_t *_data, length_t _s void routing_manager_impl::send_error(return_code_e _return_code, const byte_t *_data, length_t _size, instance_t _instance, bool _reliable, - endpoint *_receiver) { + endpoint *_receiver, + const boost::asio::ip::address &_remote_address, + std::uint16_t _remote_port) { client_t its_client = 0; service_t its_service = 0; @@ -2646,53 +2722,21 @@ void routing_manager_impl::send_error(return_code_e _return_code, error_message->set_return_code(_return_code); error_message->set_service(its_service); error_message->set_session(its_session); - - std::lock_guard<std::mutex> its_lock(serialize_mutex_); - if (serializer_->serialize(error_message.get())) { - if (_receiver) { - boost::asio::ip::address adr; - uint16_t port; - if (_receiver->is_reliable()) { - auto endpoint = dynamic_cast<tcp_server_endpoint_impl*>(_receiver); - if(!endpoint) { - return; - } - if (!endpoint->get_remote_address(adr)) { - VSOMEIP_ERROR << "routing_manager_impl::send_error: " - "couldn't determine remote address (reliable)"; - return; - } - port = endpoint->get_remote_port(); - if (!port) { - VSOMEIP_ERROR << "routing_manager_impl::send_error: " - "couldn't determine remote port (reliable)"; - return; - } - } else { - auto endpoint = dynamic_cast<udp_server_endpoint_impl*>(_receiver); - if (!endpoint) { - return; - } - if (!endpoint->get_remote_address(adr)) { - VSOMEIP_ERROR << "routing_manager_impl::send_error: " - "couldn't determine remote address (unreliable)"; - return; - } - port = endpoint->get_remote_port(); - if (!port) { - VSOMEIP_ERROR << "routing_manager_impl::send_error: " - "couldn't determine remote port (unreliable)"; - return; - } - } - auto its_endpoint_def = - std::make_shared<endpoint_definition>(adr, port, _receiver->is_reliable()); - its_endpoint_def->set_remote_port(_receiver->get_local_port()); - send_to(its_endpoint_def, serializer_->get_data(), serializer_->get_size(), true); + { + std::lock_guard<std::mutex> its_lock(serialize_mutex_); + if (serializer_->serialize(error_message.get())) { + if (_receiver) { + auto its_endpoint_def = std::make_shared<endpoint_definition>( + _remote_address, _remote_port, + _receiver->is_reliable()); + its_endpoint_def->set_remote_port(_receiver->get_local_port()); + send_to(its_endpoint_def, serializer_->get_data(), + serializer_->get_size(), true); + } + serializer_->reset(); + } else { + VSOMEIP_ERROR<< "Failed to serialize error message."; } - serializer_->reset(); - } else { - VSOMEIP_ERROR << "Failed to serialize error message."; } } @@ -2716,55 +2760,72 @@ void routing_manager_impl::on_identify_response(client_t _client, service_t _ser } void routing_manager_impl::identify_for_subscribe(client_t _client, - service_t _service, instance_t _instance, major_version_t _major) { - if (!has_identified(_client, _service, _instance, false) && - !is_identifying(_client, _service, _instance, false)) { - auto unreliable_endpoint = find_or_create_remote_client(_service, _instance, false, _client); - if (unreliable_endpoint) { - { - std::lock_guard<std::mutex> its_lock(identified_clients_mutex_); - identifying_clients_[_service][_instance][false].insert(_client); - } - auto message = runtime::get()->create_message(false); - message->set_service(_service); - message->set_instance(_instance); - message->set_client(_client); - message->set_method(ANY_METHOD - 1); - message->set_interface_version(_major); - message->set_message_type(message_type_e::MT_REQUEST); - std::lock_guard<std::mutex> its_lock(serialize_mutex_); - if (serializer_->serialize(message.get())) { - unreliable_endpoint->send(serializer_->get_data(), - serializer_->get_size()); - serializer_->reset(); - } - } - } - if (!has_identified(_client, _service, _instance, true) && - !is_identifying(_client, _service, _instance, true)) { - auto reliable_endpoint = find_or_create_remote_client(_service, _instance, true, _client); - if (reliable_endpoint) { - { - std::lock_guard<std::mutex> its_lock(identified_clients_mutex_); - identifying_clients_[_service][_instance][true].insert(_client); + service_t _service, instance_t _instance, major_version_t _major, + subscription_type_e _subscription_type) { + + if (_subscription_type == subscription_type_e::SU_RELIABLE_AND_UNRELIABLE + || _subscription_type == subscription_type_e::SU_PREFER_UNRELIABLE + || _subscription_type == subscription_type_e::SU_UNRELIABLE) { + if (!has_identified(_client, _service, _instance, false) + && !is_identifying(_client, _service, _instance, false)) { + if (!send_identify_message(_client, _service, _instance, _major, + false) && _subscription_type + == subscription_type_e::SU_PREFER_UNRELIABLE) { + send_identify_message(_client, _service, _instance, _major, + true); } - auto message = runtime::get()->create_message(true); - message->set_service(_service); - message->set_instance(_instance); - message->set_client(_client); - message->set_method(ANY_METHOD - 1); - message->set_interface_version(_major); - message->set_message_type(message_type_e::MT_REQUEST); - std::lock_guard<std::mutex> its_lock(serialize_mutex_); - if (serializer_->serialize(message.get())) { - reliable_endpoint->send(serializer_->get_data(), - serializer_->get_size()); - serializer_->reset(); + } + } + + if (_subscription_type == subscription_type_e::SU_RELIABLE_AND_UNRELIABLE + || _subscription_type == subscription_type_e::SU_PREFER_RELIABLE + || _subscription_type == subscription_type_e::SU_RELIABLE) { + if (!has_identified(_client, _service, _instance, true) + && !is_identifying(_client, _service, _instance, true)) { + if (!send_identify_message(_client, _service, _instance, _major, + true) && _subscription_type + == subscription_type_e::SU_PREFER_RELIABLE) { + send_identify_message(_client, _service, _instance, _major, + false); } } } } +bool routing_manager_impl::send_identify_message(client_t _client, + service_t _service, + instance_t _instance, + major_version_t _major, + bool _reliable) { + auto its_endpoint = find_or_create_remote_client(_service, _instance, + _reliable, _client); + if (!its_endpoint) { + return false; + } + { + std::lock_guard<std::mutex> its_lock(identified_clients_mutex_); + identifying_clients_[_service][_instance][_reliable].insert(_client); + } + auto message = runtime::get()->create_message(_reliable); + message->set_service(_service); + message->set_instance(_instance); + message->set_client(_client); + message->set_method(ANY_METHOD - 1); + message->set_interface_version(_major); + message->set_message_type(message_type_e::MT_REQUEST); + { + std::lock_guard<std::mutex> its_lock(serialize_mutex_); + if (serializer_->serialize(message.get())) { + its_endpoint->send(serializer_->get_data(), serializer_->get_size()); + serializer_->reset(); + } else { + return false; + } + } + return true; +} + + bool routing_manager_impl::supports_selective(service_t _service, instance_t _instance) { bool supports_selective(false); auto its_service = remote_service_info_.find(_service); @@ -2884,7 +2945,7 @@ routing_manager_impl::expire_subscriptions() { its_instance.first, its_eventgroup.first, true); } - VSOMEIP_DEBUG << "Expired subscription (" + VSOMEIP_INFO << "Expired subscription (" << std::hex << its_service.first << "." << its_instance .first << "." << its_eventgroup.first << " from " @@ -3141,10 +3202,12 @@ void routing_manager_impl::on_pong(client_t _client) { } void routing_manager_impl::on_clientendpoint_error(client_t _client) { - VSOMEIP_WARNING << "Application/Client " - << std::hex << std::setw(4) << std::setfill('0') - << _client << " will be deregistered because of an client endpoint error."; - stub_->deregister_erroneous_client(_client); + if (stub_->is_registered(_client)) { + VSOMEIP_WARNING << "Application/Client " + << std::hex << std::setw(4) << std::setfill('0') + << _client << " will be deregistered because of an client endpoint error."; + stub_->deregister_erroneous_client(_client); + } } void routing_manager_impl::confirm_pending_offers(client_t _client) { @@ -3208,6 +3271,7 @@ void routing_manager_impl::remove_specific_client_endpoint(client_t _client, ser if(found_instance != found_service->second.end()) { auto its_client = found_instance->second.find(_client); if (its_client != found_instance->second.end()) { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); if (remote_services_.find(_service) != remote_services_.end()) { if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) { auto endpoint = remote_services_[_service][_instance][_client][_reliable]; @@ -3307,6 +3371,26 @@ void routing_manager_impl::remove_identifying_client(service_t _service, instanc } } +void routing_manager_impl::unsubscribe_specific_client_at_sd( + service_t _service, instance_t _instance, client_t _client) { + bool found(false); + { + std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_); + auto its_service = specific_endpoint_clients_.find(_service); + if (its_service != specific_endpoint_clients_.end()) { + auto its_instance = its_service->second.find(_instance); + if (its_instance != its_service->second.end()) { + if (its_instance->second.find(_client) != its_instance->second.end()) { + found = true; + } + } + } + } + if (found && discovery_) { + discovery_->unsubscribe_client(_service, _instance, _client); + } +} + void routing_manager_impl::send_subscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, subscription_type_e _subscription_type) { @@ -3377,5 +3461,34 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } } +void routing_manager_impl::on_net_if_state_changed(std::string _if, bool _available) { + if (_available != if_state_running_) { + if (_available) { + VSOMEIP_INFO << "Network interface \"" << _if << "\" is up and running."; + start_ip_routing(); +#ifndef WIN32 + if (netlink_connector_) { + netlink_connector_->unregister_net_if_changes_handler(); + } +#endif + } + } +} + +void routing_manager_impl::start_ip_routing() { + std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_); + if_state_running_ = true; + + if (discovery_) { + discovery_->start(); + } else { + init_routing_info(); + } + + for (auto its_service : pending_sd_offers_) { + init_service_info(its_service.first, its_service.second, true); + } + pending_sd_offers_.clear(); +} } // namespace vsomeip |