diff options
Diffstat (limited to 'implementation')
54 files changed, 2014 insertions, 611 deletions
diff --git a/implementation/configuration/include/client.hpp b/implementation/configuration/include/client.hpp index 7758eca..872974f 100644 --- a/implementation/configuration/include/client.hpp +++ b/implementation/configuration/include/client.hpp @@ -16,16 +16,20 @@ namespace vsomeip_v3 { namespace cfg { struct client { - client() : service_(ANY_SERVICE), instance_(ANY_INSTANCE) {} + client() : service_(ANY_SERVICE), + instance_(ANY_INSTANCE) { + } // ports for specific service / instance service_t service_; instance_t instance_; std::map<bool, std::set<uint16_t> > ports_; + std::map<bool, uint16_t> last_used_specific_client_port_; // client port ranges mapped to remote port ranges std::map<bool, std::pair<uint16_t, uint16_t> > remote_ports_; std::map<bool, std::pair<uint16_t, uint16_t> > client_ports_; + std::map<bool, uint16_t> last_used_client_port_; }; } // namespace cfg diff --git a/implementation/configuration/include/configuration.hpp b/implementation/configuration/include/configuration.hpp index a962298..e107e17 100644 --- a/implementation/configuration/include/configuration.hpp +++ b/implementation/configuration/include/configuration.hpp @@ -254,7 +254,7 @@ public: virtual bool is_secure_service(service_t _service, instance_t _instance) const = 0; - virtual std::uint32_t get_udp_receive_buffer_size() const = 0; + virtual int get_udp_receive_buffer_size() const = 0; virtual bool check_routing_credentials(client_t _client, uint32_t _uid, uint32_t _gid) const = 0; @@ -273,6 +273,11 @@ public: virtual uint32_t get_statistics_interval() const = 0; virtual uint32_t get_statistics_min_freq() const = 0; virtual uint32_t get_statistics_max_messages() const = 0; + + virtual uint8_t get_max_remote_subscribers() const = 0; + + virtual partition_id_t get_partition_id( + service_t _service, instance_t _instance) const = 0; }; } // namespace vsomeip_v3 diff --git a/implementation/configuration/include/configuration_impl.hpp b/implementation/configuration/include/configuration_impl.hpp index 66cfc84..cb6c284 100644 --- a/implementation/configuration/include/configuration_impl.hpp +++ b/implementation/configuration/include/configuration_impl.hpp @@ -220,7 +220,7 @@ public: VSOMEIP_EXPORT bool is_secure_service(service_t _service, instance_t _instance) const; - VSOMEIP_EXPORT std::uint32_t get_udp_receive_buffer_size() const; + VSOMEIP_EXPORT int get_udp_receive_buffer_size() const; VSOMEIP_EXPORT bool has_overlay(const std::string &_name) const; VSOMEIP_EXPORT void load_overlay(const std::string &_name); @@ -239,6 +239,11 @@ public: VSOMEIP_EXPORT uint32_t get_statistics_min_freq() const; VSOMEIP_EXPORT uint32_t get_statistics_max_messages() const; + VSOMEIP_EXPORT uint8_t get_max_remote_subscribers() const; + + VSOMEIP_EXPORT partition_id_t get_partition_id( + service_t _service, instance_t _instance) const; + private: void read_data(const std::set<std::string> &_input, std::vector<configuration_element> &_elements, @@ -355,6 +360,9 @@ private: instance_t _instance, eventgroup_t _eventgroup) const; bool find_port(uint16_t &_port, uint16_t _remote, bool _reliable, std::map<bool, std::set<uint16_t> > &_used_client_ports) const; + bool find_specific_port(uint16_t &_port, service_t _service, + instance_t _instance, bool _reliable, + std::map<bool, std::set<uint16_t> > &_used_client_ports) const; void set_magic_cookies_unicast_address(); @@ -379,6 +387,9 @@ private: void load_secure_services(const configuration_element &_element); void load_secure_service(const boost::property_tree::ptree &_tree); + void load_partitions(const configuration_element &_element); + void load_partition(const boost::property_tree::ptree &_tree); + private: std::mutex mutex_; @@ -514,7 +525,9 @@ protected: ET_PLUGIN_TYPE, ET_ROUTING_CREDENTIALS, ET_SHUTDOWN_TIMEOUT, - ET_MAX = 42 + ET_MAX_REMOTE_SUBSCRIBERS, + ET_PARTITIONS, + ET_MAX = 44 }; bool is_configured_[ET_MAX]; @@ -552,7 +565,7 @@ protected: bool has_issued_methods_warning_; bool has_issued_clients_warning_; - std::uint32_t udp_receive_buffer_size_; + int udp_receive_buffer_size_; std::chrono::nanoseconds npdu_default_debounce_requ_; std::chrono::nanoseconds npdu_default_debounce_resp_; @@ -568,6 +581,14 @@ protected: uint32_t statistics_interval_; uint32_t statistics_min_freq_; uint32_t statistics_max_messages_; + uint8_t max_remote_subscribers_; + + mutable std::mutex partitions_mutex_; + std::map<service_t, + std::map<instance_t, + partition_id_t + > + > partitions_; }; } // namespace cfg diff --git a/implementation/configuration/include/internal.hpp.in b/implementation/configuration/include/internal.hpp.in index cc21616..efd9ba0 100644 --- a/implementation/configuration/include/internal.hpp.in +++ b/implementation/configuration/include/internal.hpp.in @@ -88,6 +88,8 @@ #define VSOMEIP_DEFAULT_STATISTICS_MIN_FREQ 50 #define VSOMEIP_DEFAULT_STATISTICS_INTERVAL 10000 +#define VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS 3 + #define VSOMEIP_MAX_WAIT_SENT 5 #define VSOMEIP_COMMAND_HEADER_SIZE 7 @@ -138,6 +140,9 @@ #define VSOMEIP_UPDATE_SECURITY_CREDENTIALS 0x27 #define VSOMEIP_DISTRIBUTE_SECURITY_POLICIES 0x28 #define VSOMEIP_UPDATE_SECURITY_POLICY_INT 0x29 +#define VSOMEIP_EXPIRED_SUBSCRIPTION 0x2A + +#define VSOMEIP_SUSPEND 0x30 #define VSOMEIP_SEND_COMMAND_SIZE 13 #define VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN 7 @@ -170,6 +175,7 @@ #define VSOMEIP_REGISTER_APPLICATION_COMMAND_SIZE 7 #define VSOMEIP_DEREGISTER_APPLICATION_COMMAND_SIZE 7 #define VSOMEIP_REGISTERED_ACK_COMMAND_SIZE 7 +#define VSOMEIP_EXPIRED_SUBSCRIPTION_COMMAND_SIZE 17 #ifndef _WIN32 @@ -233,6 +239,9 @@ enum class port_type_e { PT_UNKNOWN }; +typedef uint8_t partition_id_t; +const partition_id_t VSOMEIP_DEFAULT_PARTITION_ID = 0; + } // namespace vsomeip_v3 #endif // VSOMEIP_V3_INTERNAL_HPP_ diff --git a/implementation/configuration/include/internal_android.hpp b/implementation/configuration/include/internal_android.hpp index 25ecb4e..8ecd2b5 100644 --- a/implementation/configuration/include/internal_android.hpp +++ b/implementation/configuration/include/internal_android.hpp @@ -72,6 +72,8 @@ #define VSOMEIP_DEFAULT_STATISTICS_MIN_FREQ 50 #define VSOMEIP_DEFAULT_STATISTICS_INTERVAL 10000 +#define VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS 3 + #define VSOMEIP_MAX_WAIT_SENT 5 #define VSOMEIP_COMMAND_HEADER_SIZE 7 @@ -122,6 +124,9 @@ #define VSOMEIP_UPDATE_SECURITY_CREDENTIALS 0x27 #define VSOMEIP_DISTRIBUTE_SECURITY_POLICIES 0x28 #define VSOMEIP_UPDATE_SECURITY_POLICY_INT 0x29 +#define VSOMEIP_EXPIRED_SUBSCRIPTION 0x2A + +#define VSOMEIP_SUSPEND 0x30 #define VSOMEIP_SEND_COMMAND_SIZE 13 #define VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN 7 @@ -154,6 +159,7 @@ #define VSOMEIP_REGISTER_APPLICATION_COMMAND_SIZE 7 #define VSOMEIP_DEREGISTER_APPLICATION_COMMAND_SIZE 7 #define VSOMEIP_REGISTERED_ACK_COMMAND_SIZE 7 +#define VSOMEIP_EXPIRED_SUBSCRIPTION_COMMAND_SIZE 17 #include <pthread.h> @@ -214,6 +220,9 @@ enum class port_type_e { PT_UNKNOWN }; +typedef uint8_t partition_id_t; +const partition_id_t VSOMEIP_DEFAULT_PARTITION_ID = 0; + } // namespace vsomeip_v3 #endif // VSOMEIP_V3_INTERNAL_HPP_ diff --git a/implementation/configuration/src/configuration_impl.cpp b/implementation/configuration/src/configuration_impl.cpp index 0e9ca51..6fdbb67 100644 --- a/implementation/configuration/src/configuration_impl.cpp +++ b/implementation/configuration/src/configuration_impl.cpp @@ -94,7 +94,8 @@ configuration_impl::configuration_impl() log_statistics_(true), statistics_interval_(VSOMEIP_DEFAULT_STATISTICS_INTERVAL), statistics_min_freq_(VSOMEIP_DEFAULT_STATISTICS_MIN_FREQ), - statistics_max_messages_(VSOMEIP_DEFAULT_STATISTICS_MAX_MSG) { + statistics_max_messages_(VSOMEIP_DEFAULT_STATISTICS_MAX_MSG), + max_remote_subscribers_(VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS) { unicast_ = unicast_.from_string(VSOMEIP_UNICAST_ADDRESS); netmask_ = netmask_.from_string(VSOMEIP_NETMASK); for (auto i = 0; i < ET_MAX; i++) @@ -199,6 +200,7 @@ configuration_impl::configuration_impl(const configuration_impl &_other) statistics_interval_ = _other.statistics_interval_; statistics_min_freq_ = _other.statistics_min_freq_; statistics_max_messages_ = _other.statistics_max_messages_; + max_remote_subscribers_ = _other.max_remote_subscribers_; } configuration_impl::~configuration_impl() { @@ -510,6 +512,7 @@ bool configuration_impl::load_data(const std::vector<configuration_element> &_el load_debounce(e); load_acceptances(e); load_secure_services(e); + load_partitions(e); } } @@ -1320,6 +1323,24 @@ void configuration_impl::load_service_discovery( load_ttl_factors(i->second, &ttl_factors_subscriptions_); is_configured_[ET_SERVICE_DISCOVERY_TTL_FACTOR_SUBSCRIPTIONS] = true; } + } else if (its_key == "max_remote_subscribers") { + if (!is_overlay_ && is_configured_[ET_MAX_REMOTE_SUBSCRIBERS]) { + VSOMEIP_WARNING << "Multiple definitions for service_discovery.max_remote_subscribers." + " Ignoring definition from " << _element.name_; + } else { + int tmp; + its_converter << its_value; + its_converter >> tmp; + max_remote_subscribers_ = (tmp > (std::numeric_limits<std::uint8_t>::max)()) ? + (std::numeric_limits<std::uint8_t>::max)() : + static_cast<std::uint8_t>(tmp); + if (max_remote_subscribers_ == 0) { + VSOMEIP_WARNING << "max_remote_subscribers_ = 0 is not allowed. Using default (" + << std::dec << VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS << ")"; + max_remote_subscribers_ = VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS; + } + is_configured_[ET_MAX_REMOTE_SUBSCRIBERS] = true; + } } } } catch (...) { @@ -1832,6 +1853,10 @@ void configuration_impl::load_client(const boost::property_tree::ptree &_tree) { its_client->remote_ports_[false] = std::make_pair(ILLEGAL_PORT, ILLEGAL_PORT); its_client->client_ports_[true] = std::make_pair(ILLEGAL_PORT, ILLEGAL_PORT); its_client->client_ports_[false] = std::make_pair(ILLEGAL_PORT, ILLEGAL_PORT); + its_client->last_used_specific_client_port_[true] = ILLEGAL_PORT; + its_client->last_used_specific_client_port_[false] = ILLEGAL_PORT; + its_client->last_used_client_port_[true] = ILLEGAL_PORT; + its_client->last_used_client_port_[false] = ILLEGAL_PORT; for (auto i = _tree.begin(); i != _tree.end(); ++i) { std::string its_key(i->first); @@ -2128,6 +2153,92 @@ void configuration_impl::load_selective_broadcasts_support(const configuration_e } } + +void +configuration_impl::load_partitions(const configuration_element &_element) { + + try { + auto its_partitions = _element.tree_.get_child("partitions"); + for (auto i = its_partitions.begin(); i != its_partitions.end(); ++i) { + load_partition(i->second); + } + } catch (...) { + } +} + +void +configuration_impl::load_partition(const boost::property_tree::ptree &_tree) { + + static partition_id_t its_partition_id(VSOMEIP_DEFAULT_PARTITION_ID); + + try { + + std::stringstream its_converter; + std::map<service_t, std::set<instance_t> > its_partition_members; + + for (auto i = _tree.begin(); i != _tree.end(); ++i) { + service_t its_service(0x0); + instance_t its_instance(0x0); + std::string its_service_s, its_instance_s; + + for (auto j = i->second.begin(); j != i->second.end(); ++j) { + std::string its_key(j->first); + std::string its_data(j->second.data()); + + its_converter.str(""); + its_converter.clear(); + + if (its_data.find("0x") != std::string::npos) + its_converter << std::hex; + else + its_converter << std::dec; + its_converter << its_data; + + if (its_key == "service") { + its_converter >> its_service; + its_service_s = its_data; + } else if (its_key == "instance") { + its_converter >> its_instance; + its_instance_s = its_data; + } + } + + if (its_service > 0 && its_instance > 0) + its_partition_members[its_service].insert(its_instance); + else + VSOMEIP_ERROR << "P: <" << its_service_s << "." + << its_instance_s << "> is no valid service instance."; + + } + + if (!its_partition_members.empty()) { + std::lock_guard<std::mutex> its_lock(partitions_mutex_); + its_partition_id++; + + std::stringstream its_log; + its_log << "P" + << std::dec << static_cast<int>(its_partition_id) + << " ["; + + for (const auto &i : its_partition_members) { + for (const auto j : i.second) { + partitions_[i.first][j] = its_partition_id; + its_log << "<" + << std::setw(4) << std::setfill('0') << std::hex + << i.first << "." + << std::setw(4) << std::setfill('0') << std::hex + << j + << ">"; + } + } + + its_log << "]"; + VSOMEIP_INFO << its_log.str(); + } + } catch (...) { + } +} + /////////////////////////////////////////////////////////////////////////////// // Internal helper /////////////////////////////////////////////////////////////////////////////// @@ -2351,19 +2462,14 @@ bool configuration_impl::get_client_port( std::map<bool, std::set<uint16_t> > &_used_client_ports, uint16_t &_client_port) const { bool is_configured(false); - _client_port = ILLEGAL_PORT; - auto its_client = find_client(_service, _instance); - // Check for service, instance specific port configuration - if (its_client && !its_client->ports_[_reliable].empty()) { + uint16_t its_specific_port(ILLEGAL_PORT); + if (find_specific_port(its_specific_port, _service, _instance, _reliable, _used_client_ports)) { is_configured = true; - for (auto its_port : its_client->ports_[_reliable]) { - // Found free configured port - if (_used_client_ports[_reliable].find(its_port) == _used_client_ports[_reliable].end()) { - _client_port = its_port; - return true; - } + if (its_specific_port != ILLEGAL_PORT) { + _client_port = its_specific_port; + return true; } } @@ -2615,18 +2721,120 @@ bool configuration_impl::find_port(uint16_t &_port, uint16_t _remote, bool _reli std::list<std::shared_ptr<client>>::const_iterator it; for (it = clients_.begin(); it != clients_.end(); ++it) { - if (is_in_port_range(_remote, (*it)->remote_ports_[_reliable])) { + if (_remote != ILLEGAL_PORT && is_in_port_range(_remote, + (*it)->remote_ports_[_reliable])) { is_configured = true; - for (uint16_t its_port = (*it)->client_ports_[_reliable].first; - its_port <= (*it)->client_ports_[_reliable].second; its_port++ ) { - if (_used_client_ports[_reliable].find(its_port) == _used_client_ports[_reliable].end()) { + uint16_t its_port(ILLEGAL_PORT); + if ((*it)->last_used_client_port_[_reliable] != ILLEGAL_PORT && + is_in_port_range(((*it)->last_used_client_port_[_reliable])++, + (*it)->client_ports_[_reliable])) { + its_port = ((*it)->last_used_client_port_[_reliable])++; + } else { + // on initial start of port search + if ((*it)->last_used_client_port_[_reliable] == ILLEGAL_PORT) { + its_port = (*it)->client_ports_[_reliable].first; + } else { + continue; + } + } + while (its_port <= (*it)->client_ports_[_reliable].second) { + if (_used_client_ports[_reliable].find(its_port) + == _used_client_ports[_reliable].end()) { + _port = its_port; + (*it)->last_used_client_port_[_reliable] = its_port; + return true; + } + its_port++; + } + } + } + // no free port was found in _used_client_ports or last_used_port + // cannot be incremented for any client port range + // -> reset last used port for all available client port ranges + for (it = clients_.begin(); it != clients_.end(); ++it) { + if (_remote != ILLEGAL_PORT && is_in_port_range(_remote, + (*it)->remote_ports_[_reliable])) { + (*it)->last_used_client_port_[_reliable] = ILLEGAL_PORT; + } + } + // ensure that all configured client ports are checked from beginning + for (it = clients_.begin(); it != clients_.end(); ++it) { + if (_remote != ILLEGAL_PORT && is_in_port_range(_remote, + (*it)->remote_ports_[_reliable])) { + uint16_t its_port(ILLEGAL_PORT); + its_port = (*it)->client_ports_[_reliable].first; + while (its_port <= (*it)->client_ports_[_reliable].second) { + if (_used_client_ports[_reliable].find(its_port) + == _used_client_ports[_reliable].end()) { _port = its_port; + (*it)->last_used_client_port_[_reliable] = its_port; return true; } + its_port++; } } } + return is_configured; +} + +bool configuration_impl::find_specific_port(uint16_t &_port, service_t _service, + instance_t _instance, bool _reliable, + std::map<bool, std::set<uint16_t> > &_used_client_ports) const { + bool is_configured(false); + bool check_all(false); + std::list<std::shared_ptr<client>>::const_iterator it; + auto its_client = find_client(_service, _instance); + // Check for service, instance specific port configuration + if (its_client && !its_client->ports_[_reliable].empty()) { + is_configured = true; + std::set<uint16_t>::const_iterator it; + if (its_client->last_used_specific_client_port_[_reliable] == ILLEGAL_PORT) { + it = its_client->ports_[_reliable].begin(); + } else { + it = its_client->ports_[_reliable].find( + its_client->last_used_specific_client_port_[_reliable]); + auto it_next = std::next(it, 1); + if (it_next != its_client->ports_[_reliable].end()) { + check_all = true; + it = it_next; + } else { + it = its_client->ports_[_reliable].begin(); + } + } + while (it != its_client->ports_[_reliable].end()) { + if (_used_client_ports[_reliable].find(*it) + == _used_client_ports[_reliable].end()) { + _port = *it; + its_client->last_used_specific_client_port_[_reliable] = *it; + VSOMEIP_INFO << "configuration_impl:find_specific_port #1:" + << " service: " << std::hex << _service + << " instance: " << _instance + << " reliable: " << std::dec << _reliable + << " return specific port: " << (uint32_t)_port; + return true; + } + ++it; + } + if (check_all) { + // no free port was found + // ensure that all configured client ports are checked from beginning + for (auto its_port : _used_client_ports[_reliable]) { + if (_used_client_ports[_reliable].find(its_port) + == _used_client_ports[_reliable].end()) { + _port = its_port; + its_client->last_used_specific_client_port_[_reliable] = its_port; + VSOMEIP_INFO << "configuration_impl:find_specific_port #2:" + << " service: " << std::hex << _service + << " instance: " << _instance + << " reliable: " << std::dec << _reliable + << " return specific port: " << (uint32_t)_port; + return true; + } + } + } + its_client->last_used_specific_client_port_[_reliable] = ILLEGAL_PORT; + } return is_configured; } @@ -3414,23 +3622,54 @@ configuration_impl::load_acceptance_data( // If optional was not set, use default! if (!has_optional) { const auto its_optional_client = boost::icl::interval<std::uint16_t>::closed(30491, 30499); + const auto its_optional_client_spare = boost::icl::interval<std::uint16_t>::closed(30898, 30998); const auto its_optional_server = boost::icl::interval<std::uint16_t>::closed(30501, 30599); its_ports.operator [](is_reliable).first.insert(its_optional_client); + its_ports.operator [](is_reliable).first.insert(its_optional_client_spare); its_ports.operator [](is_reliable).first.insert(its_optional_server); } // If secure was not set, use default! if (!has_secure) { const auto its_secure_client = boost::icl::interval<std::uint16_t>::closed(32491, 32499); + const auto its_secure_client_spare = boost::icl::interval<std::uint16_t>::closed(32898, 32998); const auto its_secure_server = boost::icl::interval<std::uint16_t>::closed(32501, 32599); its_ports.operator [](is_reliable).second.insert(its_secure_client); + its_ports.operator [](is_reliable).second.insert(its_secure_client_spare); its_ports.operator [](is_reliable).second.insert(its_secure_server); } } } + // If no ports are specified, use default! + if (its_ports.empty()) { + const auto its_optional_client = boost::icl::interval<std::uint16_t>::closed(30491, 30499); + const auto its_optional_client_spare = boost::icl::interval<std::uint16_t>::closed(30898, 30998); + const auto its_optional_server = boost::icl::interval<std::uint16_t>::closed(30501, 30599); + + // optional + its_ports.operator [](false).first.insert(its_optional_client); + its_ports.operator [](false).first.insert(its_optional_client_spare); + its_ports.operator [](false).first.insert(its_optional_server); + its_ports.operator [](true).first.insert(its_optional_client); + its_ports.operator [](true).first.insert(its_optional_client_spare); + its_ports.operator [](true).first.insert(its_optional_server); + + // secure + const auto its_secure_client = boost::icl::interval<std::uint16_t>::closed(32491, 32499); + const auto its_secure_client_spare = boost::icl::interval<std::uint16_t>::closed(32898, 32998); + const auto its_secure_server = boost::icl::interval<std::uint16_t>::closed(32501, 32599); + + its_ports.operator [](false).second.insert(its_secure_client); + its_ports.operator [](false).second.insert(its_secure_client_spare); + its_ports.operator [](false).second.insert(its_secure_server); + its_ports.operator [](true).second.insert(its_secure_client); + its_ports.operator [](true).second.insert(its_secure_client_spare); + its_ports.operator [](true).second.insert(its_secure_server); + } + if (!its_address.is_unspecified()) { sd_acceptance_rules_.insert( std::make_pair(its_address, @@ -3572,8 +3811,7 @@ configuration_impl::load_udp_receive_buffer_size(const configuration_element &_e } else { const std::string s(_element.tree_.get_child(urbs).data()); try { - udp_receive_buffer_size_ = static_cast<std::uint32_t>(std::stoul( - s.c_str(), NULL, 10)); + udp_receive_buffer_size_ = std::stoi(s.c_str(), NULL, 10); } catch (const std::exception &e) { VSOMEIP_ERROR<< __func__ << ": " << urbs << " " << e.what(); } @@ -3774,8 +4012,11 @@ void configuration_impl::set_sd_acceptance_rule( std::lock_guard<std::mutex> its_lock(sd_acceptance_required_ips_mutex_); const auto its_optional_client = boost::icl::interval<std::uint16_t>::closed(30491, 30499); + const auto its_optional_client_spare = boost::icl::interval<std::uint16_t>::closed(30898, 30998); const auto its_optional_server = boost::icl::interval<std::uint16_t>::closed(30501, 30599); + const auto its_secure_client = boost::icl::interval<std::uint16_t>::closed(32491, 32499); + const auto its_secure_client_spare = boost::icl::interval<std::uint16_t>::closed(32898, 32998); const auto its_secure_server = boost::icl::interval<std::uint16_t>::closed(32501, 32599); const bool rules_active = (sd_acceptance_rules_active_.find(_address) @@ -3801,8 +4042,10 @@ void configuration_impl::set_sd_acceptance_rule( (found_reliability->second.first.empty() && found_reliability->second.second.empty())) { found_reliability->second.first.add(its_optional_client); + found_reliability->second.first.add(its_optional_client_spare); found_reliability->second.first.add(its_optional_server); found_reliability->second.second.add(its_secure_client); + found_reliability->second.second.add(its_secure_client_spare); found_reliability->second.second.add(its_secure_server); if (!rules_active) { sd_acceptance_rules_active_.insert(_address); @@ -3819,8 +4062,10 @@ void configuration_impl::set_sd_acceptance_rule( } } else { found_reliability->second.first.erase(its_optional_client); + found_reliability->second.first.erase(its_optional_client_spare); found_reliability->second.first.erase(its_optional_server); found_reliability->second.second.erase(its_secure_client); + found_reliability->second.second.erase(its_secure_client_spare); found_reliability->second.second.erase(its_secure_server); if (found_reliability->second.first.empty() && found_reliability->second.second.empty()) { @@ -3836,9 +4081,11 @@ void configuration_impl::set_sd_acceptance_rule( } else if (_enable) { boost::icl::interval_set<std::uint16_t> its_optional_default; its_optional_default.add(its_optional_client); + its_optional_default.add(its_optional_client_spare); its_optional_default.add(its_optional_server); boost::icl::interval_set<std::uint16_t> its_secure_default; its_secure_default.add(its_secure_client); + its_secure_default.add(its_secure_client_spare); its_secure_default.add(its_secure_server); found_address->second.second.emplace( @@ -3857,9 +4104,11 @@ void configuration_impl::set_sd_acceptance_rule( } else if (_enable) { boost::icl::interval_set<std::uint16_t> its_optional_default; its_optional_default.add(its_optional_client); + its_optional_default.add(its_optional_client_spare); its_optional_default.add(its_optional_server); boost::icl::interval_set<std::uint16_t> its_secure_default; its_secure_default.add(its_secure_client); + its_secure_default.add(its_secure_client_spare); its_secure_default.add(its_secure_server); sd_acceptance_rules_.emplace(std::make_pair(_address, @@ -3921,7 +4170,7 @@ bool configuration_impl::is_secure_service(service_t _service, instance_t _insta return (false); } -std::uint32_t configuration_impl::get_udp_receive_buffer_size() const { +int configuration_impl::get_udp_receive_buffer_size() const { return udp_receive_buffer_size_; } @@ -4001,5 +4250,27 @@ uint32_t configuration_impl::get_statistics_max_messages() const { return statistics_max_messages_; } -} // namespace config +uint8_t configuration_impl::get_max_remote_subscribers() const { + return max_remote_subscribers_; +} + +partition_id_t +configuration_impl::get_partition_id( + service_t _service, instance_t _instance) const { + + partition_id_t its_id(VSOMEIP_DEFAULT_PARTITION_ID); + + std::lock_guard<std::mutex> its_lock(partitions_mutex_); + auto find_service = partitions_.find(_service); + if (find_service != partitions_.end()) { + auto find_instance = find_service->second.find(_instance); + if (find_instance != find_service->second.end()) { + its_id = find_instance->second; + } + } + + return (its_id); +} + +} // namespace cfg } // namespace vsomeip_v3 diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp index 4411a50..518e696 100644 --- a/implementation/endpoints/include/client_endpoint_impl.hpp +++ b/implementation/endpoints/include/client_endpoint_impl.hpp @@ -70,6 +70,7 @@ public: virtual std::uint16_t get_remote_port() const; std::uint16_t get_local_port() const; + void set_local_port(uint16_t _port); virtual bool is_reliable() const = 0; size_t get_queue_size() const; @@ -93,7 +94,8 @@ protected: CONNECTED, ESTABLISHED }; - virtual void send_queued() = 0; + message_buffer_ptr_t get_front(); + virtual void send_queued(message_buffer_ptr_t _buffer) = 0; virtual void get_configured_times_from_endpoint( service_t _service, method_t _method, std::chrono::nanoseconds *_debouncing, diff --git a/implementation/endpoints/include/endpoint.hpp b/implementation/endpoints/include/endpoint.hpp index ccfe96b..3eafc3a 100644 --- a/implementation/endpoints/include/endpoint.hpp +++ b/implementation/endpoints/include/endpoint.hpp @@ -52,6 +52,7 @@ public: virtual void remove_default_target(service_t _service) = 0; virtual std::uint16_t get_local_port() const = 0; + virtual void set_local_port(uint16_t _port) = 0; virtual bool is_reliable() const = 0; virtual bool is_local() const = 0; diff --git a/implementation/endpoints/include/endpoint_host.hpp b/implementation/endpoints/include/endpoint_host.hpp index 61c012c..2af1697 100644 --- a/implementation/endpoints/include/endpoint_host.hpp +++ b/implementation/endpoints/include/endpoint_host.hpp @@ -29,6 +29,7 @@ public: virtual void on_connect(std::shared_ptr<endpoint> _endpoint) = 0; virtual void on_disconnect(std::shared_ptr<endpoint> _endpoint) = 0; + virtual bool on_bind_error(std::shared_ptr<endpoint> _endpoint, uint16_t _remote_port) = 0; virtual void on_error(const byte_t *_data, length_t _length, endpoint* const _receiver, const boost::asio::ip::address &_remote_address, diff --git a/implementation/endpoints/include/endpoint_impl.hpp b/implementation/endpoints/include/endpoint_impl.hpp index 953201c..76f4698 100644 --- a/implementation/endpoints/include/endpoint_impl.hpp +++ b/implementation/endpoints/include/endpoint_impl.hpp @@ -43,6 +43,7 @@ public: void remove_default_target(service_t); virtual std::uint16_t get_local_port() const = 0; + virtual void set_local_port(uint16_t _port) = 0; virtual bool is_reliable() const = 0; void increment_use_count(); diff --git a/implementation/endpoints/include/endpoint_manager_base.hpp b/implementation/endpoints/include/endpoint_manager_base.hpp index 2a766f9..aa21269 100644 --- a/implementation/endpoints/include/endpoint_manager_base.hpp +++ b/implementation/endpoints/include/endpoint_manager_base.hpp @@ -50,6 +50,7 @@ public: // endpoint_host interface virtual void on_connect(std::shared_ptr<endpoint> _endpoint); virtual void on_disconnect(std::shared_ptr<endpoint> _endpoint); + virtual bool on_bind_error(std::shared_ptr<endpoint> _endpoint, uint16_t _remote_port); virtual void on_error(const byte_t *_data, length_t _length, endpoint* const _receiver, const boost::asio::ip::address &_remote_address, diff --git a/implementation/endpoints/include/endpoint_manager_impl.hpp b/implementation/endpoints/include/endpoint_manager_impl.hpp index a7d28c6..3354947 100644 --- a/implementation/endpoints/include/endpoint_manager_impl.hpp +++ b/implementation/endpoints/include/endpoint_manager_impl.hpp @@ -82,6 +82,8 @@ public: // endpoint_host interface void on_connect(std::shared_ptr<endpoint> _endpoint); void on_disconnect(std::shared_ptr<endpoint> _endpoint); + bool on_bind_error(std::shared_ptr<endpoint> _endpoint, + std::uint16_t _remote_port); void on_error(const byte_t *_data, length_t _length, endpoint* const _receiver, const boost::asio::ip::address &_remote_address, @@ -113,8 +115,15 @@ private: std::map<bool, std::shared_ptr<endpoint>>>> remote_services_t; remote_services_t remote_services_; - typedef std::map<boost::asio::ip::address, std::map<uint16_t, - std::map<bool, std::shared_ptr<endpoint>>>> client_endpoints_by_ip_t; + typedef std::map<boost::asio::ip::address, + std::map<uint16_t, + std::map<bool, + std::map<partition_id_t, + std::shared_ptr<endpoint> + > + > + > + > client_endpoints_by_ip_t; client_endpoints_by_ip_t client_endpoints_by_ip_; std::map<service_t, std::map<endpoint *, instance_t> > service_instances_; diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp index 0e81bf1..3eae191 100644 --- a/implementation/endpoints/include/local_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp @@ -62,7 +62,7 @@ public: std::chrono::nanoseconds *_debouncing, std::chrono::nanoseconds *_maximum_retention) const; private: - void send_queued(); + void send_queued(message_buffer_ptr_t _buffer); void send_magic_cookie(); diff --git a/implementation/endpoints/include/local_server_endpoint_impl.hpp b/implementation/endpoints/include/local_server_endpoint_impl.hpp index 965f03c..8fbb619 100644 --- a/implementation/endpoints/include/local_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/local_server_endpoint_impl.hpp @@ -83,6 +83,7 @@ public: bool is_reliable() const; std::uint16_t get_local_port() const; + void set_local_port(std::uint16_t _port); client_t assign_client(const byte_t *_data, uint32_t _size); diff --git a/implementation/endpoints/include/server_endpoint_impl.hpp b/implementation/endpoints/include/server_endpoint_impl.hpp index 193043c..dfa22e6 100644 --- a/implementation/endpoints/include/server_endpoint_impl.hpp +++ b/implementation/endpoints/include/server_endpoint_impl.hpp @@ -59,6 +59,7 @@ public: virtual bool is_reliable() const = 0; virtual std::uint16_t get_local_port() const = 0; + virtual void set_local_port(uint16_t _port) = 0; public: void connect_cbk(boost::system::error_code const &_error); @@ -98,8 +99,10 @@ protected: protected: queue_type queues_; - std::mutex clients_mutex_; - std::map<client_t, std::map<session_t, endpoint_type> > clients_; + std::mutex requests_mutex_; + std::map<client_t, + std::map<std::tuple<session_t, service_t, instance_t>, endpoint_type> + > requests_; std::map<endpoint_type, std::shared_ptr<train>> trains_; diff --git a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp index e8c2c9a..137571c 100644 --- a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp @@ -42,7 +42,7 @@ public: void send_cbk(boost::system::error_code const &_error, std::size_t _bytes, const message_buffer_ptr_t& _sent_msg); private: - void send_queued(); + void send_queued(message_buffer_ptr_t _buffer); void get_configured_times_from_endpoint( service_t _service, method_t _method, std::chrono::nanoseconds *_debouncing, diff --git a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp index af9a724..bf0e1b9 100644 --- a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp @@ -52,6 +52,7 @@ public: bool get_default_target(service_t, endpoint_type &) const; std::uint16_t get_local_port() const; + void set_local_port(uint16_t _port); bool is_reliable() const; bool is_local() const; diff --git a/implementation/endpoints/include/udp_client_endpoint_impl.hpp b/implementation/endpoints/include/udp_client_endpoint_impl.hpp index 05c63c4..3a3fdcb 100644 --- a/implementation/endpoints/include/udp_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/udp_client_endpoint_impl.hpp @@ -47,8 +47,11 @@ public: bool is_local() const; void print_status(); bool is_reliable() const; + + void send_cbk(boost::system::error_code const &_error, std::size_t _bytes, + const message_buffer_ptr_t &_sent_msg); private: - void send_queued(); + void send_queued(message_buffer_ptr_t _buffer); void get_configured_times_from_endpoint( service_t _service, method_t _method, std::chrono::nanoseconds *_debouncing, @@ -66,7 +69,7 @@ private: private: const boost::asio::ip::address remote_address_; const std::uint16_t remote_port_; - const std::uint32_t udp_receive_buffer_size_; + int udp_receive_buffer_size_; std::shared_ptr<tp::tp_reassembler> tp_reassembler_; }; diff --git a/implementation/endpoints/include/udp_server_endpoint_impl.hpp b/implementation/endpoints/include/udp_server_endpoint_impl.hpp index 4ceefb6..fef09dc 100644 --- a/implementation/endpoints/include/udp_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/udp_server_endpoint_impl.hpp @@ -57,6 +57,7 @@ public: bool get_default_target(service_t _service, endpoint_type &_target) const; std::uint16_t get_local_port() const; + void set_local_port(uint16_t _port); bool is_local() const; void print_status(); diff --git a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp index 38db284..adf3972 100644 --- a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp @@ -50,6 +50,7 @@ public: bool get_remote_address(boost::asio::ip::address &_address) const; std::uint16_t get_local_port() const; + void set_local_port(uint16_t _port); std::uint16_t get_remote_port() const; bool is_reliable() const; bool is_local() const; diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index 9b31cc1..66b3138 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -120,9 +120,24 @@ void client_endpoint_impl<Protocol>::stop() { connect_timer_.cancel(ec); } connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT; - shutdown_and_close_socket(false); + + // bind to strand as stop() might be called from different thread + strand_.dispatch(std::bind(&client_endpoint_impl::shutdown_and_close_socket, + this->shared_from_this(), + false) + ); +} + +template<typename Protocol> +message_buffer_ptr_t client_endpoint_impl<Protocol>::get_front() { + message_buffer_ptr_t its_buffer; + if (queue_.size()) + its_buffer = queue_.front(); + + return (its_buffer); } + template<typename Protocol> bool client_endpoint_impl<Protocol>::send_to( const std::shared_ptr<endpoint_definition> _target, const byte_t *_data, @@ -317,7 +332,11 @@ void client_endpoint_impl<Protocol>::send_segments( // respect minimal debounce time wait_until_debounce_time_reached(); // ignore retention time and send immediately as the train is full anyway - send_queued(); + auto its_buffer = get_front(); + if (its_buffer) { + strand_.dispatch(std::bind(&client_endpoint_impl::send_queued, + this->shared_from_this(), its_buffer)); + } } train_.last_departure_ = std::chrono::steady_clock::now(); } @@ -397,8 +416,10 @@ void client_endpoint_impl<Protocol>::connect_cbk( if (was_not_connected_) { was_not_connected_ = false; std::lock_guard<std::mutex> its_lock(mutex_); - if (queue_.size() > 0) { - send_queued(); + auto its_buffer = get_front(); + if (its_buffer) { + strand_.dispatch(std::bind(&client_endpoint_impl::send_queued, + this->shared_from_this(), its_buffer)); VSOMEIP_WARNING << __func__ << ": resume sending to: " << get_remote_information(); } @@ -415,7 +436,9 @@ template<typename Protocol> void client_endpoint_impl<Protocol>::wait_connect_cbk( boost::system::error_code const &_error) { if (!_error && !client_endpoint_impl<Protocol>::sending_blocked_) { - connect(); + auto self = this->shared_from_this(); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); } } @@ -429,7 +452,9 @@ void client_endpoint_impl<Protocol>::send_cbk( if (queue_.size() > 0) { queue_size_ -= queue_.front()->size(); queue_.pop_front(); - send_queued(); + auto its_buffer = get_front(); + if (its_buffer) + send_queued(its_buffer); } } else if (_error == boost::asio::error::broken_pipe) { state_ = cei_state_e::CLOSED; @@ -475,7 +500,8 @@ void client_endpoint_impl<Protocol>::send_cbk( } was_not_connected_ = true; shutdown_and_close_socket(true); - connect(); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); } else if (_error == boost::asio::error::not_connected || _error == boost::asio::error::bad_descriptor || _error == boost::asio::error::no_permission) { @@ -490,7 +516,8 @@ void client_endpoint_impl<Protocol>::send_cbk( } was_not_connected_ = true; shutdown_and_close_socket(true); - connect(); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); } else if (_error == boost::asio::error::operation_aborted) { VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message(); // endpoint was stopped @@ -585,6 +612,11 @@ std::uint16_t client_endpoint_impl<Protocol>::get_local_port() const { } template<typename Protocol> +void client_endpoint_impl<Protocol>::set_local_port(uint16_t _port) { + local_port_ = _port; +} + +template<typename Protocol> void client_endpoint_impl<Protocol>::start_connect_timer() { std::lock_guard<std::mutex> its_lock(connect_timer_mutex_); connect_timer_.expires_from_now( @@ -665,7 +697,11 @@ void client_endpoint_impl<Protocol>::queue_train(bool _queue_size_zero_on_entry) queue_size_ += train_.buffer_->size(); train_.buffer_ = std::make_shared<message_buffer_t>(); if (_queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress - send_queued(); + auto its_buffer = get_front(); + if (its_buffer) { + strand_.dispatch(std::bind(&client_endpoint_impl::send_queued, + this->shared_from_this(), its_buffer)); + } } } diff --git a/implementation/endpoints/src/endpoint_manager_base.cpp b/implementation/endpoints/src/endpoint_manager_base.cpp index a79ae48..cfddc87 100644 --- a/implementation/endpoints/src/endpoint_manager_base.cpp +++ b/implementation/endpoints/src/endpoint_manager_base.cpp @@ -123,6 +123,13 @@ void endpoint_manager_base::on_disconnect(std::shared_ptr<endpoint> _endpoint) { rm_->on_disconnect(_endpoint); } +bool endpoint_manager_base::on_bind_error(std::shared_ptr<endpoint> _endpoint, uint16_t _remote_port) { + (void)_endpoint; + (void)_remote_port; + return true; + // intentionally left blank +} + void endpoint_manager_base::on_error( const byte_t *_data, length_t _length, endpoint* const _receiver, const boost::asio::ip::address &_remote_address, @@ -158,7 +165,7 @@ endpoint_manager_base::log_client_states() const { { std::lock_guard<std::mutex> its_lock(local_endpoint_mutex_); - for (const auto& e : local_endpoints_) { + for (const auto &e : local_endpoints_) { size_t its_queue_size = e.second->get_queue_size(); if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE) { its_client_queue_sizes.push_back( diff --git a/implementation/endpoints/src/endpoint_manager_impl.cpp b/implementation/endpoints/src/endpoint_manager_impl.cpp index a82a173..dbb2107 100644 --- a/implementation/endpoints/src/endpoint_manager_impl.cpp +++ b/implementation/endpoints/src/endpoint_manager_impl.cpp @@ -328,8 +328,10 @@ bool endpoint_manager_impl::remove_server_endpoint(uint16_t _port, bool _reliabl return ret; } -void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_t _instance, - bool _reliable) { +void +endpoint_manager_impl::clear_client_endpoints( + service_t _service, instance_t _instance, bool _reliable) { + std::shared_ptr<endpoint> endpoint_to_delete; bool other_services_reachable_through_endpoint(false); { @@ -371,8 +373,12 @@ void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_ } if (!other_services_reachable_through_endpoint) { - std::uint16_t its_port(0); + partition_id_t its_partition; boost::asio::ip::address its_address; + std::uint16_t its_port(0); + + its_partition = configuration_->get_partition_id(_service, _instance); + if (_reliable) { std::shared_ptr<tcp_client_endpoint_impl> ep = std::dynamic_pointer_cast<tcp_client_endpoint_impl>(endpoint_to_delete); @@ -392,15 +398,21 @@ void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_ if (found_ip != client_endpoints_by_ip_.end()) { const auto found_port = found_ip->second.find(its_port); if (found_port != found_ip->second.end()) { - const auto found_reliable = found_port->second.find(_reliable); + auto found_reliable = found_port->second.find(_reliable); if (found_reliable != found_port->second.end()) { - if (found_reliable->second == endpoint_to_delete) { - found_port->second.erase(_reliable); - // delete if necessary - if (!found_port->second.size()) { - found_ip->second.erase(found_port); - if (!found_ip->second.size()) { - client_endpoints_by_ip_.erase(found_ip); + const auto found_partition = found_reliable->second.find(its_partition); + if (found_partition != found_reliable->second.end()) { + if (found_partition->second == endpoint_to_delete) { + found_reliable->second.erase(its_partition); + // delete if necessary + if (0 == found_reliable->second.size()) { + found_port->second.erase(_reliable); + if (0 == found_port->second.size()) { + found_ip->second.erase(found_port); + if (0 == found_ip->second.size()) { + client_endpoints_by_ip_.erase(found_ip); + } + } } } } @@ -455,8 +467,14 @@ void endpoint_manager_impl::find_or_create_multicast_endpoint( std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); service_instances_multicast_[_service][_sender] = _instance; } - dynamic_cast<udp_server_endpoint_impl*>(its_endpoint.get())->join_unlocked( - _address.to_string()); + + auto its_udp_server_endpoint + = std::dynamic_pointer_cast<udp_server_endpoint_impl>(its_endpoint); + if (_port != configuration_->get_sd_port()) { + its_udp_server_endpoint->join(_address.to_string()); + } else { + its_udp_server_endpoint->join_unlocked(_address.to_string()); + } } else { VSOMEIP_ERROR <<"Could not find/create multicast endpoint!"; } @@ -540,11 +558,13 @@ void endpoint_manager_impl::print_status() const { VSOMEIP_INFO << "status start remote client endpoints:"; std::uint32_t num_remote_client_endpoints(0); // normal endpoints - for (const auto &a : client_endpoints_by_ip) { - for (const auto& p : a.second) { - for (const auto& ru : p.second) { - ru.second->print_status(); - num_remote_client_endpoints++; + for (const auto &its_address : client_endpoints_by_ip) { + for (const auto &its_port : its_address.second) { + for (const auto &its_reliability : its_port.second) { + for (const auto &its_partition : its_reliability.second) { + its_partition.second->print_status(); + num_remote_client_endpoints++; + } } } } @@ -800,6 +820,37 @@ void endpoint_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) { } } +bool endpoint_manager_impl::on_bind_error(std::shared_ptr<endpoint> _endpoint, std::uint16_t _remote_port) { + std::lock_guard<std::recursive_mutex> its_ep_lock(endpoint_mutex_); + for (auto &its_service : remote_services_) { + for (auto &its_instance : its_service.second) { + const bool is_reliable = _endpoint->is_reliable(); + auto found_endpoint = its_instance.second.find(is_reliable); + if (found_endpoint != its_instance.second.end()) { + if (found_endpoint->second == _endpoint) { + // get a new client port using service / instance / remote port + uint16_t its_old_local_port = _endpoint->get_local_port(); + uint16_t its_new_local_port(ILLEGAL_PORT); + + std::unique_lock<std::mutex> its_lock(used_client_ports_mutex_); + if (configuration_->get_client_port(its_service.first, + its_instance.first, + _remote_port, + is_reliable, + used_client_ports_, + its_new_local_port)) { + _endpoint->set_local_port(its_new_local_port); + its_lock.unlock(); + release_port(its_old_local_port, _endpoint->is_reliable()); + return true; + } + } + } + } + } + return false; +} + void endpoint_manager_impl::on_error( const byte_t *_data, length_t _length, endpoint* const _receiver, const boost::asio::ip::address &_remote_address, @@ -820,8 +871,10 @@ void endpoint_manager_impl::release_port(uint16_t _port, bool _reliable) { used_client_ports_[_reliable].erase(_port); } -std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client( +std::shared_ptr<endpoint> +endpoint_manager_impl::find_remote_client( service_t _service, instance_t _instance, bool _reliable) { + std::shared_ptr<endpoint> its_endpoint; auto found_service = remote_services_.find(_service); if (found_service != remote_services_.end()) { @@ -837,35 +890,46 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client( return its_endpoint; } - // If another service is hosted on the same server_endpoint - // reuse the existing client_endpoint. + // Endpoint did not yet exist. Get the partition id to check + // whether the client endpoint for the partition does exist. + partition_id_t its_partition_id + = configuration_->get_partition_id(_service, _instance); + + // If another service within the same partition is hosted on the + // same server_endpoint reuse the existing client_endpoint. auto found_service_info = remote_service_info_.find(_service); - if(found_service_info != remote_service_info_.end()) { + if (found_service_info != remote_service_info_.end()) { auto found_instance = found_service_info->second.find(_instance); - if(found_instance != found_service_info->second.end()) { + if (found_instance != found_service_info->second.end()) { auto found_reliable = found_instance->second.find(_reliable); - if(found_reliable != found_instance->second.end()) { - std::shared_ptr<endpoint_definition> its_ep_def = - found_reliable->second; + if (found_reliable != found_instance->second.end()) { + std::shared_ptr<endpoint_definition> its_ep_def + = found_reliable->second; auto found_address = client_endpoints_by_ip_.find( its_ep_def->get_address()); - if(found_address != client_endpoints_by_ip_.end()) { + if (found_address != client_endpoints_by_ip_.end()) { auto found_port = found_address->second.find( its_ep_def->get_remote_port()); - if(found_port != found_address->second.end()) { - auto found_reliable2 = found_port->second.find( - _reliable); - if(found_reliable2 != found_port->second.end()) { - its_endpoint = found_reliable2->second; - // store the endpoint under this service/instance id - // as well - needed for later cleanup - remote_services_[_service][_instance][_reliable] = - its_endpoint; - service_instances_[_service][its_endpoint.get()] = _instance; - // add endpoint to serviceinfo object - auto found_service_info = rm_->find_service(_service,_instance); - if (found_service_info) { - found_service_info->set_endpoint(its_endpoint, _reliable); + if (found_port != found_address->second.end()) { + auto found_reliable2 + = found_port->second.find(_reliable); + if (found_reliable2 != found_port->second.end()) { + auto found_partition + = found_reliable2->second.find(its_partition_id); + if (found_partition != found_reliable2->second.end()) { + its_endpoint = found_partition->second; + + // store the endpoint under this service/instance id + // as well - needed for later cleanup + remote_services_[_service][_instance][_reliable] + = its_endpoint; + service_instances_[_service][its_endpoint.get()] = _instance; + + // add endpoint to serviceinfo object + auto found_service_info = rm_->find_service(_service,_instance); + if (found_service_info) { + found_service_info->set_endpoint(its_endpoint, _reliable); + } } } } @@ -873,7 +937,8 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client( } } } - return its_endpoint; + + return (its_endpoint); } std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client( @@ -910,6 +975,8 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client( } if (its_endpoint) { + partition_id_t its_partition + = configuration_->get_partition_id(_service, _instance); used_client_ports_[_reliable].insert(its_local_port); its_lock.unlock(); service_instances_[_service][its_endpoint.get()] = _instance; @@ -917,12 +984,19 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client( client_endpoints_by_ip_[its_endpoint_def->get_address()] [its_endpoint_def->get_port()] - [_reliable] = its_endpoint; + [_reliable] + [its_partition]= its_endpoint; // Set the basic route to the service in the service info auto found_service_info = rm_->find_service(_service, _instance); if (found_service_info) { found_service_info->set_endpoint(its_endpoint, _reliable); } + boost::system::error_code ec; + VSOMEIP_INFO << "endpoint_manager_impl::create_remote_client: " + << its_endpoint_def->get_address().to_string(ec) + << ":" << std::dec << its_endpoint_def->get_port() + << " reliable: " << _reliable + << " using local port: " << std::dec << its_local_port; } } } @@ -983,18 +1057,20 @@ endpoint_manager_impl::log_client_states() const { its_client_endpoints = client_endpoints_by_ip_; } - for (const auto& its_address : its_client_endpoints) { - for (const auto& its_port : its_address.second) { - for (const auto& its_reliability : its_port.second) { - size_t its_queue_size = its_reliability.second->get_queue_size(); - if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE) - its_client_queue_sizes.push_back( - std::make_pair( - std::make_tuple( - its_address.first, - its_port.first, - its_reliability.first), - its_queue_size)); + for (const auto &its_address : its_client_endpoints) { + for (const auto &its_port : its_address.second) { + for (const auto &its_reliability : its_port.second) { + for (const auto &its_partition : its_reliability.second) { + size_t its_queue_size = its_partition.second->get_queue_size(); + if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE) + its_client_queue_sizes.push_back( + std::make_pair( + std::make_tuple( + its_address.first, + its_port.first, + its_reliability.first), + its_queue_size)); + } } } } @@ -1040,8 +1116,8 @@ endpoint_manager_impl::log_server_states() const { its_server_endpoints = server_endpoints_; } - for (const auto& its_port : its_server_endpoints) { - for (const auto& its_reliability : its_port.second) { + for (const auto &its_port : its_server_endpoints) { + for (const auto &its_reliability : its_port.second) { size_t its_queue_size = its_reliability.second->get_queue_size(); if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE) its_client_queue_sizes.push_back( diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp index 7e58e67..04c7787 100644 --- a/implementation/endpoints/src/local_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp @@ -203,20 +203,13 @@ bool local_client_endpoint_impl::send(const uint8_t *_data, uint32_t _size) { return ret; } -void local_client_endpoint_impl::send_queued() { +void local_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) { static const byte_t its_start_tag[] = { 0x67, 0x37, 0x6D, 0x07 }; static const byte_t its_end_tag[] = { 0x07, 0x6D, 0x37, 0x67 }; std::vector<boost::asio::const_buffer> bufs; - message_buffer_ptr_t its_buffer; - if(queue_.size()) { - its_buffer = queue_.front(); - } else { - return; - } - bufs.push_back(boost::asio::buffer(its_start_tag)); - bufs.push_back(boost::asio::buffer(*its_buffer)); + bufs.push_back(boost::asio::buffer(*_buffer)); bufs.push_back(boost::asio::buffer(its_end_tag)); { @@ -231,7 +224,7 @@ void local_client_endpoint_impl::send_queued() { >(shared_from_this()), std::placeholders::_1, std::placeholders::_2, - its_buffer + _buffer ) ); } diff --git a/implementation/endpoints/src/local_server_endpoint_impl.cpp b/implementation/endpoints/src/local_server_endpoint_impl.cpp index 1d412e4..ebf913f 100644 --- a/implementation/endpoints/src/local_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_server_endpoint_impl.cpp @@ -946,6 +946,10 @@ std::uint16_t local_server_endpoint_impl::get_local_port() const { return 0; } +void local_server_endpoint_impl::set_local_port(std::uint16_t _port) { + (void) _port; +} + bool local_server_endpoint_impl::check_packetizer_space( queue_iterator_type _queue_iterator, message_buffer_ptr_t* _packetizer, std::uint32_t _size) { diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp index cfb5ea1..ddf6b25 100644 --- a/implementation/endpoints/src/server_endpoint_impl.cpp +++ b/implementation/endpoints/src/server_endpoint_impl.cpp @@ -185,38 +185,45 @@ template<typename Protocol>bool server_endpoint_impl<Protocol>::send(const uint8 const service_t its_service = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); + const method_t its_method = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); const client_t its_client = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]); const session_t its_session = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SESSION_POS_MIN], _data[VSOMEIP_SESSION_POS_MAX]); - clients_mutex_.lock(); - auto found_client = clients_.find(its_client); - if (found_client != clients_.end()) { - auto found_session = found_client->second.find(its_session); - if (found_session != found_client->second.end()) { - its_target = found_session->second; + requests_mutex_.lock(); + auto found_client = requests_.find(its_client); + if (found_client != requests_.end()) { + auto its_request = std::make_tuple(its_service, its_method, its_session); + auto found_request = found_client->second.find(its_request); + if (found_request != found_client->second.end()) { + its_target = found_request->second; is_valid_target = true; - found_client->second.erase(its_session); + found_client->second.erase(found_request); } else { - VSOMEIP_WARNING << "server_endpoint::send: session_id 0x" - << std::hex << its_session - << " not found for client 0x" << its_client; - const method_t its_method = - VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], - _data[VSOMEIP_METHOD_POS_MAX]); + VSOMEIP_WARNING << "server_endpoint::send: request [" + << std::hex << std::setw(4) << std::setfill('0') + << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') + << its_method << "/" + << std::hex << std::setw(4) << std::setfill('0') + << its_client << "." + << std::hex << std::setw(4) << std::setfill('0') + << its_session + << "] could not be found."; if (its_service == VSOMEIP_SD_SERVICE && its_method == VSOMEIP_SD_METHOD) { VSOMEIP_ERROR << "Clearing clients map as a request was " "received on SD port"; - clients_.clear(); + requests_.clear(); is_valid_target = get_default_target(its_service, its_target); } } } else { is_valid_target = get_default_target(its_service, its_target); } - clients_mutex_.unlock(); + requests_mutex_.unlock(); if (is_valid_target) { is_valid_target = send_intern(its_target, _data, _size); @@ -757,7 +764,7 @@ size_t server_endpoint_impl<Protocol>::get_queue_size() const { { std::lock_guard<std::mutex> its_lock(mutex_); - for (const auto& q : queues_) { + for (const auto &q : queues_) { its_queue_size += q.second.second.size(); } } diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp index 3debcc7..f88d2a2 100644 --- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp @@ -50,6 +50,7 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl( tcp_restart_aborts_max_(configuration_->get_max_tcp_restart_aborts()), tcp_connect_time_max_(configuration_->get_max_tcp_connect_time()), aborted_restart_count_(0), + is_sending_(false), sent_timer_(_io) { is_supporting_magic_cookies_ = true; @@ -67,64 +68,71 @@ bool tcp_client_endpoint_impl::is_local() const { } void tcp_client_endpoint_impl::start() { - connect(); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); } void tcp_client_endpoint_impl::restart(bool _force) { - if (!_force && state_ == cei_state_e::CONNECTING) { - std::chrono::steady_clock::time_point its_current - = std::chrono::steady_clock::now(); - long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>( - its_current - connect_timepoint_).count(); - if (aborted_restart_count_ < tcp_restart_aborts_max_ - && its_connect_duration < tcp_connect_time_max_) { - aborted_restart_count_++; - return; - } else { - VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts [" - << tcp_restart_aborts_max_ << "] reached! its_connect_duration: " - << its_connect_duration; + auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()); + auto restart_func = [self, _force] { + if (!_force && self->state_ == cei_state_e::CONNECTING) { + std::chrono::steady_clock::time_point its_current + = std::chrono::steady_clock::now(); + long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>( + its_current - self->connect_timepoint_).count(); + if (self->aborted_restart_count_ < self->tcp_restart_aborts_max_ + && its_connect_duration < self->tcp_connect_time_max_) { + self->aborted_restart_count_++; + return; + } else { + VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts [" + << self->tcp_restart_aborts_max_ << "] reached! its_connect_duration: " + << its_connect_duration; + } } - } - state_ = cei_state_e::CONNECTING; - std::string address_port_local; - { - std::lock_guard<std::mutex> its_lock(socket_mutex_); - address_port_local = get_address_port_local(); - shutdown_and_close_socket_unlocked(true); - recv_buffer_ = std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0); - } - was_not_connected_ = true; - reconnect_counter_ = 0; - { - std::lock_guard<std::mutex> its_lock(mutex_); - for (const auto&m : queue_) { - const service_t its_service = VSOMEIP_BYTES_TO_WORD( - (*m)[VSOMEIP_SERVICE_POS_MIN], - (*m)[VSOMEIP_SERVICE_POS_MAX]); - const method_t its_method = VSOMEIP_BYTES_TO_WORD( - (*m)[VSOMEIP_METHOD_POS_MIN], - (*m)[VSOMEIP_METHOD_POS_MAX]); - const client_t its_client = VSOMEIP_BYTES_TO_WORD( - (*m)[VSOMEIP_CLIENT_POS_MIN], - (*m)[VSOMEIP_CLIENT_POS_MAX]); - const session_t its_session = VSOMEIP_BYTES_TO_WORD( - (*m)[VSOMEIP_SESSION_POS_MIN], - (*m)[VSOMEIP_SESSION_POS_MAX]); - VSOMEIP_WARNING << "tce::restart: dropping message: " - << "remote:" << get_address_port_remote() << " (" - << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" - << std::hex << std::setw(4) << std::setfill('0') << its_service << "." - << std::hex << std::setw(4) << std::setfill('0') << its_method << "." - << std::hex << std::setw(4) << std::setfill('0') << its_session << "]" - << " size: " << std::dec << m->size(); + self->state_ = cei_state_e::CONNECTING; + std::string address_port_local; + { + std::lock_guard<std::mutex> its_lock(self->socket_mutex_); + address_port_local = self->get_address_port_local(); + self->shutdown_and_close_socket_unlocked(true); + self->recv_buffer_ = std::make_shared<message_buffer_t>(self->recv_buffer_size_initial_, 0); } - queue_.clear(); - queue_size_ = 0; - } - VSOMEIP_WARNING << "tce::restart: local: " << address_port_local - << " remote: " << get_address_port_remote(); - start_connect_timer(); + self->was_not_connected_ = true; + self->reconnect_counter_ = 0; + { + std::lock_guard<std::mutex> its_lock(self->mutex_); + for (const auto&m : self->queue_) { + const service_t its_service = VSOMEIP_BYTES_TO_WORD( + (*m)[VSOMEIP_SERVICE_POS_MIN], + (*m)[VSOMEIP_SERVICE_POS_MAX]); + const method_t its_method = VSOMEIP_BYTES_TO_WORD( + (*m)[VSOMEIP_METHOD_POS_MIN], + (*m)[VSOMEIP_METHOD_POS_MAX]); + const client_t its_client = VSOMEIP_BYTES_TO_WORD( + (*m)[VSOMEIP_CLIENT_POS_MIN], + (*m)[VSOMEIP_CLIENT_POS_MAX]); + const session_t its_session = VSOMEIP_BYTES_TO_WORD( + (*m)[VSOMEIP_SESSION_POS_MIN], + (*m)[VSOMEIP_SESSION_POS_MAX]); + VSOMEIP_WARNING << "tce::restart: dropping message: " + << "remote:" << self->get_address_port_remote() << " (" + << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << "." + << std::hex << std::setw(4) << std::setfill('0') << its_session << "]" + << " size: " << std::dec << m->size(); + } + self->queue_.clear(); + self->queue_size_ = 0; + } + VSOMEIP_WARNING << "tce::restart: local: " << address_port_local + << " remote: " << self->get_address_port_remote(); + self->start_connect_timer(); + }; + // bind to strand_ to avoid socket closure if + // parallel socket operation is currently active + strand_.dispatch(restart_func); } void tcp_client_endpoint_impl::connect() { @@ -169,26 +177,54 @@ void tcp_client_endpoint_impl::connect() { std::string its_device(configuration_->get_device()); if (its_device != "") { if (setsockopt(socket_->native_handle(), - SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) { + SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) { VSOMEIP_WARNING << "TCP Client: Could not bind to device \"" << its_device << "\""; } } #endif - // Bind address and, optionally, port. - boost::system::error_code its_bind_error; - socket_->bind(local_, its_bind_error); - if(its_bind_error) { - VSOMEIP_WARNING << "tcp_client_endpoint::connect: " - "Error binding socket: " << its_bind_error.message() - << " remote:" << get_address_port_remote(); - try { - // don't connect on bind error to avoid using a random port - strand_.post(std::bind(&client_endpoint_impl::connect_cbk, - shared_from_this(), its_bind_error)); - } catch (const std::exception &e) { - VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: " - << e.what() << " remote:" << get_address_port_remote(); + // In case a client endpoint port was configured, + // bind to it before connecting + if (local_.port() != ILLEGAL_PORT) { + boost::system::error_code its_bind_error; + socket_->bind(local_, its_bind_error); + if(its_bind_error) { + VSOMEIP_WARNING << "tcp_client_endpoint::connect: " + "Error binding socket: " << its_bind_error.message() + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + + std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock(); + if (its_host) { + // set new client port depending on service / instance / remote port + if (!its_host->on_bind_error(shared_from_this(), remote_port_)) { + VSOMEIP_WARNING << "tcp_client_endpoint::connect: " + "Failed to set new local port for tce: " + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } else { + local_.port(local_port_); + VSOMEIP_INFO << "tcp_client_endpoint::connect: " + "Using new new local port for tce: " + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } + } + try { + // don't connect on bind error to avoid using a random port + strand_.post(std::bind(&client_endpoint_impl::connect_cbk, + shared_from_this(), its_bind_error)); + } catch (const std::exception &e) { + VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: " + << e.what() + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } + return; } return; } @@ -220,7 +256,10 @@ void tcp_client_endpoint_impl::receive() { std::lock_guard<std::mutex> its_lock(socket_mutex_); its_recv_buffer = recv_buffer_; } - receive(its_recv_buffer, 0, 0); + auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()); + strand_.dispatch([self, &its_recv_buffer](){ + self->receive(its_recv_buffer, 0, 0); + }); } void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer, @@ -277,32 +316,26 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer, } } -void tcp_client_endpoint_impl::send_queued() { - message_buffer_ptr_t its_buffer; - if(queue_.size()) { - its_buffer = queue_.front(); - } else { - return; - } +void tcp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) { const service_t its_service = VSOMEIP_BYTES_TO_WORD( - (*its_buffer)[VSOMEIP_SERVICE_POS_MIN], - (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]); + (*_buffer)[VSOMEIP_SERVICE_POS_MIN], + (*_buffer)[VSOMEIP_SERVICE_POS_MAX]); const method_t its_method = VSOMEIP_BYTES_TO_WORD( - (*its_buffer)[VSOMEIP_METHOD_POS_MIN], - (*its_buffer)[VSOMEIP_METHOD_POS_MAX]); + (*_buffer)[VSOMEIP_METHOD_POS_MIN], + (*_buffer)[VSOMEIP_METHOD_POS_MAX]); const client_t its_client = VSOMEIP_BYTES_TO_WORD( - (*its_buffer)[VSOMEIP_CLIENT_POS_MIN], - (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]); + (*_buffer)[VSOMEIP_CLIENT_POS_MIN], + (*_buffer)[VSOMEIP_CLIENT_POS_MAX]); const session_t its_session = VSOMEIP_BYTES_TO_WORD( - (*its_buffer)[VSOMEIP_SESSION_POS_MIN], - (*its_buffer)[VSOMEIP_SESSION_POS_MAX]); + (*_buffer)[VSOMEIP_SESSION_POS_MIN], + (*_buffer)[VSOMEIP_SESSION_POS_MAX]); if (has_enabled_magic_cookies_) { const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); if (std::chrono::duration_cast<std::chrono::milliseconds>( now - last_cookie_sent_) > std::chrono::milliseconds(10000)) { - send_magic_cookie(its_buffer); + send_magic_cookie(_buffer); last_cookie_sent_ = now; } } @@ -312,9 +345,9 @@ void tcp_client_endpoint_impl::send_queued() { std::stringstream msg; msg << "tcei<" << remote_.address() << ":" << std::dec << remote_.port() << ">::sq: "; - for (std::size_t i = 0; i < its_buffer->size(); i++) + for (std::size_t i = 0; i < _buffer->size(); i++) msg << std::hex << std::setw(2) << std::setfill('0') - << (int)(*its_buffer)[i] << " "; + << (int)(*_buffer)[i] << " "; VSOMEIP_INFO << msg.str(); #endif { @@ -326,21 +359,23 @@ void tcp_client_endpoint_impl::send_queued() { } boost::asio::async_write( *socket_, - boost::asio::buffer(*its_buffer), - std::bind(&tcp_client_endpoint_impl::write_completion_condition, - std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()), - std::placeholders::_1, - std::placeholders::_2, - its_buffer->size(), - its_service, its_method, its_client, its_session, - std::chrono::steady_clock::now()), + boost::asio::buffer(*_buffer), std::bind( + &tcp_client_endpoint_impl::write_completion_condition, + std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()), + std::placeholders::_1, + std::placeholders::_2, + _buffer->size(), + its_service, its_method, its_client, its_session, + std::chrono::steady_clock::now()), + strand_.wrap( + std::bind( &tcp_client_endpoint_base_impl::send_cbk, shared_from_this(), std::placeholders::_1, std::placeholders::_2, - its_buffer - ) + _buffer + )) ); } } @@ -675,7 +710,10 @@ void tcp_client_endpoint_impl::receive_cbk( } } its_lock.unlock(); - receive(_recv_buffer, _recv_buffer_size, its_missing_capacity); + auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()); + strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){ + self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity); + }); } else { VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: " << _error.message() << "(" << std::dec << _error.value() @@ -700,7 +738,10 @@ void tcp_client_endpoint_impl::receive_cbk( } } else { its_lock.unlock(); - receive(_recv_buffer, _recv_buffer_size, its_missing_capacity); + auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()); + strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){ + self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity); + }); } } } @@ -838,7 +879,13 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, if (queue_.size() > 0) { queue_size_ -= queue_.front()->size(); queue_.pop_front(); - send_queued(); + auto its_buffer = get_front(); + if (its_buffer) { + auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()); + strand_.dispatch( + [self, its_buffer]() { self->send_queued(its_buffer);} + ); + } } } else if (_error == boost::system::errc::destination_address_required) { VSOMEIP_WARNING << "tce::send_cbk received error: " << _error.message() diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp index 1cd2b5b..37db3f5 100644 --- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp @@ -50,7 +50,7 @@ tcp_server_endpoint_impl::tcp_server_endpoint_impl( std::string its_device(configuration_->get_device()); if (its_device != "") { if (setsockopt(acceptor_.native_handle(), - SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) { + SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) { VSOMEIP_WARNING << "TCP Server: Could not bind to device \"" << its_device << "\""; } } @@ -152,6 +152,38 @@ void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iter << static_cast<std::uint16_t>(_queue_iterator->first.port()) << " dropping outstanding messages (" << std::dec << _queue_iterator->second.second.size() << ")."; + + if (_queue_iterator->second.second.size()) { + std::set<service_t> its_services; + + // check all outstanding messages of this connection + // whether stop handlers need to be called + for (const auto &its_buffer : _queue_iterator->second.second) { + if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) { + service_t its_service = VSOMEIP_BYTES_TO_WORD( + (*its_buffer)[VSOMEIP_SERVICE_POS_MIN], + (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]); + its_services.insert(its_service); + } + } + + for (auto its_service : its_services) { + auto found_cbk = prepare_stop_handlers_.find(its_service); + if (found_cbk != prepare_stop_handlers_.end()) { + VSOMEIP_INFO << "Calling prepare stop handler " + << "for service: 0x" + << std::hex << std::setw(4) << std::setfill('0') + << its_service; + auto handler = found_cbk->second; + auto ptr = this->shared_from_this(); + service_.post([ptr, handler, its_service](){ + handler(ptr, its_service); + }); + prepare_stop_handlers_.erase(found_cbk); + } + } + } + queues_.erase(_queue_iterator->first); } } @@ -259,6 +291,10 @@ std::uint16_t tcp_server_endpoint_impl::get_local_port() const { return local_port_; } +void tcp_server_endpoint_impl::set_local_port(std::uint16_t _port) { + (void)_port; +} + bool tcp_server_endpoint_impl::is_reliable() const { return true; } @@ -551,12 +587,19 @@ void tcp_server_endpoint_impl::connection::receive_cbk( recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MIN], recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MAX]); if (its_client != MAGIC_COOKIE_CLIENT) { + const service_t its_service = VSOMEIP_BYTES_TO_WORD( + recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MIN], + recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MAX]); + const method_t its_method = VSOMEIP_BYTES_TO_WORD( + recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MIN], + recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MAX]); const session_t its_session = VSOMEIP_BYTES_TO_WORD( recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MIN], recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MAX]); - its_server->clients_mutex_.lock(); - its_server->clients_[its_client][its_session] = remote_; - its_server->clients_mutex_.unlock(); + + std::lock_guard<std::mutex> its_requests_guard(its_server->requests_mutex_); + its_server->requests_[its_client] + [std::make_tuple(its_service, its_method, its_session)] = remote_; } } if (!magic_cookies_enabled_) { diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp index dc7a7bf..3b9a212 100644 --- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp @@ -14,6 +14,8 @@ #include "../../routing/include/routing_host.hpp" #include "../include/udp_client_endpoint_impl.hpp" #include "../../utility/include/utility.hpp" +#include "../../utility/include/byteorder.hpp" + namespace vsomeip_v3 { @@ -61,27 +63,50 @@ void udp_client_endpoint_impl::connect() { socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error); if (its_error) { VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't enable " - << "SO_REUSEADDR: " << its_error.message() << " remote:" - << get_address_port_remote(); + << "SO_REUSEADDR: " << its_error.message() + << " local port:" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); } + socket_->set_option(boost::asio::socket_base::receive_buffer_size( udp_receive_buffer_size_), its_error); if (its_error) { VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't set " - << "SO_RCVBUF: " << its_error.message() << " to: " - << std::dec << udp_receive_buffer_size_ << " remote:" - << get_address_port_remote(); - } else { - boost::asio::socket_base::receive_buffer_size its_option; - socket_->get_option(its_option, its_error); - if (its_error) { - VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't get " - << "SO_RCVBUF: " << its_error.message() << " remote:" - << get_address_port_remote(); - } else { - VSOMEIP_INFO << "udp_client_endpoint_impl::connect: SO_RCVBUF is: " - << std::dec << its_option.value(); + << "SO_RCVBUF: " << its_error.message() + << " to: " << std::dec << udp_receive_buffer_size_ + << " local port:" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } + + boost::asio::socket_base::receive_buffer_size its_option; + socket_->get_option(its_option, its_error); + #ifdef __linux__ + // If regular setting of the buffer size did not work, try to force + // (requires CAP_NET_ADMIN to be successful) + if (its_option.value() < 0 + || its_option.value() < udp_receive_buffer_size_) { + its_error.assign(setsockopt(socket_->native_handle(), + SOL_SOCKET, SO_RCVBUFFORCE, + &udp_receive_buffer_size_, sizeof(udp_receive_buffer_size_)), + boost::system::generic_category()); + if (!its_error) { + VSOMEIP_INFO << "udp_client_endpoint_impl::connect: " + << "SO_RCVBUFFORCE successful!"; } + socket_->get_option(its_option, its_error); + } + #endif + if (its_error) { + VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't get " + << "SO_RCVBUF: " << its_error.message() + << " local port:" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } else { + VSOMEIP_INFO << "udp_client_endpoint_impl::connect: SO_RCVBUF is: " + << std::dec << its_option.value() + << " (" << udp_receive_buffer_size_ << ")" + << " local port:" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); } #ifndef _WIN32 @@ -89,26 +114,53 @@ void udp_client_endpoint_impl::connect() { std::string its_device(configuration_->get_device()); if (its_device != "") { if (setsockopt(socket_->native_handle(), - SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) { + SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) { VSOMEIP_WARNING << "UDP Client: Could not bind to device \"" << its_device << "\""; } } #endif - // Bind address and, optionally, port. - boost::system::error_code its_bind_error; - socket_->bind(local_, its_bind_error); - if(its_bind_error) { - VSOMEIP_WARNING << "udp_client_endpoint::connect: " - "Error binding socket: " << its_bind_error.message() - << " remote:" << get_address_port_remote(); - try { - // don't connect on bind error to avoid using a random port - strand_.post(std::bind(&client_endpoint_impl::connect_cbk, - shared_from_this(), its_bind_error)); - } catch (const std::exception &e) { - VSOMEIP_ERROR << "udp_client_endpoint_impl::connect: " - << e.what() << " remote:" << get_address_port_remote(); + // In case a client endpoint port was configured, + // bind to it before connecting + if (local_.port() != ILLEGAL_PORT) { + boost::system::error_code its_bind_error; + socket_->bind(local_, its_bind_error); + if(its_bind_error) { + VSOMEIP_WARNING << "udp_client_endpoint::connect: " + "Error binding socket: " << its_bind_error.message() + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + + std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock(); + if (its_host) { + // set new client port depending on service / instance / remote port + if (!its_host->on_bind_error(shared_from_this(), remote_port_)) { + VSOMEIP_WARNING << "udp_client_endpoint::connect: " + "Failed to set new local port for uce: " + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } else { + local_.port(local_port_); + VSOMEIP_INFO << "udp_client_endpoint::connect: " + "Using new new local port for uce: " + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } + } + + + try { + // don't connect on bind error to avoid using a random port + strand_.post(std::bind(&client_endpoint_impl::connect_cbk, + shared_from_this(), its_bind_error)); + } catch (const std::exception &e) { + VSOMEIP_ERROR << "udp_client_endpoint_impl::connect: " + << e.what() << " remote:" << get_address_port_remote(); + } + return; } return; } @@ -158,32 +210,26 @@ void udp_client_endpoint_impl::restart(bool _force) { start_connect_timer(); } -void udp_client_endpoint_impl::send_queued() { - message_buffer_ptr_t its_buffer; - if(queue_.size()) { - its_buffer = queue_.front(); - } else { - return; - } +void udp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) { #if 0 std::stringstream msg; msg << "ucei<" << remote_.address() << ":" << std::dec << remote_.port() << ">::sq: "; - for (std::size_t i = 0; i < its_buffer->size(); i++) + for (std::size_t i = 0; i < _buffer->size(); i++) msg << std::hex << std::setw(2) << std::setfill('0') - << (int)(*its_buffer)[i] << " "; + << (int)(*_buffer)[i] << " "; VSOMEIP_INFO << msg.str(); #endif { std::lock_guard<std::mutex> its_lock(socket_mutex_); socket_->async_send( - boost::asio::buffer(*its_buffer), + boost::asio::buffer(*_buffer), std::bind( &udp_client_endpoint_base_impl::send_cbk, shared_from_this(), std::placeholders::_1, std::placeholders::_2, - its_buffer + _buffer ) ); } @@ -354,7 +400,12 @@ void udp_client_endpoint_impl::receive_cbk( receive(); } else { if (_error == boost::asio::error::connection_refused) { - shutdown_and_close_socket(false); + VSOMEIP_WARNING << "uce::receive_cbk: local: " << get_address_port_local() + << " remote: " << get_address_port_remote() + << " error: " << _error.message(); + std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock(); + its_ep_host->on_disconnect(shared_from_this()); + restart(false); } else { receive(); } @@ -415,6 +466,134 @@ std::string udp_client_endpoint_impl::get_remote_information() const { + std::to_string(remote_.port()); } +void udp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, std::size_t _bytes, + const message_buffer_ptr_t &_sent_msg) { + (void)_bytes; + if (!_error) { + std::lock_guard<std::mutex> its_lock(mutex_); + if (queue_.size() > 0) { + queue_size_ -= queue_.front()->size(); + queue_.pop_front(); + auto its_buffer = get_front(); + if (its_buffer) + send_queued(its_buffer); + } + } else if (_error == boost::asio::error::broken_pipe) { + state_ = cei_state_e::CLOSED; + bool stopping(false); + { + std::lock_guard<std::mutex> its_lock(mutex_); + stopping = sending_blocked_; + if (stopping) { + queue_.clear(); + queue_size_ = 0; + } else { + service_t its_service(0); + method_t its_method(0); + client_t its_client(0); + session_t its_session(0); + if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) { + its_service = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN], + (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]); + its_method = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_METHOD_POS_MIN], + (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]); + its_client = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN], + (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]); + its_session = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_SESSION_POS_MIN], + (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]); + } + VSOMEIP_WARNING << "uce::send_cbk received error: " + << _error.message() << " (" << std::dec + << _error.value() << ") " << get_remote_information() + << " " << std::dec << queue_.size() + << " " << std::dec << queue_size_ << " (" + << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << "." + << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"; + } + } + if (!stopping) { + print_status(); + } + was_not_connected_ = true; + shutdown_and_close_socket(true); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); + } else if (_error == boost::asio::error::not_connected + || _error == boost::asio::error::bad_descriptor + || _error == boost::asio::error::no_permission) { + state_ = cei_state_e::CLOSED; + if (_error == boost::asio::error::no_permission) { + VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message() + << " (" << std::dec << _error.value() << ") " + << get_remote_information(); + std::lock_guard<std::mutex> its_lock(mutex_); + queue_.clear(); + queue_size_ = 0; + } + was_not_connected_ = true; + shutdown_and_close_socket(true); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); + } else if (_error == boost::asio::error::operation_aborted) { + VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message(); + // endpoint was stopped + sending_blocked_ = true; + shutdown_and_close_socket(false); + } else if (_error == boost::system::errc::destination_address_required) { + VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message() + << " (" << std::dec << _error.value() << ") " + << get_remote_information(); + was_not_connected_ = true; + } else { + if (state_ == cei_state_e::CONNECTING) { + VSOMEIP_WARNING << "uce::send_cbk endpoint is already restarting:" + << get_remote_information(); + } else { + state_ = cei_state_e::CONNECTING; + shutdown_and_close_socket(false); + std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock(); + if (its_host) { + its_host->on_disconnect(shared_from_this()); + } + restart(true); + } + service_t its_service(0); + method_t its_method(0); + client_t its_client(0); + session_t its_session(0); + if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) { + its_service = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN], + (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]); + its_method = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_METHOD_POS_MIN], + (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]); + its_client = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN], + (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]); + its_session = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_SESSION_POS_MIN], + (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]); + } + VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message() + << " (" << std::dec << _error.value() << ") " + << get_remote_information() << " " + << " " << std::dec << queue_.size() + << " " << std::dec << queue_size_ << " (" + << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << "." + << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"; + print_status(); + } +} + bool udp_client_endpoint_impl::tp_segmentation_enabled(service_t _service, method_t _method) const { return configuration_->tp_segment_messages_client_to_service(_service, diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp index 7aadf3f..bd44b48 100644 --- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp @@ -55,7 +55,7 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( std::string its_device(configuration_->get_device()); if (its_device != "") { if (setsockopt(unicast_socket_.native_handle(), - SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) { + SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) { VSOMEIP_WARNING << "UDP Server: Could not bind to device \"" << its_device << "\""; } } @@ -79,28 +79,46 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( unicast_socket_.set_option(option, ec); boost::asio::detail::throw_error(ec, "broadcast option"); - const std::uint32_t its_udp_recv_buffer_size = + const int its_udp_recv_buffer_size = configuration_->get_udp_receive_buffer_size(); unicast_socket_.set_option(boost::asio::socket_base::receive_buffer_size( its_udp_recv_buffer_size), ec); - if (ec) { - VSOMEIP_WARNING << "udp_server_endpoint_impl:: couldn't set " + VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't set " << "SO_RCVBUF: " << ec.message() << " to: " << std::dec << its_udp_recv_buffer_size << " local port: " << std::dec << local_port_; - } else { - boost::asio::socket_base::receive_buffer_size its_option; - unicast_socket_.get_option(its_option, ec); - if (ec) { - VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get " - << "SO_RCVBUF: " << ec.message() << " local port:" - << std::dec << local_port_; - } else { - VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF is: " - << std::dec << its_option.value(); + } + + boost::asio::socket_base::receive_buffer_size its_option; + unicast_socket_.get_option(its_option, ec); +#ifdef __linux__ + // If regular setting of the buffer size did not work, try to force + // (requires CAP_NET_ADMIN to be successful) + if (its_option.value() < 0 + || its_option.value() < its_udp_recv_buffer_size) { + ec.assign(setsockopt(unicast_socket_.native_handle(), + SOL_SOCKET, SO_RCVBUFFORCE, + &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)), + boost::system::generic_category()); + if (!ec) { + VSOMEIP_INFO << "udp_server_endpoint_impl: " + << "SO_RCVBUFFORCE successful."; } + unicast_socket_.get_option(its_option, ec); } +#endif + if (ec) { + VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get " + << "SO_RCVBUF: " << ec.message() << " local port:" + << std::dec << local_port_; + } else { + VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF is: " + << std::dec << its_option.value() + << " (" << its_udp_recv_buffer_size << ") local port:" + << std::dec << local_port_; + } + #ifdef _WIN32 const char* optval("0001"); @@ -345,29 +363,44 @@ void udp_server_endpoint_impl::join_unlocked(const std::string &_address) { multicast_socket_->bind(*multicast_local_, ec); boost::asio::detail::throw_error(ec, "bind multicast"); - const std::uint32_t its_udp_recv_buffer_size = + const int its_udp_recv_buffer_size = configuration_->get_udp_receive_buffer_size(); - multicast_socket_->set_option(boost::asio::socket_base::receive_buffer_size( its_udp_recv_buffer_size), ec); - if (ec) { - VSOMEIP_WARNING << "udp_server_endpoint_impl:: couldn't set " + VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't set " << "SO_RCVBUF: " << ec.message() << " to: " << std::dec << its_udp_recv_buffer_size << " local port: " << std::dec << local_port_; - } else { - boost::asio::socket_base::receive_buffer_size its_option; - multicast_socket_->get_option(its_option, ec); + } - if (ec) { - VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get " - << "SO_RCVBUF: " << ec.message() << " local port:" - << std::dec << local_port_; - } else { - VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF (Multicast) is: " - << std::dec << its_option.value(); + boost::asio::socket_base::receive_buffer_size its_option; + multicast_socket_->get_option(its_option, ec); + #ifdef __linux__ + // If regular setting of the buffer size did not work, try to force + // (requires CAP_NET_ADMIN to be successful) + if (its_option.value() < 0 + || its_option.value() < its_udp_recv_buffer_size) { + ec.assign(setsockopt(multicast_socket_->native_handle(), + SOL_SOCKET, SO_RCVBUFFORCE, + &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)), + boost::system::generic_category()); + if (!ec) { + VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: " + << "SO_RCVBUFFORCE: successful."; } + multicast_socket_->get_option(its_option, ec); + } + #endif + if (ec) { + VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't get " + << "SO_RCVBUF: " << ec.message() << " local port:" + << std::dec << local_port_; + } else { + VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: SO_RCVBUF is: " + << std::dec << its_option.value() + << " (" << its_udp_recv_buffer_size << ") local port:" + << std::dec << local_port_; } #ifdef _WIN32 @@ -413,7 +446,8 @@ void udp_server_endpoint_impl::join_unlocked(const std::string &_address) { joined_group_ = true; } catch (const std::exception &e) { - VSOMEIP_ERROR << "udp_server_endpoint_impl::join" << ":" << e.what(); + VSOMEIP_ERROR << "udp_server_endpoint_impl::join" << ":" << e.what() + << " address: " << _address; } }; @@ -467,7 +501,8 @@ void udp_server_endpoint_impl::leave_unlocked(const std::string &_address) { } } catch (const std::exception &e) { - VSOMEIP_ERROR << __func__ << ":" << e.what(); + VSOMEIP_ERROR << __func__ << ":" << e.what() + << " address: " << _address; } } @@ -500,6 +535,10 @@ std::uint16_t udp_server_endpoint_impl::get_local_port() const { return local_port_; } +void udp_server_endpoint_impl::set_local_port(std::uint16_t _port) { + (void)_port; +} + void udp_server_endpoint_impl::on_unicast_received( boost::system::error_code const &_error, std::size_t _bytes, @@ -624,9 +663,13 @@ void udp_server_endpoint_impl::on_message_received( const session_t its_session = VSOMEIP_BYTES_TO_WORD( _buffer[i + VSOMEIP_SESSION_POS_MIN], _buffer[i + VSOMEIP_SESSION_POS_MAX]); - clients_mutex_.lock(); - clients_[its_client][its_session] = _remote; - clients_mutex_.unlock(); + const method_t its_method = VSOMEIP_BYTES_TO_WORD( + _buffer[i + VSOMEIP_METHOD_POS_MIN], + _buffer[i + VSOMEIP_METHOD_POS_MAX]); + + std::lock_guard<std::mutex> its_requests_guard(requests_mutex_); + requests_[its_client] + [std::make_tuple(its_service, its_method, its_session)] = _remote; } } else if (its_service != VSOMEIP_SD_SERVICE && utility::is_notification(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS]) @@ -656,11 +699,19 @@ void udp_server_endpoint_impl::on_message_received( res.second[VSOMEIP_CLIENT_POS_MIN], res.second[VSOMEIP_CLIENT_POS_MAX]); if (its_client != MAGIC_COOKIE_CLIENT) { + const service_t its_service = VSOMEIP_BYTES_TO_WORD( + res.second[VSOMEIP_SERVICE_POS_MIN], + res.second[VSOMEIP_SERVICE_POS_MAX]); + const method_t its_method = VSOMEIP_BYTES_TO_WORD( + res.second[VSOMEIP_METHOD_POS_MIN], + res.second[VSOMEIP_METHOD_POS_MAX]); const session_t its_session = VSOMEIP_BYTES_TO_WORD( res.second[VSOMEIP_SESSION_POS_MIN], res.second[VSOMEIP_SESSION_POS_MAX]); - std::lock_guard<std::mutex> its_client_lock(clients_mutex_); - clients_[its_client][its_session] = _remote; + + std::lock_guard<std::mutex> its_requests_guard(requests_mutex_); + requests_[its_client] + [std::make_tuple(its_service, its_method, its_session)] = _remote; } } else if (its_service != VSOMEIP_SD_SERVICE && utility::is_notification(res.second[VSOMEIP_MESSAGE_TYPE_POS]) diff --git a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp index c2c917f..5c8981c 100644 --- a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp @@ -112,6 +112,10 @@ std::uint16_t virtual_server_endpoint_impl::get_local_port() const { return port_; } +void virtual_server_endpoint_impl::set_local_port(std::uint16_t _port) { + port_ = _port; +} + std::uint16_t virtual_server_endpoint_impl::get_remote_port() const { return ILLEGAL_PORT; } diff --git a/implementation/logger/src/message.cpp b/implementation/logger/src/message.cpp index e4a902f..3363416 100644 --- a/implementation/logger/src/message.cpp +++ b/implementation/logger/src/message.cpp @@ -135,7 +135,8 @@ message::~message() { << std::endl; } } - } else if (its_configuration->has_dlt_log()) { + } + if (its_configuration->has_dlt_log()) { #ifdef USE_DLT its_logger->log(level_, buffer_.data_.str().c_str()); #endif // USE_DLT diff --git a/implementation/message/src/deserializer.cpp b/implementation/message/src/deserializer.cpp index c7464cb..0e33faa 100644 --- a/implementation/message/src/deserializer.cpp +++ b/implementation/message/src/deserializer.cpp @@ -111,12 +111,12 @@ bool deserializer::deserialize(uint8_t *_data, std::size_t _length) { return true; } -bool deserializer::deserialize(std::string& _target, std::size_t _length) { +bool deserializer::deserialize(std::string &_target, std::size_t _length) { if (_length > remaining_ || _length > _target.capacity()) { return false; } - _target.assign(position_, position_ + _length); - position_ += _length; + _target.assign(position_, position_ + long(_length)); + position_ += long(_length); remaining_ -= _length; return true; @@ -135,7 +135,7 @@ bool deserializer::deserialize(std::vector< uint8_t >& _value) { } bool deserializer::look_ahead(std::size_t _index, uint8_t &_value) const { - if (_index >= data_.size()) + if (_index > remaining_) return false; _value = *(position_ + static_cast<std::vector<byte_t>::difference_type>(_index)); @@ -144,7 +144,7 @@ bool deserializer::look_ahead(std::size_t _index, uint8_t &_value) const { } bool deserializer::look_ahead(std::size_t _index, uint16_t &_value) const { - if (_index+1 >= data_.size()) + if (_index+1 > remaining_) return false; std::vector< uint8_t >::iterator i = position_ + @@ -155,7 +155,7 @@ bool deserializer::look_ahead(std::size_t _index, uint16_t &_value) const { } bool deserializer::look_ahead(std::size_t _index, uint32_t &_value) const { - if (_index+3 >= data_.size()) + if (_index+3 > remaining_) return false; std::vector< uint8_t >::const_iterator i = position_ + static_cast<std::vector<byte_t>::difference_type>(_index); diff --git a/implementation/routing/include/eventgroupinfo.hpp b/implementation/routing/include/eventgroupinfo.hpp index 32ce5f2..8ec1ac6 100644 --- a/implementation/routing/include/eventgroupinfo.hpp +++ b/implementation/routing/include/eventgroupinfo.hpp @@ -42,7 +42,7 @@ public: VSOMEIP_EXPORT eventgroupinfo( const service_t _service, const service_t _instance, const eventgroup_t _eventgroup, const major_version_t _major, - const ttl_t _ttl); + const ttl_t _ttl, const uint8_t _max_remote_subscribers); VSOMEIP_EXPORT ~eventgroupinfo(); VSOMEIP_EXPORT service_t get_service() const; @@ -86,6 +86,9 @@ public: std::set<client_t> &_changed, remote_subscription_id_t &_id, const bool _is_subscribe); + bool is_remote_subscription_limit_reached( + const std::shared_ptr<remote_subscription> &_subscription); + remote_subscription_id_t add_remote_subscription( const std::shared_ptr<remote_subscription> &_subscription); @@ -107,6 +110,10 @@ public: VSOMEIP_EXPORT void send_initial_events( const std::shared_ptr<endpoint_definition> &_reliable, const std::shared_ptr<endpoint_definition> &_unreliable) const; + + VSOMEIP_EXPORT uint8_t get_max_remote_subscribers() const; + VSOMEIP_EXPORT void set_max_remote_subscribers(uint8_t _max_remote_subscribers); + private: void update_id(); uint32_t get_unreliable_target_count() const; @@ -131,9 +138,12 @@ private: std::shared_ptr<remote_subscription> > subscriptions_; remote_subscription_id_t id_; + std::map<boost::asio::ip::address, uint8_t> remote_subscribers_count_; std::atomic<reliability_type_e> reliability_; std::atomic<bool> reliability_auto_mode_; + + uint8_t max_remote_subscribers_; }; } // namespace vsomeip_v3 diff --git a/implementation/routing/include/remote_subscription.hpp b/implementation/routing/include/remote_subscription.hpp index 685aad9..ff94d5b 100644 --- a/implementation/routing/include/remote_subscription.hpp +++ b/implementation/routing/include/remote_subscription.hpp @@ -31,6 +31,7 @@ public: bool operator==(const remote_subscription &_other) const; bool equals(const std::shared_ptr<remote_subscription> &_other) const; + bool address_equals(const std::shared_ptr<remote_subscription> &_other) const; VSOMEIP_EXPORT void reset(const std::set<client_t> &_clients); @@ -89,6 +90,8 @@ public: VSOMEIP_EXPORT std::uint32_t get_answers() const; VSOMEIP_EXPORT void set_answers(const std::uint32_t _answers); + VSOMEIP_EXPORT bool get_ip_address(boost::asio::ip::address &_address) const; + private: std::atomic<remote_subscription_id_t> id_; std::atomic<bool> is_initial_; diff --git a/implementation/routing/include/routing_manager_base.hpp b/implementation/routing/include/routing_manager_base.hpp index acd57fe..270f4c4 100644 --- a/implementation/routing/include/routing_manager_base.hpp +++ b/implementation/routing/include/routing_manager_base.hpp @@ -121,6 +121,8 @@ public: virtual void set_routing_state(routing_state_e _routing_state) = 0; + virtual routing_state_e get_routing_state(); + virtual void register_client_error_handler(client_t _client, const std::shared_ptr<endpoint> &_endpoint) = 0; @@ -131,6 +133,7 @@ public: std::shared_ptr<serviceinfo> find_service(service_t _service, instance_t _instance) const; client_t find_local_client(service_t _service, instance_t _instance) const; + client_t find_local_client_unlocked(service_t _service, instance_t _instance) const; std::shared_ptr<event> find_event(service_t _service, instance_t _instance, event_t _event) const; @@ -208,6 +211,8 @@ protected: instance_t _instance, method_t _method); bool is_subscribe_to_any_event_allowed(credentials_t _credentials, client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup); + void unsubscribe_all(service_t _service, instance_t _instance); + #ifdef VSOMEIP_ENABLE_COMPAT void set_incoming_subscription_state(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event, subscription_state_e _state); @@ -259,6 +264,9 @@ protected: std::mutex event_registration_mutex_; + std::mutex routing_state_mutex_; + routing_state_e routing_state_; + #ifdef USE_DLT std::shared_ptr<trace::connector_impl> tc_; #endif diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp index b5f0ea0..3fd6eed 100644 --- a/implementation/routing/include/routing_manager_impl.hpp +++ b/implementation/routing/include/routing_manager_impl.hpp @@ -140,7 +140,7 @@ public: void on_subscribe_nack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event, - remote_subscription_id_t _id); + remote_subscription_id_t _id, bool _simulated); // interface to stub @@ -396,6 +396,12 @@ private: const std::set<client_t> &_removed, const remote_subscription_id_t _id); + void send_expired_subscription(client_t _offering_client, + const service_t _service, const instance_t _instance, + const eventgroup_t _eventgroup, + const std::set<client_t> &_removed, + const remote_subscription_id_t _id); + void cleanup_server_endpoint(service_t _service, const std::shared_ptr<endpoint>& _endpoint); @@ -414,6 +420,8 @@ private: method_t _method, length_t _length); void statistics_log_timer_cbk(boost::system::error_code const & _error); + void send_suspend() const; + private: std::shared_ptr<routing_manager_stub> stub_; std::shared_ptr<sd::service_discovery> discovery_; @@ -476,7 +484,6 @@ private: pending_remote_offer_id_t pending_remote_offer_id_; std::map<pending_remote_offer_id_t, std::pair<service_t, instance_t>> pending_remote_offers_; - std::mutex last_resume_mutex_; std::chrono::steady_clock::time_point last_resume_; std::mutex offer_serialization_mutex_; @@ -493,6 +500,9 @@ private: msg_statistic_t> message_statistics_; std::tuple<service_t, instance_t, method_t> message_to_discard_; uint32_t ignored_statistics_counter_; + + // synchronize update_remote_subscription() and send_(un)subscription() + std::mutex update_remote_subscription_mutex_; }; } // namespace vsomeip_v3 diff --git a/implementation/routing/include/routing_manager_proxy.hpp b/implementation/routing/include/routing_manager_proxy.hpp index fd546a0..2b5335a 100644 --- a/implementation/routing/include/routing_manager_proxy.hpp +++ b/implementation/routing/include/routing_manager_proxy.hpp @@ -194,6 +194,8 @@ private: void on_update_security_credentials(const byte_t *_data, uint32_t _size); void on_client_assign_ack(const client_t &_client); + void on_suspend(); + private: enum class inner_state_type_e : std::uint8_t { ST_REGISTERED = 0x0, diff --git a/implementation/routing/include/routing_manager_stub.hpp b/implementation/routing/include/routing_manager_stub.hpp index 4c2ff01..aa5796e 100644 --- a/implementation/routing/include/routing_manager_stub.hpp +++ b/implementation/routing/include/routing_manager_stub.hpp @@ -66,6 +66,11 @@ public: instance_t _instance, eventgroup_t _eventgroup, event_t _event, remote_subscription_id_t _id); + bool send_expired_subscription(const std::shared_ptr<endpoint>& _target, + client_t _client, service_t _service, + instance_t _instance, eventgroup_t _eventgroup, + event_t _event, remote_subscription_id_t _id); + void send_subscribe_nack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event); @@ -115,6 +120,8 @@ public: const std::set<std::shared_ptr<policy> > &_policies); void remove_requester_policies(uid_t _uid, gid_t _gid); + void send_suspend() const; + private: void broadcast(const std::vector<byte_t> &_command) const; @@ -123,6 +130,9 @@ private: void distribute_credentials(client_t _hoster, service_t _service, instance_t _instance); + void inform_provider(client_t _hoster, service_t _service, + instance_t _instance, major_version_t _major, minor_version_t _minor, + routing_info_entry_e _entry); void inform_requesters(client_t _hoster, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, routing_info_entry_e _entry, diff --git a/implementation/routing/include/routing_manager_stub_host.hpp b/implementation/routing/include/routing_manager_stub_host.hpp index 4707948..6ad0ab1 100644 --- a/implementation/routing/include/routing_manager_stub_host.hpp +++ b/implementation/routing/include/routing_manager_stub_host.hpp @@ -51,7 +51,7 @@ public: virtual void on_subscribe_nack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event, - remote_subscription_id_t _subscription_id) = 0; + remote_subscription_id_t _subscription_id, bool _simulated) = 0; virtual void on_subscribe_ack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event, diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp index bc0ba3e..b63022c 100644 --- a/implementation/routing/src/event.cpp +++ b/implementation/routing/src/event.cpp @@ -130,8 +130,10 @@ void event::set_payload(const std::shared_ptr<payload> &_payload, bool _force) { } } } else { - VSOMEIP_INFO << "Can't set payload for event " << std::hex - << message_->get_method() << " as it isn't provided"; + VSOMEIP_INFO << "Can't set payload for event " + << std::hex << std::setw(4) << std::setfill('0') + << get_service() << "." << get_instance() << "." << get_event() + << " as it isn't provided"; } } @@ -146,8 +148,10 @@ void event::set_payload(const std::shared_ptr<payload> &_payload, client_t _clie } } } else { - VSOMEIP_INFO << "Can't set payload for event " << std::hex - << message_->get_method() << " as it isn't provided"; + VSOMEIP_INFO << "Can't set payload for event " + << std::hex << std::setw(4) << std::setfill('0') + << get_service() << "." << get_instance() << "." << get_event() + << ". It isn't provided"; } } @@ -164,8 +168,10 @@ void event::set_payload(const std::shared_ptr<payload> &_payload, } } } else { - VSOMEIP_INFO << "Can't set payload for event " << std::hex - << message_->get_method() << " as it isn't provided"; + VSOMEIP_INFO << "Can't set payload for event " + << std::hex << std::setw(4) << std::setfill('0') + << get_service() << "." << get_instance() << "." << get_event() + << ". It isn't provided"; } } @@ -281,7 +287,9 @@ void event::notify() { routing_->send(VSOMEIP_ROUTING_CLIENT, message_); } else { VSOMEIP_INFO << __func__ - << ": Notifying " << std::hex << get_event() + << ": Notifying " + << std::hex << std::setw(4) << std::setfill('0') + << get_service() << "." << get_instance() << "." << get_event() << " failed. Event payload not (yet) set!"; } } @@ -293,7 +301,9 @@ void event::notify_one(client_t _client, notify_one_unlocked(_client, _target); } else { VSOMEIP_WARNING << __func__ - << ": Notifying " << std::hex << get_event() + << ": Notifying " + << std::hex << std::setw(4) << std::setfill('0') + << get_service() << "." << get_instance() << "." << get_event() << " failed. Target undefined"; } } @@ -306,13 +316,17 @@ void event::notify_one_unlocked(client_t _client, routing_->send_to(_client, _target, message_); } else { VSOMEIP_INFO << __func__ - << ": Notifying " << std::hex << get_event() + << ": Notifying " + << std::hex << std::setw(4) << std::setfill('0') + << get_service() << "." << get_instance() << "." << get_event() << " failed. Event payload not (yet) set!"; pending_.insert(_target); } } else { VSOMEIP_WARNING << __func__ - << ": Notifying " << std::hex << get_event() + << ": Notifying " + << std::hex << std::setw(4) << std::setfill('0') + << get_service() << "." << get_instance() << "." << get_event() << " failed. Target undefined"; } } @@ -329,7 +343,8 @@ void event::notify_one_unlocked(client_t _client) { } else { VSOMEIP_INFO << __func__ << ": Notifying " - << std::hex << message_->get_method() + << std::hex << std::setw(4) << std::setfill('0') + << get_service() << "." << get_instance() << "." << get_event() << " to client " << _client << " failed. Event payload not set!"; } @@ -403,9 +418,12 @@ bool event::add_subscriber(eventgroup_t _eventgroup, client_t _client, bool _for ret = eventgroups_[_eventgroup].insert(_client).second; } else { VSOMEIP_WARNING << __func__ << ": Didnt' insert client " - << std::hex << std::setw(4) << std::setfill('0') << _client - << " to eventgroup 0x" - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup; + << std::hex << std::setw(4) << std::setfill('0') + << _client + << " to eventgroup " + << std::hex << std::setw(4) << std::setfill('0') + << get_service() << "." << get_instance() << "." + << _eventgroup; } return ret; } diff --git a/implementation/routing/src/eventgroupinfo.cpp b/implementation/routing/src/eventgroupinfo.cpp index d09ed1b..50bbdb6 100644 --- a/implementation/routing/src/eventgroupinfo.cpp +++ b/implementation/routing/src/eventgroupinfo.cpp @@ -26,13 +26,14 @@ eventgroupinfo::eventgroupinfo() threshold_(0), id_(PENDING_SUBSCRIPTION_ID), reliability_(reliability_type_e::RT_UNKNOWN), - reliability_auto_mode_(false) { + reliability_auto_mode_(false), + max_remote_subscribers_(VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS) { } eventgroupinfo::eventgroupinfo( const service_t _service, const instance_t _instance, const eventgroup_t _eventgroup, const major_version_t _major, - const ttl_t _ttl) + const ttl_t _ttl, const uint8_t _max_remote_subscribers) : service_(_service), instance_(_instance), eventgroup_(_eventgroup), @@ -42,7 +43,8 @@ eventgroupinfo::eventgroupinfo( threshold_(0), id_(PENDING_SUBSCRIPTION_ID), reliability_(reliability_type_e::RT_UNKNOWN), - reliability_auto_mode_(false) { + reliability_auto_mode_(false), + max_remote_subscribers_(_max_remote_subscribers) { } eventgroupinfo::~eventgroupinfo() { @@ -213,65 +215,123 @@ eventgroupinfo::update_remote_subscription( const std::chrono::steady_clock::time_point &_expiration, std::set<client_t> &_changed, remote_subscription_id_t &_id, const bool _is_subscribe) { - std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); - for (const auto& its_item : subscriptions_) { - if (its_item.second->equals(_subscription)) { - // update existing subscription - _changed = its_item.second->update( - _subscription->get_clients(), _expiration, _is_subscribe); - _id = its_item.second->get_id(); - - // Copy acknowledgment states from existing subscription - for (const auto& its_client : _subscription->get_clients()) { - _subscription->set_client_state(its_client, - its_item.second->get_client_state(its_client)); - } + bool its_result(false); + std::shared_ptr<endpoint_definition> its_subscriber; + std::set<std::shared_ptr<event> > its_events; - if (_is_subscribe) { - if (!_changed.empty()) { - // New clients: - // Let this be a child subscription - _subscription->set_parent(its_item.second); - update_id(); - _subscription->set_id(id_); - subscriptions_[id_] = _subscription; - } else { - if (!_subscription->is_pending()) { - if (!_subscription->force_initial_events()) { - _subscription->set_initial(false); - } + { + std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); + + for (const auto& its_item : subscriptions_) { + if (its_item.second->equals(_subscription)) { + // update existing subscription + _changed = its_item.second->update( + _subscription->get_clients(), _expiration, _is_subscribe); + _id = its_item.second->get_id(); + + // Copy acknowledgment states from existing subscription + for (const auto& its_client : _subscription->get_clients()) { + const auto its_state = its_item.second->get_client_state(its_client); + if (_is_subscribe && + its_state == remote_subscription_state_e::SUBSCRIPTION_UNKNOWN) { + _subscription->set_client_state(its_client, + remote_subscription_state_e::SUBSCRIPTION_PENDING); + _changed.insert(its_client); } else { - its_item.second->set_answers( - its_item.second->get_answers() + 1); - _subscription->set_parent(its_item.second); - _subscription->set_answers(0); + _subscription->set_client_state(its_client, its_state); } } - } else { - if (its_item.second->is_pending()) { - for (const auto &its_event : events_) - its_event->remove_pending( - its_item.second->get_subscriber()); + + if (_is_subscribe) { + if (!_changed.empty()) { + // New clients: + // Let this be a child subscription + _subscription->set_parent(its_item.second); + update_id(); + _subscription->set_id(id_); + subscriptions_[id_] = _subscription; + } else { + if (!_subscription->is_pending()) { + if (!_subscription->force_initial_events()) { + _subscription->set_initial(false); + } + } else { + its_item.second->set_answers( + its_item.second->get_answers() + 1); + _subscription->set_parent(its_item.second); + _subscription->set_answers(0); + } + } + } else { + if (its_item.second->is_pending()) { + its_subscriber = its_item.second->get_subscriber(); + } } + + its_result = true; + break; } + } + } - return true; + if (its_subscriber) { + { + // Build set of events first to avoid having to + // hold the "events_mutex_" in parallel to the internal event mutexes. + std::lock_guard<std::mutex> its_lock(events_mutex_); + for (const auto &its_event : events_) + its_events.insert(its_event); } + for (const auto &its_event : its_events) + its_event->remove_pending(its_subscriber); } - return false; + return (its_result); +} + +bool +eventgroupinfo::is_remote_subscription_limit_reached( + const std::shared_ptr<remote_subscription> &_subscription) { + bool limit_reached(false); + + if (subscriptions_.size() <= max_remote_subscribers_) { + return false; + } + + boost::asio::ip::address its_address; + if (_subscription->get_ip_address(its_address)) { + auto find_address = remote_subscribers_count_.find(its_address); + if (find_address != remote_subscribers_count_.end()) { + if (find_address->second > max_remote_subscribers_) { + VSOMEIP_WARNING << ": remote subscriber limit [" << std::dec + << (uint32_t)max_remote_subscribers_ << "] to [" + << std::hex << std::setw(4) << std::setfill('0') << service_ << "." + << std::hex << std::setw(4) << std::setfill('0') << instance_ << "." + << std::hex << std::setw(4) << std::setfill('0') << eventgroup_ << "]" + << " reached for remote address: " << its_address.to_string() + << " rejecting subscription!"; + return true; + } + } + } + return limit_reached; } remote_subscription_id_t eventgroupinfo::add_remote_subscription( const std::shared_ptr<remote_subscription> &_subscription) { std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); + update_id(); _subscription->set_id(id_); subscriptions_[id_] = _subscription; + boost::asio::ip::address its_address; + if (_subscription->get_ip_address(its_address)) { + remote_subscribers_count_[its_address]++; + } return id_; } @@ -291,6 +351,20 @@ void eventgroupinfo::remove_remote_subscription( const remote_subscription_id_t _id) { std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); + + auto find_subscription = subscriptions_.find(_id); + if (find_subscription != subscriptions_.end()) { + boost::asio::ip::address its_address; + if (find_subscription->second->get_ip_address(its_address)) { + auto find_address = remote_subscribers_count_.find(its_address); + if (find_address != remote_subscribers_count_.end()) { + if(find_address->second != 0) { + find_address->second--; + } + } + } + } + subscriptions_.erase(_id); } @@ -298,6 +372,7 @@ void eventgroupinfo::clear_remote_subscriptions() { std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); subscriptions_.clear(); + remote_subscribers_count_.clear(); } std::set<std::shared_ptr<endpoint_definition> > @@ -390,11 +465,19 @@ eventgroupinfo::send_initial_events( } // Send events - for (const auto& its_event : its_reliable_events) + for (const auto &its_event : its_reliable_events) its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _reliable); - for (const auto& its_event : its_unreliable_events) + for (const auto &its_event : its_unreliable_events) its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _unreliable); } +uint8_t eventgroupinfo::get_max_remote_subscribers() const { + return max_remote_subscribers_; +} + +void eventgroupinfo::set_max_remote_subscribers(uint8_t _max_remote_subscribers) { + max_remote_subscribers_ = _max_remote_subscribers; +} + } // namespace vsomeip_v3 diff --git a/implementation/routing/src/remote_subscription.cpp b/implementation/routing/src/remote_subscription.cpp index daec6f9..cb04d93 100644 --- a/implementation/routing/src/remote_subscription.cpp +++ b/implementation/routing/src/remote_subscription.cpp @@ -46,6 +46,21 @@ remote_subscription::equals( return operator ==(*_other); } +bool +remote_subscription::address_equals( + const std::shared_ptr<remote_subscription> &_other) const { + bool relibale_address_equals(false); + bool unrelibale_address_equals(false); + + if (reliable_ && (*_other).reliable_) + relibale_address_equals = (reliable_->get_address() + == (*_other).reliable_->get_address()); + if (unreliable_ && (*_other).unreliable_) + unrelibale_address_equals = (unreliable_->get_address() + == (*_other).unreliable_->get_address()); + return (relibale_address_equals || unrelibale_address_equals); +} + void remote_subscription::reset(const std::set<client_t> &_clients) { auto its_client_state = std::make_pair( @@ -135,7 +150,7 @@ std::set<client_t> remote_subscription::get_clients() const { std::lock_guard<std::mutex> its_lock(mutex_); std::set<client_t> its_clients; - for (const auto& its_item : clients_) + for (const auto &its_item : clients_) its_clients.insert(its_item.first); return its_clients; } @@ -313,4 +328,17 @@ remote_subscription::set_answers(const std::uint32_t _answers) { answers_ = _answers; } +bool +remote_subscription::get_ip_address(boost::asio::ip::address &_address) const { + if (reliable_) { + _address = reliable_->get_address(); + return true; + } + else if (unreliable_) { + _address = unreliable_->get_address(); + return true; + } + return false; +} + } // namespace vsomeip_v3 diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp index 960ae60..c787a9e 100644 --- a/implementation/routing/src/routing_manager_base.cpp +++ b/implementation/routing/src/routing_manager_base.cpp @@ -24,7 +24,8 @@ routing_manager_base::routing_manager_base(routing_manager_host *_host) : host_(_host), io_(host_->get_io()), client_(host_->get_client()), - configuration_(host_->get_configuration()) + configuration_(host_->get_configuration()), + routing_state_(routing_state_e::RS_UNKNOWN) #ifdef USE_DLT , tc_(trace::connector_impl::get()) #endif @@ -458,6 +459,8 @@ void routing_manager_base::register_event(client_t _client, its_eventgroupinfo->set_service(_service); its_eventgroupinfo->set_instance(_instance); its_eventgroupinfo->set_eventgroup(eg); + its_eventgroupinfo->set_max_remote_subscribers( + configuration_->get_max_remote_subscribers()); std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); eventgroups_[_service][_instance][eg] = its_eventgroupinfo; } @@ -549,22 +552,24 @@ bool routing_manager_base::is_response_allowed(client_t _sender, service_t _serv return true; } - if (_sender == find_local_client(_service, _instance)) { - // sender is still offering the service - return true; - } + { + std::lock_guard<std::mutex> its_lock(local_services_mutex_); + if (_sender == find_local_client_unlocked(_service, _instance)) { + // sender is still offering the service + return true; + } - std::lock_guard<std::mutex> its_lock(local_services_mutex_); - auto found_service = local_services_history_.find(_service); - if (found_service != local_services_history_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_client = found_instance->second.find(_sender); - if (found_client != found_instance->second.end()) { - // sender was offering the service and is still connected - return true; + auto found_service = local_services_history_.find(_service); + if (found_service != local_services_history_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + auto found_client = found_instance->second.find(_sender); + if (found_client != found_instance->second.end()) { + // sender was offering the service and is still connected + return true; + } } - } + } } // service is now offered by another client @@ -648,6 +653,21 @@ void routing_manager_base::unsubscribe(client_t _client, uid_t _uid, gid_t _gid, } } +void +routing_manager_base::unsubscribe_all( + service_t _service, instance_t _instance) { + + std::lock_guard<std::mutex> its_guard(events_mutex_); + auto find_service = events_.find(_service); + if (find_service != events_.end()) { + auto find_instance = find_service->second.find(_instance); + if (find_instance != find_service->second.end()) { + for (auto &e : find_instance->second) + e.second->clear_subscribers(); + } + } +} + void routing_manager_base::notify(service_t _service, instance_t _instance, event_t _event, std::shared_ptr<payload> _payload, bool _force) { @@ -982,6 +1002,11 @@ std::set<client_t> routing_manager_base::find_local_clients(service_t _service, client_t routing_manager_base::find_local_client(service_t _service, instance_t _instance) const { std::lock_guard<std::mutex> its_lock(local_services_mutex_); + return find_local_client_unlocked(_service, _instance); +} + +client_t routing_manager_base::find_local_client_unlocked(service_t _service, + instance_t _instance) const { client_t its_client(VSOMEIP_ROUTING_CLIENT); auto its_service = local_services_.find(_service); if (its_service != local_services_.end()) { @@ -1396,6 +1421,11 @@ routing_manager_base::get_subscriptions(const client_t _client) { return result; } +routing_state_e +routing_manager_base::get_routing_state() { + return routing_state_; +} + #ifdef VSOMEIP_ENABLE_COMPAT void routing_manager_base::set_incoming_subscription_state(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event, subscription_state_e _state) { diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 34150fe..07c0740 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -532,8 +532,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service, _minor, DEFAULT_TTL); } its_info->add_client(_client); - ep_mgr_impl_->find_or_create_remote_client( - _service, _instance, true); + ep_mgr_impl_->find_or_create_remote_client(_service, _instance); } } } @@ -633,6 +632,7 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid, std::unique_lock<std::mutex> its_critical(remote_subscription_state_mutex_); bool inserted = insert_subscription(_service, _instance, _eventgroup, _event, _client, &its_already_subscribed_events); + const bool subscriber_is_rm_host = (get_client() == _client); if (inserted) { if (0 == its_local_client) { handle_subscription_state(_client, _service, _instance, _eventgroup, _event); @@ -642,7 +642,11 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid, _eventgroup, _event, its_already_subscribed_events); auto its_info = find_eventgroup(_service, _instance, _eventgroup); - if (its_info) { + // if the subscriber is the rm_host itself: check if service + // is available before subscribing via SD otherwise we sent + // a StopSubscribe/Subscribe once the first offer is received + if (its_info && + (!subscriber_is_rm_host || find_service(_service, _instance))) { discovery_->subscribe(_service, _instance, _eventgroup, _major, configured_ttl, its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT, @@ -657,7 +661,7 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid, } } } - if (get_client() == _client) { + if (subscriber_is_rm_host) { std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_); subscription_data_t subscription = { _service, _instance, _eventgroup, _major, _event, _uid, _gid @@ -701,6 +705,10 @@ void routing_manager_impl::unsubscribe(client_t _client, uid_t _uid, gid_t _gid, host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, false, [](const bool _subscription_accepted){ (void)_subscription_accepted; }); if (0 == find_local_client(_service, _instance)) { + if (get_client() == _client) { + std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_); + remove_pending_subscription(_service, _instance, _eventgroup, _event); + } if (last_subscriber_removed) { unset_all_eventpayloads(_service, _instance, _eventgroup); { @@ -784,7 +792,8 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, if (its_target) { #ifdef USE_DLT if ((is_request && its_client == get_client()) || - (is_response && find_local_client(its_service, _instance) == get_client())) { + (is_response && find_local_client(its_service, _instance) == get_client()) || + (is_notification && find_local_client(its_service, _instance) == VSOMEIP_ROUTING_CLIENT)) { const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); @@ -1201,6 +1210,37 @@ void routing_manager_impl::notify_one(service_t _service, instance_t _instance, void routing_manager_impl::on_availability(service_t _service, instance_t _instance, bool _is_available, major_version_t _major, minor_version_t _minor) { + + // insert subscriptions of routing manager into service discovery + // to send SubscribeEventgroup after StopOffer / Offer was received + if (_is_available) { + if (discovery_) { + const client_t its_local_client = find_local_client(_service, _instance); + // remote service + if (VSOMEIP_ROUTING_CLIENT == its_local_client) { + static const ttl_t configured_ttl(configuration_->get_sd_ttl()); + + std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_); + for (auto &ps : pending_subscriptions_) { + if (ps.service_ == _service + && ps.instance_ == _instance + && ps.major_ == _major) { + auto its_info = find_eventgroup(_service, _instance, ps.eventgroup_); + if (its_info) { + discovery_->subscribe( + _service, + _instance, + ps.eventgroup_, + _major, + configured_ttl, + its_info->is_selective() ? get_client() : VSOMEIP_ROUTING_CLIENT, + its_info); + } + } + } + } + } + } host_->on_availability(_service, _instance, _is_available, _major, _minor); } @@ -1719,16 +1759,23 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se erase_offer_command(_service, _instance); } - std::lock_guard<std::mutex> its_eventgroups_lock(eventgroups_mutex_); - auto find_service = eventgroups_.find(_service); - if (find_service != eventgroups_.end()) { - auto find_instance = find_service->second.find(_instance); - if (find_instance != find_service->second.end()) { - for (auto e : find_instance->second) { - e.second->clear_remote_subscriptions(); + std::set<std::shared_ptr<eventgroupinfo> > its_eventgroup_info_set; + { + std::lock_guard<std::mutex> its_eventgroups_lock(eventgroups_mutex_); + auto find_service = eventgroups_.find(_service); + if (find_service != eventgroups_.end()) { + auto find_instance = find_service->second.find(_instance); + if (find_instance != find_service->second.end()) { + for (auto e : find_instance->second) { + its_eventgroup_info_set.insert(e.second); + } } } } + + for (auto e : its_eventgroup_info_set) { + e->clear_remote_subscriptions(); + } } else { erase_offer_command(_service, _instance); } @@ -1902,6 +1949,11 @@ bool routing_manager_impl::deliver_notification( } } if (!cache_event) { + VSOMEIP_WARNING << __func__ << ": dropping [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_event_id + << "]. No subscription to corresponding eventgroup."; return true; // as there is nothing to do } } @@ -1996,13 +2048,10 @@ std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoin its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE, _address, _port); if (!_reliable) { -#if defined(_WIN32) || defined(ANDROID) - dynamic_cast<udp_server_endpoint_impl*>( - its_service_endpoint.get())->join(_address); -#else - reinterpret_cast<udp_server_endpoint_impl*>( - its_service_endpoint.get())->join(_address); -#endif + auto its_udp_server_endpoint_impl = std::dynamic_pointer_cast< + udp_server_endpoint_impl>(its_service_endpoint); + if (its_udp_server_endpoint_impl) + its_udp_server_endpoint_impl->join(_address); } } else { VSOMEIP_ERROR<< "Service Discovery endpoint could not be created. " @@ -2150,6 +2199,7 @@ bool routing_manager_impl::is_field(service_t _service, instance_t _instance, return false; } +//only called from the SD void routing_manager_impl::add_routing_info( service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl, @@ -2158,6 +2208,12 @@ void routing_manager_impl::add_routing_info( const boost::asio::ip::address &_unreliable_address, uint16_t _unreliable_port) { + std::lock_guard<std::mutex> its_lock(routing_state_mutex_); + if (routing_state_ == routing_state_e::RS_SUSPENDED) { + VSOMEIP_INFO << "rmi::" << __func__ << " We are suspened --> do nothing."; + return; + } + // Create/Update service info std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance)); if (!its_info) { @@ -2259,6 +2315,7 @@ void routing_manager_impl::add_routing_info( } } else if (_reliable_port != ILLEGAL_PORT && is_reliable_known) { std::lock_guard<std::mutex> its_lock(requested_services_mutex_); + bool connected(false); for(const auto &client_id : requested_services_) { auto found_service = client_id.second.find(_service); if (found_service != client_id.second.end()) { @@ -2272,21 +2329,34 @@ void routing_manager_impl::add_routing_info( || _minor == DEFAULT_MINOR || major_minor_pair.second == ANY_MINOR)) { std::shared_ptr<endpoint> ep = its_info->get_endpoint(true); - if (ep && ep->is_established() && + if (ep) { + if (ep->is_established() && !stub_->contained_in_routing_info( VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), its_info->get_minor())) { - on_availability(_service, _instance, - true, its_info->get_major(), its_info->get_minor()); - stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, - _service, _instance, - its_info->get_major(), - its_info->get_minor()); - if (discovery_) { - discovery_->on_endpoint_connected( - _service, _instance, ep); + on_availability(_service, _instance, + true, its_info->get_major(), its_info->get_minor()); + stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, + _service, _instance, + its_info->get_major(), + its_info->get_minor()); + if (discovery_) { + discovery_->on_endpoint_connected( + _service, _instance, ep); + } + } + } else { + // no endpoint yet, but requested -> create one + + // SWS_SD_00376 establish TCP connection to service + // service is marked as available later in on_connect() + if (!connected) { + ep_mgr_impl_->find_or_create_remote_client( + _service, _instance, true); + connected = true; } + its_info->add_client(client_id.first); } break; } @@ -2358,7 +2428,8 @@ void routing_manager_impl::add_routing_info( && (major_minor_pair.second <= _minor || _minor == DEFAULT_MINOR || major_minor_pair.second == ANY_MINOR)) { - if (!stub_->contained_in_routing_info( + if (_reliable_port == ILLEGAL_PORT && !is_reliable_known && + !stub_->contained_in_routing_info( VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), its_info->get_minor())) { @@ -2411,18 +2482,15 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst // do no longer exist and the last received payload is no // longer valid. for (auto &its_event : its_eventgroup.second->get_events()) { - const auto its_subscribers = its_event->get_subscribers(); - for (const auto& its_subscriber : its_subscribers) { + const auto& its_subscribers = its_event->get_subscribers(); + for (const auto its_subscriber : its_subscribers) { if (its_subscriber != get_client()) { its_event->remove_subscriber( its_eventgroup.first, its_subscriber); } } its_events.push_back(its_event); - remove_pending_subscription(_service, _instance, - its_eventgroup.first, its_event->get_event()); } - } } } @@ -2436,14 +2504,14 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst std::set<std::tuple< service_t, instance_t, eventgroup_t, client_t> > its_invalid; - for (const auto& its_state : remote_subscription_state_) { + for (const auto &its_state : remote_subscription_state_) { if (std::get<0>(its_state.first) == _service && std::get<1>(its_state.first) == _instance) { its_invalid.insert(its_state.first); } } - for (const auto& its_key : its_invalid) + for (const auto &its_key : its_invalid) remote_subscription_state_.erase(its_key); } @@ -2599,8 +2667,15 @@ routing_manager_impl::expire_subscriptions( const bool expire_all = (_range.first == ANY_PORT && _range.second == ANY_PORT); - std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); - for (const auto &its_service : eventgroups_) { + std::map<service_t, + std::map<instance_t, + std::map<eventgroup_t, + std::shared_ptr<eventgroupinfo> > > >its_eventgroups; + { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); + its_eventgroups = eventgroups_; + } + for (const auto &its_service : its_eventgroups) { for (const auto &its_instance : its_service.second) { for (const auto &its_eventgroup : its_instance.second) { const auto its_info = its_eventgroup.second; @@ -2743,10 +2818,12 @@ void routing_manager_impl::on_remote_subscribe( // not exist or is still (partly) pending. remote_subscription_id_t its_id; std::set<client_t> its_added; + update_remote_subscription_mutex_.lock(); auto its_result = its_eventgroupinfo->update_remote_subscription( _subscription, its_expiration, its_added, its_id, true); if (its_result) { if (!_subscription->is_pending()) { // resubscription without change + update_remote_subscription_mutex_.unlock(); _callback(_subscription); } else if (!its_added.empty()) { // new clients for a selective subscription const client_t its_offering_client @@ -2754,6 +2831,7 @@ void routing_manager_impl::on_remote_subscribe( send_subscription(its_offering_client, its_service, its_instance, its_eventgroup, its_major, its_added, _subscription->get_id()); + update_remote_subscription_mutex_.unlock(); } else { // identical subscription is not yet processed std::stringstream its_warning; its_warning << __func__ << " a remote subscription is already pending [" @@ -2774,9 +2852,21 @@ void routing_manager_impl::on_remote_subscribe( if (its_reliable && its_unreliable) its_warning << "]"; VSOMEIP_WARNING << its_warning.str(); + + update_remote_subscription_mutex_.unlock(); _callback(_subscription); } } else { // new subscription + if (its_eventgroupinfo->is_remote_subscription_limit_reached( + _subscription)) { + _subscription->set_all_client_states( + remote_subscription_state_e::SUBSCRIPTION_NACKED); + + update_remote_subscription_mutex_.unlock(); + _callback(_subscription); + return; + } + auto its_id = its_eventgroupinfo->add_remote_subscription(_subscription); @@ -2785,6 +2875,7 @@ void routing_manager_impl::on_remote_subscribe( send_subscription(its_offering_client, its_service, its_instance, its_eventgroup, its_major, _subscription->get_clients(), its_id); + update_remote_subscription_mutex_.unlock(); } } @@ -2820,6 +2911,7 @@ void routing_manager_impl::on_remote_unsubscribe( remote_subscription_id_t its_id(0); std::set<client_t> its_removed; + update_remote_subscription_mutex_.lock(); auto its_result = its_info->update_remote_subscription( _subscription, std::chrono::steady_clock::now(), its_removed, its_id, false); @@ -2831,6 +2923,8 @@ void routing_manager_impl::on_remote_unsubscribe( its_service, its_instance, its_eventgroup, its_major, its_removed, its_id); } + + update_remote_subscription_mutex_.unlock(); } void routing_manager_impl::on_subscribe_ack_with_multicast( @@ -2931,13 +3025,20 @@ std::shared_ptr<endpoint> routing_manager_impl::find_or_create_remote_client( void routing_manager_impl::on_subscribe_nack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event, remote_subscription_id_t _id) { + event_t _event, remote_subscription_id_t _id, bool _simulated) { (void)_event; // TODO: Remove completely? auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); if (its_eventgroup) { auto its_subscription = its_eventgroup->get_remote_subscription(_id); if (its_subscription) { + if (_simulated) { + // method was called because a subscription for unoffered + // service was received. Therefore, remove the remote_subscription + // from the eventgroupinfo to ensure subsequent similar + // subscriptions are handled like a new/unknown subscription + its_eventgroup->remove_remote_subscription(_id); + } its_subscription->set_client_state(_client, remote_subscription_state_e::SUBSCRIPTION_NACKED); @@ -3110,6 +3211,10 @@ void routing_manager_impl::clear_remote_subscriber( std::chrono::steady_clock::time_point routing_manager_impl::expire_subscriptions(bool _force) { + std::map<service_t, + std::map<instance_t, + std::map<eventgroup_t, + std::shared_ptr<eventgroupinfo> > > >its_eventgroups; std::map<std::shared_ptr<remote_subscription>, std::set<client_t> > its_expired_subscriptions; @@ -3119,24 +3224,25 @@ routing_manager_impl::expire_subscriptions(bool _force) { = std::chrono::steady_clock::now() + std::chrono::hours(24); { std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); - - for (auto &its_service : eventgroups_) { - for (auto &its_instance : its_service.second) { - for (auto &its_eventgroup : its_instance.second) { - auto its_subscriptions - = its_eventgroup.second->get_remote_subscriptions(); - for (auto &s : its_subscriptions) { - for (auto its_client : s->get_clients()) { - if (_force) { - its_expired_subscriptions[s].insert(its_client); - } else { - auto its_expiration = s->get_expiration(its_client); - if (its_expiration != std::chrono::steady_clock::time_point()) { - if (its_expiration < now) { - its_expired_subscriptions[s].insert(its_client); - } else if (its_expiration < its_next_expiration) { - its_next_expiration = its_expiration; - } + its_eventgroups = eventgroups_; + } + + for (auto &its_service : its_eventgroups) { + for (auto &its_instance : its_service.second) { + for (auto &its_eventgroup : its_instance.second) { + auto its_subscriptions + = its_eventgroup.second->get_remote_subscriptions(); + for (auto &s : its_subscriptions) { + for (auto its_client : s->get_clients()) { + if (_force) { + its_expired_subscriptions[s].insert(its_client); + } else { + auto its_expiration = s->get_expiration(its_client); + if (its_expiration != std::chrono::steady_clock::time_point()) { + if (its_expiration < now) { + its_expired_subscriptions[s].insert(its_client); + } else if (its_expiration < its_next_expiration) { + its_next_expiration = its_expiration; } } } @@ -3152,22 +3258,53 @@ routing_manager_impl::expire_subscriptions(bool _force) { auto its_service = its_info->get_service(); auto its_instance = its_info->get_instance(); auto its_eventgroup = its_info->get_eventgroup(); - auto its_major = its_info->get_major(); remote_subscription_id_t its_id; + update_remote_subscription_mutex_.lock(); auto its_result = its_info->update_remote_subscription( s.first, std::chrono::steady_clock::now(), s.second, its_id, false); if (its_result) { const client_t its_offering_client = find_local_client(its_service, its_instance); - send_unsubscription(its_offering_client, - its_service, its_instance, its_eventgroup, its_major, + const auto its_subscription = its_info->get_remote_subscription(its_id); + if (its_subscription) { + its_info->remove_remote_subscription(its_id); + + std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); + remote_subscribers_[its_service][its_instance].erase(its_offering_client); + + if (its_info->get_remote_subscriptions().size() == 0) { + for (const auto &its_event : its_info->get_events()) { + bool has_remote_subscriber(false); + for (const auto &its_eventgroup : its_event->get_eventgroups()) { + const auto its_eventgroup_info + = find_eventgroup(its_service, its_instance, its_eventgroup); + if (its_eventgroup_info + && its_eventgroup_info->get_remote_subscriptions().size() > 0) { + has_remote_subscriber = true; + } + } + if (!has_remote_subscriber && its_event->is_shadow()) { + its_event->unset_payload(); + } + } + } + } else { + VSOMEIP_ERROR << __func__ + << ": Unknown expired subscription " << std::dec << its_id << " for eventgroup [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]"; + } + send_expired_subscription(its_offering_client, + its_service, its_instance, its_eventgroup, s.second, s.first->get_id()); } + update_remote_subscription_mutex_.unlock(); if (s.first->get_unreliable()) { - VSOMEIP_INFO << "Expired subscription [" + VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription [" << std::hex << std::setfill('0') << std::setw(4) << its_service << "." << std::hex << std::setfill('0') << std::setw(4) << its_instance << "." << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] unreliable from " @@ -3176,7 +3313,7 @@ routing_manager_impl::expire_subscriptions(bool _force) { } if (s.first->get_reliable()) { - VSOMEIP_INFO << "Expired subscription [" + VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription [" << std::hex << std::setfill('0') << std::setw(4) << its_service << "." << std::hex << std::setfill('0') << std::setw(4) << its_instance << "." << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] reliable from " @@ -3205,7 +3342,7 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const } std::stringstream its_last_resume; { - std::lock_guard<std::mutex> its_lock(last_resume_mutex_); + std::lock_guard<std::mutex> its_lock(routing_state_mutex_); if (last_resume_ != std::chrono::steady_clock::time_point::min()) { its_last_resume << " | " << std::dec << std::chrono::duration_cast<std::chrono::seconds>( @@ -3491,15 +3628,31 @@ void routing_manager_impl::send_subscribe(client_t _client, service_t _service, } void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { + + // Ignore setting to the current routing state + { + std::lock_guard<std::mutex> its_lock(routing_state_mutex_); + if (routing_state_ == _routing_state) { + VSOMEIP_INFO << "rmi::" << __func__ << " No routing state change --> do nothing."; + return; + } + + routing_state_ = _routing_state; + } + if(discovery_) { switch (_routing_state) { case routing_state_e::RS_SUSPENDED: { - VSOMEIP_INFO << "Set routing to suspend mode, diagnosis mode is " - << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode, diagnosis mode is " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); + // stop processing of incoming SD messages discovery_->stop(); + VSOMEIP_INFO << "rmi::" << __func__ << " Inform all applications that we are going to suspend"; + send_suspend(); + // remove all remote subscriptions to remotely offered services on this node expire_subscriptions(true); @@ -3520,6 +3673,10 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); remote_subscription_state_.clear(); } + + // send StopSubscribes and clear subscribed_ map + discovery_->unsubscribe_all_on_suspend(); + // mark all external services as offline services_t its_remote_services; { @@ -3528,11 +3685,6 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } for (const auto &s : its_remote_services) { for (const auto &i : s.second) { - // determine existing subscriptions to remote service and send StopSubscribe - for (auto its_eventgroup : get_subscribed_eventgroups(s.first, i.first)) { - discovery_->unsubscribe(s.first, i.first, its_eventgroup, VSOMEIP_ROUTING_CLIENT); - } - const bool has_reliable(i.second->get_endpoint(true)); const bool has_unreliable(i.second->get_endpoint(false)); del_routing_info(s.first, i.first, has_reliable, has_unreliable); @@ -3541,14 +3693,18 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { unset_all_eventpayloads(s.first, i.first); } } + + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode done, diagnosis mode is " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); + break; } case routing_state_e::RS_RESUMED: { - VSOMEIP_INFO << "Set routing to resume mode, diagnosis mode was " - << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode, diagnosis mode was " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); { - std::lock_guard<std::mutex> its_lock(last_resume_mutex_); + std::lock_guard<std::mutex> its_lock(routing_state_mutex_); last_resume_ = std::chrono::steady_clock::now(); } @@ -3575,11 +3731,14 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { discovery_->offer_service(its_instance.second); } } + + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode done, diagnosis mode was " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); break; } case routing_state_e::RS_DIAGNOSIS: { - VSOMEIP_INFO << "Set routing to diagnosis mode."; + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode."; discovery_->set_diagnosis_mode(true); // send StopOffer messages for all someip protocol services @@ -3591,11 +3750,13 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } } } + + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode done."; break; } case routing_state_e::RS_RUNNING: - VSOMEIP_INFO << "Set routing to running mode, diagnosis mode was " - << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode, diagnosis mode was " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); // Reset relevant in service info for (const auto &its_service : get_offered_services()) { @@ -3619,6 +3780,9 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } } } + + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode done, diagnosis mode was " + << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); break; default: break; @@ -3724,7 +3888,7 @@ void routing_manager_impl::requested_service_remove(client_t _client, if (!found_service->second.size()) { found_client->second.erase(_service); if (!found_client->second.size()) { - requested_services_.erase(client_); + requested_services_.erase(_client); } } } @@ -4057,6 +4221,7 @@ routing_manager_impl::on_unsubscribe_ack(client_t _client, std::shared_ptr<eventgroupinfo> its_info = find_eventgroup(_service, _instance, _eventgroup); if (its_info) { + update_remote_subscription_mutex_.lock(); const auto its_subscription = its_info->get_remote_subscription(_id); if (its_subscription) { its_info->remove_remote_subscription(_id); @@ -4088,6 +4253,7 @@ routing_manager_impl::on_unsubscribe_ack(client_t _client, << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; } + update_remote_subscription_mutex_.unlock(); } else { VSOMEIP_ERROR << __func__ << ": Received StopSubscribe for unknown eventgroup: (" @@ -4122,7 +4288,7 @@ void routing_manager_impl::send_subscription( &routing_manager_stub_host::on_subscribe_nack, std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), its_client, _service, _instance, - _eventgroup, ANY_EVENT, _id); + _eventgroup, ANY_EVENT, _id, false); io_.post(its_callback); } else { const auto its_callback = std::bind( @@ -4146,7 +4312,7 @@ void routing_manager_impl::send_subscription( &routing_manager_stub_host::on_subscribe_nack, std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), its_client, _service, _instance, _eventgroup, - ANY_EVENT, _id); + ANY_EVENT, _id, true); io_.post(its_callback); } catch (const std::exception &e) { VSOMEIP_ERROR << __func__ << e.what(); @@ -4270,7 +4436,7 @@ routing_manager_impl::send_unsubscription(client_t _offering_client, host_->on_subscription(_service, _instance, _eventgroup, its_client, own_uid_, own_gid_, false, [this, self, _service, _instance, _eventgroup, - its_client, _id, _offering_client] + its_client, _id] (const bool _is_accepted) { (void)_is_accepted; try { @@ -4303,6 +4469,30 @@ routing_manager_impl::send_unsubscription(client_t _offering_client, } } +void +routing_manager_impl::send_expired_subscription(client_t _offering_client, + service_t _service, instance_t _instance, + eventgroup_t _eventgroup, + const std::set<client_t> &_removed, + remote_subscription_id_t _id) { + + if (host_->get_client() == _offering_client) { + auto self = shared_from_this(); + for (const auto its_client : _removed) { + host_->on_subscription(_service, _instance, + _eventgroup, its_client, own_uid_, own_gid_, false, + [] (const bool _subscription_accepted){ + (void)_subscription_accepted; + }); + } + } else { + for (const auto its_client : _removed) { + stub_->send_expired_subscription(find_local(_offering_client), its_client, + _service, _instance, _eventgroup, ANY_EVENT, _id); + } + } +} + bool routing_manager_impl::update_security_policy_configuration( uint32_t _uid, uint32_t _gid, @@ -4347,7 +4537,7 @@ bool routing_manager_impl::insert_event_statistics(service_t _service, instance_ // check list for entry with least counter value uint32_t its_min_count(0xFFFFFFFF); auto its_tuple_to_discard = std::make_tuple(0xFFFF, 0xFFFF, 0xFFFF); - for (const auto& it : message_statistics_) { + for (const auto &it : message_statistics_) { if (it.second.counter_ < its_min_count) { its_min_count = it.second.counter_; its_tuple_to_discard = it.first; @@ -4389,7 +4579,7 @@ void routing_manager_impl::statistics_log_timer_cbk(boost::system::error_code co std::stringstream its_log; { std::lock_guard<std::mutex> its_lock(message_statistics_mutex_); - for (const auto& s : message_statistics_) { + for (const auto &s : message_statistics_) { if (s.second.counter_ / (its_interval / 1000) >= its_min_freq) { uint16_t its_subscribed(0); std::shared_ptr<event> its_event = find_event(std::get<0>(s.first), std::get<1>(s.first), std::get<2>(s.first)); @@ -4431,4 +4621,9 @@ void routing_manager_impl::statistics_log_timer_cbk(boost::system::error_code co } } +void routing_manager_impl::send_suspend() const { + + stub_->send_suspend(); +} + } // namespace vsomeip_v3 diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index 872067c..06573a5 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -1071,6 +1071,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, << " ~> Skip message!"; return; } + cache_event_payload(its_message); } } #ifdef USE_DLT @@ -1162,34 +1163,35 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, [this, self, its_client, its_service, its_instance, its_eventgroup, its_event, its_subscription_id, its_major] (const bool _subscription_accepted){ + std::uint32_t its_count = 0; if(_subscription_accepted) { send_subscribe_ack(its_client, its_service, its_instance, its_eventgroup, its_event, its_subscription_id); + std::set<event_t> its_already_subscribed_events; + bool inserted = insert_subscription(its_service, its_instance, its_eventgroup, + its_event, VSOMEIP_ROUTING_CLIENT, &its_already_subscribed_events); + if (inserted) { + notify_remote_initially(its_service, its_instance, its_eventgroup, + its_already_subscribed_events); + } +#ifdef VSOMEIP_ENABLE_COMPAT + send_pending_notify_ones(its_service, its_instance, its_eventgroup, its_client, true); +#endif + its_count = get_remote_subscriber_count(its_service, its_instance, its_eventgroup, true); } else { send_subscribe_nack(its_client, its_service, its_instance, its_eventgroup, its_event, its_subscription_id); } - std::set<event_t> its_already_subscribed_events; - bool inserted = insert_subscription(its_service, its_instance, its_eventgroup, - its_event, VSOMEIP_ROUTING_CLIENT, &its_already_subscribed_events); - if (inserted) { - notify_remote_initially(its_service, its_instance, its_eventgroup, - its_already_subscribed_events); - } -#ifdef VSOMEIP_ENABLE_COMPAT - send_pending_notify_ones(its_service, its_instance, its_eventgroup, its_client, true); -#endif - std::uint32_t its_count = get_remote_subscriber_count( - its_service, its_instance, its_eventgroup, true); VSOMEIP_INFO << "SUBSCRIBE(" << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." << std::hex << std::setw(4) << std::setfill('0') << its_instance << "." << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << ":" << std::hex << std::setw(4) << std::setfill('0') << its_event << ":" - << std::dec << (uint16_t)its_major << "]" + << std::dec << (uint16_t)its_major << "] " << (bool)(its_subscription_id != PENDING_SUBSCRIPTION_ID) << " " - << std::dec << its_count; + << (_subscription_accepted ? std::to_string(its_count) : "-") + << (_subscription_accepted ? " ACCEPTED" : " NOT ACCEPTED"); #ifdef VSOMEIP_ENABLE_COMPAT routing_manager_base::erase_incoming_subscription_state(its_client, its_service, its_instance, its_eventgroup, its_event); @@ -1282,7 +1284,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, case VSOMEIP_UNSUBSCRIBE: if (_size != VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE) { - VSOMEIP_WARNING << "Received an UNSUBSCRIBE command with wrong ~> skip!"; + VSOMEIP_WARNING << "Received an UNSUBSCRIBE command with wrong size ~> skip!"; break; } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], @@ -1320,6 +1322,44 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, << std::dec << its_remote_subscriber_count; break; + case VSOMEIP_EXPIRED_SUBSCRIPTION: + if (_size != VSOMEIP_EXPIRED_SUBSCRIPTION_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received an VSOMEIP_EXPIRED_SUBSCRIPTION command with wrong size ~> skip!"; + break; + } + std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], + sizeof(its_service)); + std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], + sizeof(its_instance)); + std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4], + sizeof(its_eventgroup)); + std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6], + sizeof(its_event)); + std::memcpy(&its_subscription_id, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8], + sizeof(its_subscription_id)); + host_->on_subscription(its_service, its_instance, its_eventgroup, its_client, its_sender_uid, its_sender_gid, false, [](const bool _subscription_accepted){ (void)_subscription_accepted; }); + if (its_subscription_id == PENDING_SUBSCRIPTION_ID) { + // Local subscriber: withdraw subscription + routing_manager_base::unsubscribe(its_client, its_sender_uid, its_sender_gid, its_service, its_instance, its_eventgroup, its_event); + } else { + // Remote subscriber: withdraw subscription only if no more remote subscriber exists + its_remote_subscriber_count = get_remote_subscriber_count(its_service, + its_instance, its_eventgroup, false); + if (!its_remote_subscriber_count) { + routing_manager_base::unsubscribe(VSOMEIP_ROUTING_CLIENT, ANY_UID, ANY_GID, its_service, + its_instance, its_eventgroup, its_event); + } + } + VSOMEIP_INFO << "UNSUBSCRIBE EXPIRED SUBSCRIPTION(" + << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "." + << std::hex << std::setw(4) << std::setfill('0') << its_event << "] " + << (bool)(its_subscription_id != PENDING_SUBSCRIPTION_ID) << " " + << std::dec << its_remote_subscriber_count; + break; + case VSOMEIP_SUBSCRIBE_NACK: if (_size != VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE) { VSOMEIP_WARNING << "Received a VSOMEIP_SUBSCRIBE_NACK command with wrong size ~> skip!"; @@ -1538,6 +1578,11 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, } break; } + + case VSOMEIP_SUSPEND: + on_suspend(); // cleanup remote subscribers + break; + default: break; } @@ -1666,6 +1711,10 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data, j += uint32_t(sizeof(minor_version_t)); if (routing_info_entry == routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE) { + if (get_routing_state() == routing_state_e::RS_SUSPENDED) { + VSOMEIP_INFO << "rmp::" <<__func__ << " We are in suspended mode, the service will not be added!"; + return; + } { std::lock_guard<std::mutex> its_lock(known_clients_mutex_); known_clients_.insert(its_client); @@ -1704,6 +1753,14 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data, << std::hex << std::setw(4) << std::setfill('0') << its_service << "." << std::hex << std::setw(4) << std::setfill('0') << its_instance << ":" << std::dec << int(its_major) << "." << std::dec << its_minor << "]"; + + if (its_client == get_client()) { + VSOMEIP_INFO << __func__ + << ": Clearing subscriptions for service [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance << "]"; + unsubscribe_all(its_service, its_instance); + } } its_services_size -= uint32_t(sizeof(instance_t) + sizeof(major_version_t) + sizeof(minor_version_t) ); @@ -2652,4 +2709,26 @@ void routing_manager_proxy::on_client_assign_ack(const client_t &_client) { } } +void routing_manager_proxy::on_suspend() { + + VSOMEIP_INFO << __func__ << ": Application " + << std::hex << std::setw(4) << std::setfill('0') + << host_->get_client(); + + std::lock_guard<std::mutex> its_lock(remote_subscriber_count_mutex_); + + // Unsubscribe everything that is left over. + for (const auto &s : remote_subscriber_count_) { + for (const auto &i : s.second) { + for (const auto e : i.second) + routing_manager_base::unsubscribe( + VSOMEIP_ROUTING_CLIENT, ANY_UID, ANY_GID, + s.first, i.first, e.first, ANY_EVENT); + } + } + + // Remove all entries. + remote_subscriber_count_.clear(); +} + } // namespace vsomeip_v3 diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp index 9668352..406b0d9 100644 --- a/implementation/routing/src/routing_manager_stub.cpp +++ b/implementation/routing/src/routing_manager_stub.cpp @@ -399,7 +399,8 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, std::memcpy(&its_subscription_id, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 10], sizeof(its_subscription_id)); host_->on_subscribe_nack(its_subscriber, its_service, - its_instance, its_eventgroup, its_notifier, its_subscription_id); + its_instance, its_eventgroup, its_notifier, + its_subscription_id, false); VSOMEIP_INFO << "SUBSCRIBE NACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." @@ -946,6 +947,8 @@ void routing_manager_stub::on_stop_offer_service(client_t _client, if (0 == found_service->second.size()) { found_client->second.second.erase(_service); } + inform_provider(_client, _service, _instance, _major, _minor, + routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE); inform_requesters(_client, _service, _instance, _major, _minor, routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE, false); } else if( _major == DEFAULT_MAJOR && _minor == DEFAULT_MINOR) { @@ -953,6 +956,8 @@ void routing_manager_stub::on_stop_offer_service(client_t _client, if (0 == found_service->second.size()) { found_client->second.second.erase(_service); } + inform_provider(_client, _service, _instance, _major, _minor, + routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE); inform_requesters(_client, _service, _instance, _major, _minor, routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE, false); } @@ -1285,16 +1290,9 @@ void routing_manager_stub::distribute_credentials(client_t _hoster, service_t _s for (auto its_requesting_client : service_requests_) { auto its_service = its_requesting_client.second.find(_service); if (its_service != its_requesting_client.second.end()) { - for (auto its_instance : its_service->second) { - if (its_instance.first == ANY_INSTANCE || - its_instance.first == _instance) { - its_requesting_clients.insert(its_requesting_client.first); - } else { - auto found_instance = its_service->second.find(_instance); - if (found_instance != its_service->second.end()) { - its_requesting_clients.insert(its_requesting_client.first); - } - } + if (its_service->second.find(_instance) != its_service->second.end() + || its_service->second.find(ANY_INSTANCE) != its_service->second.end()) { + its_requesting_clients.insert(its_requesting_client.first); } } } @@ -1302,11 +1300,11 @@ void routing_manager_stub::distribute_credentials(client_t _hoster, service_t _s // search for UID / GID linked with the client ID that offers the requested services std::pair<uint32_t, uint32_t> its_uid_gid; if (security::get()->get_client_to_uid_gid_mapping(_hoster, its_uid_gid)) { + its_credentials.insert(its_uid_gid); for (auto its_requesting_client : its_requesting_clients) { std::pair<uint32_t, uint32_t> its_requester_uid_gid; if (security::get()->get_client_to_uid_gid_mapping(its_requesting_client, its_requester_uid_gid)) { if (its_uid_gid != its_requester_uid_gid) { - its_credentials.insert(std::make_pair(std::get<0>(its_uid_gid), std::get<1>(its_uid_gid))); create_client_credentials_info(its_requesting_client); insert_client_credentials_info(its_requesting_client, its_credentials); send_client_credentials_info(its_requesting_client); @@ -1316,20 +1314,27 @@ void routing_manager_stub::distribute_credentials(client_t _hoster, service_t _s } } +void routing_manager_stub::inform_provider(client_t _hoster, service_t _service, + instance_t _instance, major_version_t _major, minor_version_t _minor, + routing_info_entry_e _entry) { + + if (_hoster != VSOMEIP_ROUTING_CLIENT + && _hoster != host_->get_client()) { + create_client_routing_info(_hoster); + insert_client_routing_info(_hoster, _entry, _hoster, + _service, _instance, _major, _minor); + send_client_routing_info(_hoster); + } +}; + void routing_manager_stub::inform_requesters(client_t _hoster, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, routing_info_entry_e _entry, bool _inform_service) { for (auto its_client : service_requests_) { auto its_service = its_client.second.find(_service); if (its_service != its_client.second.end()) { - bool send(false); - for (auto its_instance : its_service->second) { - if (its_instance.first == ANY_INSTANCE || - its_instance.first == _instance) { - send = true; - } - } - if (send) { + if (its_service->second.find(_instance) != its_service->second.end() + || its_service->second.find(ANY_INSTANCE) != its_service->second.end()) { if (_inform_service) { if (_hoster != VSOMEIP_ROUTING_CLIENT && _hoster != host_->get_client()) { @@ -1447,6 +1452,44 @@ bool routing_manager_stub::send_unsubscribe( } } +bool routing_manager_stub::send_expired_subscription( + const std::shared_ptr<endpoint>& _target, + client_t _client, service_t _service, instance_t _instance, + eventgroup_t _eventgroup, event_t _event, + remote_subscription_id_t _id) { + if (_target) { + byte_t its_command[VSOMEIP_EXPIRED_SUBSCRIPTION_COMMAND_SIZE]; + uint32_t its_size = VSOMEIP_EXPIRED_SUBSCRIPTION_COMMAND_SIZE + - VSOMEIP_COMMAND_HEADER_SIZE; + its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_EXPIRED_SUBSCRIPTION; + std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &_client, + sizeof(_client)); + std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, + sizeof(its_size)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, + sizeof(_service)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 2], &_instance, + sizeof(_instance)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, + sizeof(_eventgroup)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_event, + sizeof(_event)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_id, + sizeof(_id)); + + return _target->send(its_command, sizeof(its_command)); + } else { + VSOMEIP_WARNING << __func__ << " Couldn't send expired subscription to local client [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "." + << std::hex << std::setw(4) << std::setfill('0') << _event << "]" + << " subscriber: "<< std::hex << std::setw(4) << std::setfill('0') + << _client; + return false; + } +} + void routing_manager_stub::send_subscribe_ack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event) { @@ -2257,7 +2300,7 @@ routing_manager_stub::send_requester_policies(const std::unordered_set<client_t> pending_security_update_id_t its_policy_id; // serialize the policies and send them... - for (const auto& p : _policies) { + for (const auto &p : _policies) { std::vector<byte_t> its_policy_data; if (p->serialize(its_policy_data)) { std::vector<byte_t> its_message; @@ -2569,4 +2612,12 @@ void routing_manager_stub::on_security_update_response( } } +void routing_manager_stub::send_suspend() const { + + static const std::vector<byte_t> its_suspend( + { VSOMEIP_SUSPEND, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }); + + broadcast(its_suspend); +} + } // namespace vsomeip_v3 diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp index aba8647..467bca2 100644 --- a/implementation/runtime/src/application_impl.cpp +++ b/implementation/runtime/src/application_impl.cpp @@ -961,7 +961,7 @@ void application_impl::on_subscription(service_t _service, instance_t _instance, } } - if(handler_found) { + if (handler_found) { if(auto its_handler = its_handlers.first) { // "normal" subscription handler exists _accepted_cb(its_handler(_client, _uid, _gid, _subscribed)); diff --git a/implementation/security/src/policy.cpp b/implementation/security/src/policy.cpp index 9630e5f..260198a 100644 --- a/implementation/security/src/policy.cpp +++ b/implementation/security/src/policy.cpp @@ -305,7 +305,7 @@ policy::serialize(std::vector<byte_t> &_data) const { uint32_t its_requests_size(0); serialize_u32(its_requests_size, _data); - for (const auto& its_request : requests_) { + for (const auto &its_request : requests_) { for (auto its_service = its_request.first.lower(); its_service <= its_request.first.upper(); its_service++) { @@ -316,7 +316,7 @@ policy::serialize(std::vector<byte_t> &_data) const { uint32_t its_instances_size(0); serialize_u32(its_instances_size, _data); - for (const auto& i : its_request.second) { + for (const auto &i : its_request.second) { boost::icl::interval_set<instance_t> its_instances; its_instances.insert(i.first); serialize_interval_set(its_instances, _data); diff --git a/implementation/security/src/security_impl.cpp b/implementation/security/src/security_impl.cpp index 8a3d276..377e310 100644 --- a/implementation/security/src/security_impl.cpp +++ b/implementation/security/src/security_impl.cpp @@ -470,7 +470,7 @@ security_impl::update_security_policy(uint32_t _uid, uint32_t _gid, } if (its_matching_policy) { - for (const auto& r : _policy->requests_) { + for (const auto &r : _policy->requests_) { service_t its_lower, its_upper; get_bounds(r.first, its_lower, its_upper); for (auto s = its_lower; s <= its_upper; s++) { @@ -479,7 +479,7 @@ security_impl::update_security_policy(uint32_t _uid, uint32_t _gid, its_matching_policy->requests_ += std::make_pair(its_service, r.second); } } - for (const auto& o : _policy->offers_) { + for (const auto &o : _policy->offers_) { service_t its_lower, its_upper; get_bounds(o.first, its_lower, its_upper); for (auto s = its_lower; s <= its_upper; s++) { @@ -1081,8 +1081,8 @@ security_impl::get_requester_policies(const std::shared_ptr<policy> _policy, } std::lock_guard<std::mutex> its_lock(_policy->mutex_); - for (const auto& o : _policy->offers_) { - for (const auto& p : its_policies) { + for (const auto &o : _policy->offers_) { + for (const auto &p : its_policies) { if (p == _policy) continue; @@ -1091,7 +1091,7 @@ security_impl::get_requester_policies(const std::shared_ptr<policy> _policy, auto its_policy = std::make_shared<policy>(); its_policy->credentials_ = p->credentials_; - for (const auto& r : p->requests_) { + for (const auto &r : p->requests_) { // o represents an offer by a service interval and its instances // (a set of intervals) // r represents a request by a service interval and its instances @@ -1109,8 +1109,8 @@ security_impl::get_requester_policies(const std::shared_ptr<policy> _policy, auto its_service_min = std::max(its_o_lower, its_r_lower); auto its_service_max = std::min(its_r_upper, its_o_upper); - for (const auto& i : o.second) { - for (const auto& j : r.second) { + for (const auto &i : o.second) { + for (const auto &j : r.second) { for (const auto& k : j.second) { instance_t its_i_lower, its_i_upper, its_k_lower, its_k_upper; get_bounds(i, its_i_lower, its_i_upper); @@ -1151,7 +1151,7 @@ security_impl::get_clients(uid_t _uid, gid_t _gid, std::unordered_set<client_t> &_clients) const { std::lock_guard<std::mutex> its_lock(ids_mutex_); - for (const auto& i : ids_) { + for (const auto &i : ids_) { if (i.second.first == _uid && i.second.second == _gid) _clients.insert(i.first); } diff --git a/implementation/service_discovery/include/service_discovery.hpp b/implementation/service_discovery/include/service_discovery.hpp index 778ce08..77b4258 100644 --- a/implementation/service_discovery/include/service_discovery.hpp +++ b/implementation/service_discovery/include/service_discovery.hpp @@ -45,6 +45,7 @@ public: virtual void unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, client_t _client) = 0; virtual void unsubscribe_all(service_t _service, instance_t _instance) = 0; + virtual void unsubscribe_all_on_suspend() = 0; virtual bool send(bool _is_announcing) = 0; diff --git a/implementation/service_discovery/include/service_discovery_host.hpp b/implementation/service_discovery/include/service_discovery_host.hpp index f864571..0f992d7 100644 --- a/implementation/service_discovery/include/service_discovery_host.hpp +++ b/implementation/service_discovery/include/service_discovery_host.hpp @@ -86,7 +86,7 @@ public: virtual void on_subscribe_nack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event, remote_subscription_id_t _subscription_id) = 0; + event_t _event, remote_subscription_id_t _subscription_id, bool _simulated) = 0; virtual std::chrono::steady_clock::time_point expire_subscriptions(bool _force) = 0; diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp index 81bb91f..c4e3835 100644 --- a/implementation/service_discovery/include/service_discovery_impl.hpp +++ b/implementation/service_discovery/include/service_discovery_impl.hpp @@ -79,7 +79,8 @@ public: void unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, client_t _client); void unsubscribe_all(service_t _service, instance_t _instance); - void reset_subscriptions(service_t _service, instance_t _instance); + void unsubscribe_all_on_suspend(); + void remove_subscriptions(service_t _service, instance_t _instance); bool send(bool _is_announcing); @@ -239,7 +240,7 @@ private: const std::shared_ptr<endpoint> &_unreliable, boost::asio::ip::address &_address) const; - std::shared_ptr<request> find_request(service_t _service, instance_t _instance); + void update_request(service_t _service, instance_t _instance); void start_offer_debounce_timer(bool _first_start); void on_offer_debounce_timer_expired(const boost::system::error_code &_error); diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index ca4e131..74e509b 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -165,9 +165,10 @@ service_discovery_impl::start() { } } if (endpoint_ && !reliable_) { - // rejoin multicast group - dynamic_cast<udp_server_endpoint_impl*>( - endpoint_.get())->join(sd_multicast_); + auto its_endpoint = std::dynamic_pointer_cast< + udp_server_endpoint_impl>(endpoint_); + if (its_endpoint) + its_endpoint->join(sd_multicast_); } } is_suspended_ = false; @@ -213,17 +214,17 @@ service_discovery_impl::release_service( } } -std::shared_ptr<request> -service_discovery_impl::find_request(service_t _service, instance_t _instance) { +void +service_discovery_impl::update_request(service_t _service, instance_t _instance) { std::lock_guard<std::mutex> its_lock(requested_mutex_); auto find_service = requested_.find(_service); if (find_service != requested_.end()) { auto find_instance = find_service->second.find(_instance); if (find_instance != find_service->second.end()) { - return find_instance->second; + find_instance->second->set_sent_counter( + std::uint8_t(repetitions_max_ + 1)); } } - return nullptr; } void @@ -232,6 +233,13 @@ service_discovery_impl::subscribe( eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, client_t _client, const std::shared_ptr<eventgroupinfo> &_info) { + + if (is_suspended_) { + VSOMEIP_WARNING << "service_discovery::" << __func__ + << ": Ignoring subscription as we are suspended."; + return; + } + #ifdef VSOMEIP_ENABLE_COMPAT bool is_selective(_info ? _info->is_selective() : false); #endif // VSOMEIP_ENABLE_COMPAT @@ -248,8 +256,8 @@ service_discovery_impl::subscribe( if (!its_subscription->is_selective() && is_selective) { its_subscription->set_selective(true); its_subscription->remove_client(VSOMEIP_ROUTING_CLIENT); - for (const auto& e : _info->get_events()) { - for (const auto& c : e->get_subscribers(_eventgroup)) { + for (const auto &e : _info->get_events()) { + for (const auto &c : e->get_subscribers(_eventgroup)) { its_subscription->add_client(c); } } @@ -517,23 +525,65 @@ service_discovery_impl::unsubscribe_all( } void -service_discovery_impl::reset_subscriptions( +service_discovery_impl::unsubscribe_all_on_suspend() { + + std::map<boost::asio::ip::address, + std::vector<std::shared_ptr<message_impl> > > its_stopsubscribes; + + { + std::lock_guard<std::mutex> its_lock(subscribed_mutex_); + for (auto its_service : subscribed_) { + for (auto its_instance : its_service.second) { + for (auto &its_eventgroup : its_instance.second) { + boost::asio::ip::address its_address; + auto its_current_message = std::make_shared<message_impl>(); + auto its_subscription = its_eventgroup.second; + its_subscription->set_ttl(0); + const reliability_type_e its_reliability = + get_eventgroup_reliability(its_service.first, its_instance.first, + its_eventgroup.first, its_subscription); + auto its_data = create_eventgroup_entry(its_service.first, its_instance.first, + its_eventgroup.first, its_subscription, its_reliability); + auto its_reliable = its_subscription->get_endpoint(true); + auto its_unreliable = its_subscription->get_endpoint(false); + get_subscription_address( + its_reliable, its_unreliable, its_address); + if (its_data.entry_ + && its_current_message->add_entry_data(its_data.entry_, its_data.options_)) { + its_stopsubscribes[its_address].push_back(its_current_message); + } else { + VSOMEIP_WARNING << __func__ << ": Failed to create StopSubscribe entry for: " + << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance.first << "." + << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup.first + << " address: " << its_address.to_string(); + } + } + its_instance.second.clear(); + } + its_service.second.clear(); + } + subscribed_.clear(); + } + + for (auto its_address : its_stopsubscribes) { + if (!serialize_and_send(its_address.second, its_address.first)) { + VSOMEIP_WARNING << __func__ << ": Failed to send StopSubscribe to address: " + << its_address.first.to_string(); + } + } +} + +void +service_discovery_impl::remove_subscriptions( service_t _service, instance_t _instance) { std::lock_guard<std::mutex> its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - for (auto &its_eventgroup : found_instance->second) { - auto its_subscription = its_eventgroup.second; - for (auto its_client : its_subscription->get_clients()) { - its_subscription->set_state(its_client, - subscription_state_e::ST_UNKNOWN); - } - its_subscription->set_endpoint(nullptr, true); - its_subscription->set_endpoint(nullptr, false); - } + found_service->second.erase(_instance); + if (found_service->second.empty()) { + subscribed_.erase(found_service); } } } @@ -641,16 +691,16 @@ void service_discovery_impl::insert_find_entries( std::vector<std::shared_ptr<message_impl> > &_messages, const requests_t &_requests) { - std::lock_guard<std::mutex> its_lock(requested_mutex_); entry_data_t its_data; its_data.entry_ = its_data.other_ = nullptr; for (const auto& its_service : _requests) { for (const auto& its_instance : its_service.second) { + std::lock_guard<std::mutex> its_lock(requested_mutex_); auto its_request = its_instance.second; - // check if release_service was called + // check if release_service was called / offer was received auto the_service = requested_.find(its_service.first); if ( the_service != requested_.end() ) { auto the_instance = the_service->second.find(its_instance.first); @@ -722,17 +772,37 @@ service_discovery_impl::create_eventgroup_entry( case reliability_type_e::RT_RELIABLE: if (its_reliable_endpoint) { insert_reliable = true; + } else { + VSOMEIP_WARNING << __func__ << ": Cannot create subscription as " + "reliable endpoint is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; } break; case reliability_type_e::RT_UNRELIABLE: if (its_unreliable_endpoint) { insert_unreliable = true; + } else { + VSOMEIP_WARNING << __func__ << ": Cannot create subscription as " + "unreliable endpoint is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; } break; case reliability_type_e::RT_BOTH: if (its_reliable_endpoint && its_unreliable_endpoint) { insert_reliable = true; insert_unreliable = true; + } else { + VSOMEIP_WARNING << __func__ << ": Cannot create subscription as " + "endpoint is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" + << " reliable: " << !!its_reliable_endpoint + << " unreliable: " << !!its_unreliable_endpoint; } break; default: @@ -817,7 +887,6 @@ service_discovery_impl::create_eventgroup_entry( its_entry->set_counter(0); its_entry->set_major_version(_subscription->get_major()); its_entry->set_ttl(_subscription->get_ttl()); - its_data.entry_ = its_entry; } @@ -1008,14 +1077,18 @@ service_discovery_impl::on_message( } const bool received_via_mcast = (_destination == sd_multicast_address_); if (received_via_mcast) { - std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_); + static bool must_start_last_msg_received_timer(true); boost::system::error_code ec; - last_msg_received_timer_.cancel(ec); - last_msg_received_timer_.expires_from_now( - last_msg_received_timer_timeout_, ec); - last_msg_received_timer_.async_wait( - std::bind(&service_discovery_impl::on_last_msg_received_timer_expired, - shared_from_this(), std::placeholders::_1)); + + std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_); + if (0 < last_msg_received_timer_.cancel(ec) || must_start_last_msg_received_timer) { + must_start_last_msg_received_timer = false; + last_msg_received_timer_.expires_from_now( + last_msg_received_timer_timeout_, ec); + last_msg_received_timer_.async_wait( + std::bind(&service_discovery_impl::on_last_msg_received_timer_expired, + shared_from_this(), std::placeholders::_1)); + } } current_remote_address_ = _sender; @@ -1256,17 +1329,15 @@ service_discovery_impl::process_serviceentry( default: VSOMEIP_ERROR << __func__ << ": Unsupported service entry type"; } - } else if (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_) { - std::shared_ptr<request> its_request = find_request(its_service, its_instance); - if (its_request) { - std::lock_guard<std::mutex> its_lock(requested_mutex_); - // ID: SIP_SD_830 - its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); - } + } else if (its_type != entry_type_e::FIND_SERVICE + && (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_)) { + // stop sending find service in repetition phase + update_request(its_service, its_instance); + remove_remote_offer_type(its_service, its_instance, its_reliable_address, its_reliable_port, its_unreliable_address, its_unreliable_port); - reset_subscriptions(its_service, its_instance); + remove_subscriptions(its_service, its_instance); if (!is_diagnosis_ && !is_suspended_) { host_->del_routing_info(its_service, its_instance, (its_reliable_port != ILLEGAL_PORT), @@ -1304,11 +1375,9 @@ service_discovery_impl::process_offerservice_serviceentry( return; } - std::shared_ptr<request> its_request = find_request(_service, _instance); - if (its_request) { - std::lock_guard<std::mutex> its_lock(requested_mutex_); - its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); - } + // stop sending find service in repetition phase + update_request(_service, _instance); + remote_offer_type_e offer_type(remote_offer_type_e::UNKNOWN); if (_reliable_port != ILLEGAL_PORT && _unreliable_port != ILLEGAL_PORT @@ -1430,17 +1499,9 @@ service_discovery_impl::process_offerservice_serviceentry( } } - host_->add_routing_info(_service, _instance, - _major, _minor, - _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_), - _reliable_address, _reliable_port, - _unreliable_address, _unreliable_port); // No need to resubscribe for unicast offers if (_received_via_mcast) { - std::int32_t its_remaining = VSOMEIP_MAX_UDP_MESSAGE_SIZE; - its_remaining -= _resubscribes.back()->get_size(); - std::lock_guard<std::mutex> its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { @@ -1482,6 +1543,12 @@ service_discovery_impl::process_offerservice_serviceentry( } } } + + host_->add_routing_info(_service, _instance, + _major, _minor, + _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_), + _reliable_address, _reliable_port, + _unreliable_address, _unreliable_port); } void @@ -1616,7 +1683,7 @@ service_discovery_impl::on_endpoint_connected( its_subscription->set_endpoint(its_reliable, true); its_subscription->set_endpoint(its_unreliable, false); - for (const auto& its_client : its_subscription->get_clients()) + for (const auto its_client : its_subscription->get_clients()) its_subscription->set_state(its_client, subscription_state_e::ST_NOT_ACKNOWLEDGED); @@ -1726,7 +1793,7 @@ service_discovery_impl::process_eventgroupentry( // We received a subscription for a non-existing eventgroup. // --> Create dummy eventgroupinfo to send Nack. its_info = std::make_shared<eventgroupinfo>(its_service, its_instance, - its_eventgroup, its_major, its_ttl); + its_eventgroup, its_major, its_ttl, VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS); boost::system::error_code ec; VSOMEIP_ERROR << __func__ << ": Received a SubscribeEventGroup entry for unknown eventgroup " @@ -1743,6 +1810,16 @@ service_discovery_impl::process_eventgroupentry( // We received a subscription [n]ack for an eventgroup that does not exist. // --> Remove subscription. unsubscribe(its_service, its_instance, its_eventgroup, VSOMEIP_ROUTING_CLIENT); + + boost::system::error_code ec; + VSOMEIP_WARNING << __func__ + << ": Received a SubscribeEventGroup[N]Ack entry for unknown eventgroup " + << " from: " << its_sender.to_string(ec) << " for: [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup + << "] session: " << std::hex << std::setw(4) << std::setfill('0') + << its_session << ", ttl: " << its_ttl; } return; } @@ -2226,7 +2303,7 @@ service_discovery_impl::handle_eventgroup_subscription( if (_major != _info->get_major()) { // Create a temporary info object with TTL=0 --> send NACK auto its_info = std::make_shared<eventgroupinfo>(_service, _instance, - _eventgroup, _major, 0); + _eventgroup, _major, 0, VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS); boost::system::error_code ec; // TODO: Add session id VSOMEIP_ERROR << __func__ @@ -2325,42 +2402,44 @@ service_discovery_impl::handle_eventgroup_subscription( } } - // Create subscription object - auto its_subscription = std::make_shared<remote_subscription>(); - its_subscription->set_eventgroupinfo(_info); - its_subscription->set_subscriber(its_subscriber); - its_subscription->set_reliable(its_reliable); - its_subscription->set_unreliable(its_unreliable); - its_subscription->reset(_clients); + if (its_subscriber) { + // Create subscription object + auto its_subscription = std::make_shared<remote_subscription>(); + its_subscription->set_eventgroupinfo(_info); + its_subscription->set_subscriber(its_subscriber); + its_subscription->set_reliable(its_reliable); + its_subscription->set_unreliable(its_unreliable); + its_subscription->reset(_clients); - if (_ttl == 0) { // --> unsubscribe - its_subscription->set_ttl(0); - if (!_is_stop_subscribe_subscribe) { - { - std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); - pending_remote_subscriptions_[its_subscription] = _acknowledgement; - _acknowledgement->add_subscription(its_subscription); + if (_ttl == 0) { // --> unsubscribe + its_subscription->set_ttl(0); + if (!_is_stop_subscribe_subscribe) { + { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + pending_remote_subscriptions_[its_subscription] = _acknowledgement; + _acknowledgement->add_subscription(its_subscription); + } + host_->on_remote_unsubscribe(its_subscription); } - host_->on_remote_unsubscribe(its_subscription); + return; } - return; - } - if (_force_initial_events) { - its_subscription->set_force_initial_events(true); - } - its_subscription->set_ttl(_ttl - * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_)); + if (_force_initial_events) { + its_subscription->set_force_initial_events(true); + } + its_subscription->set_ttl(_ttl + * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_)); - { - std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); - pending_remote_subscriptions_[its_subscription] = _acknowledgement; - _acknowledgement->add_subscription(its_subscription); - } + { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + pending_remote_subscriptions_[its_subscription] = _acknowledgement; + _acknowledgement->add_subscription(its_subscription); + } - host_->on_remote_subscribe(its_subscription, - std::bind(&service_discovery_impl::update_remote_subscription, - shared_from_this(), std::placeholders::_1)); + host_->on_remote_subscribe(its_subscription, + std::bind(&service_discovery_impl::update_remote_subscription, + shared_from_this(), std::placeholders::_1)); + } } void @@ -2380,7 +2459,7 @@ service_discovery_impl::handle_eventgroup_subscription_nack( for (const auto& its_client : _clients) { host_->on_subscribe_nack(its_client, _service, _instance, _eventgroup, ANY_EVENT, - PENDING_SUBSCRIPTION_ID); // TODO: This is a dummy call... + PENDING_SUBSCRIPTION_ID, false); // TODO: This is a dummy call... } @@ -3339,6 +3418,7 @@ service_discovery_impl::get_ttl_factor( void service_discovery_impl::on_last_msg_received_timer_expired( const boost::system::error_code &_error) { + if (!_error) { // We didn't receive a multicast message within 110% of the cyclic_offer_delay_ VSOMEIP_WARNING << "Didn't receive a multicast SD message for " << @@ -3346,8 +3426,10 @@ service_discovery_impl::on_last_msg_received_timer_expired( // Rejoin multicast group if (endpoint_ && !reliable_) { - dynamic_cast<udp_server_endpoint_impl*>( - endpoint_.get())->join(sd_multicast_); + auto its_endpoint = std::dynamic_pointer_cast< + udp_server_endpoint_impl>(endpoint_); + if (its_endpoint) + its_endpoint->join(sd_multicast_); } { boost::system::error_code ec; |