diff options
author | Juergen Gehring <juergen.gehring@bmw.de> | 2018-01-25 00:40:05 -0800 |
---|---|---|
committer | Juergen Gehring <juergen.gehring@bmw.de> | 2018-01-25 00:40:05 -0800 |
commit | 79fd5f7a34ed33392f71fa914a60b2e68b28de68 (patch) | |
tree | 5ea93513d0173ffe6dea57545cc5b28db591f082 | |
parent | 5c43d511bd5b5e15eca521c4c71dfa69c6f1c90f (diff) | |
download | vSomeIP-79fd5f7a34ed33392f71fa914a60b2e68b28de68.tar.gz |
vsomeip 2.10.02.10.0
86 files changed, 2838 insertions, 981 deletions
@@ -1,6 +1,13 @@ Changes ======= +v2.10.0 +- Add register_async_subscription_handler to application interface +- Ensure faster stopping of UDP and TCP endpoints +- StopSubscribe eventgroup entries of StopSubscribe/Subscribe + eventgroup entry sequences in incoming SD messages are now + completely handled in the service discovery module + v2.9.5 - Change magic cookie behaviour to only send a magic cookie every 10 seconds instead of in front of every SOME/IP message diff --git a/CMakeLists.txt b/CMakeLists.txt index 14d2b2b..39f583c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,8 +7,8 @@ cmake_minimum_required (VERSION 2.8.12) project (vsomeip) set (VSOMEIP_MAJOR_VERSION 2) -set (VSOMEIP_MINOR_VERSION 9) -set (VSOMEIP_PATCH_VERSION 5) +set (VSOMEIP_MINOR_VERSION 10) +set (VSOMEIP_PATCH_VERSION 0) set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION}) set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in set (CMAKE_VERBOSE_MAKEFILE off) @@ -447,6 +447,25 @@ if((${TEST_IP_MASTER} STREQUAL ${TEST_IP_DEFAULT_VALUE}) OR "-DTEST_IP_MASTER=10.0.3.1 -DTEST_IP_SLAVE=10.0.3.125") endif() +SET(TEST_UID_DEFAULT_VALUE "123456789") +SET(TEST_UID "${TEST_UID_DEFAULT_VALUE}" CACHE STRING + "The User ID of the user running the test: Needed for security") +SET(TEST_GID_DEFAULT_VALUE "123456789") +SET(TEST_GID "${TEST_GID_DEFAULT_VALUE}" CACHE STRING + "The Group ID of the user running the test: Needed for security") + +SET(TEST_SECURITY "ON" CACHE BOOL + "Controls whether security tests should run or not") + +if((${TEST_UID} STREQUAL ${TEST_UID_DEFAULT_VALUE}) OR + (${TEST_GID} STREQUAL ${TEST_GID_DEFAULT_VALUE})) + message(WARNING "TEST_UID and/or TEST_GID isn't set. " + "Security Tests are not runnable " + "Please specify them for example " + "-DTEST_UID=1000 -DTEST_GID=1000") + SET(TEST_SECURITY "OFF") +endif() + add_custom_target(build_tests) add_dependencies(build_tests vsomeip) add_dependencies(build_tests vsomeip-sd) diff --git a/daemon/vsomeipd.cpp b/daemon/vsomeipd.cpp index 0cff747..37f22a0 100644 --- a/daemon/vsomeipd.cpp +++ b/daemon/vsomeipd.cpp @@ -22,11 +22,14 @@ #endif static std::shared_ptr<vsomeip::application> its_application; + +#ifndef VSOMEIP_ENABLE_SIGNAL_HANDLING static vsomeip::routing_state_e routing_state = vsomeip::routing_state_e::RS_RUNNING; static bool stop_application = false; static bool stop_sighandler = false; static std::condition_variable_any sighandler_condition; static std::recursive_mutex sighandler_mutex; +#endif #ifndef VSOMEIP_ENABLE_SIGNAL_HANDLING /* diff --git a/documentation/vsomeipUserGuide b/documentation/vsomeipUserGuide index 8b2e498..b0fbb14 100644 --- a/documentation/vsomeipUserGuide +++ b/documentation/vsomeipUserGuide @@ -572,6 +572,36 @@ service instance. The list of client ports to be used for unreliable (UDP) communication to the given service instance. ++ +Additionally there is the possibility to configure mappings between ranges of client +ports and ranges of remote service ports. +(If a client port is configured for a specific service / instance, the port range mapping is ignored) + +** `reliable_remote_ports` ++ +Specifies a range of reliable remote service ports + +** `unreliable_remote_ports` ++ +Specifies a range of unreliable remote service ports + +** `reliable_client_ports` ++ +Specifies the range of reliable client ports to be mapped to the reliable_remote_ports range + +** `unreliable_client_ports` ++ +Specifies the range of unreliable client ports to be mapped to the unreliable_remote_ports range + + +** `first` ++ +Specifies the lower bound of a port range + +** `last` ++ +Specifies the upper bound of a port range + * `payload-sizes` (array) + Array to limit the maximum allowed payload sizes per IP and port. If not diff --git a/implementation/configuration/include/client.hpp b/implementation/configuration/include/client.hpp index 2ad9e1f..2b48a98 100644 --- a/implementation/configuration/include/client.hpp +++ b/implementation/configuration/include/client.hpp @@ -16,10 +16,14 @@ namespace vsomeip { namespace cfg { struct client { + // ports for specific service / instance service_t service_; instance_t instance_; - std::map<bool, std::set<uint16_t> > ports_; + + // client port ranges mapped to remote port ranges + std::map<bool, std::pair<uint16_t, uint16_t> > remote_ports_; + std::map<bool, std::pair<uint16_t, uint16_t> > client_ports_; }; } // namespace cfg diff --git a/implementation/configuration/include/configuration.hpp b/implementation/configuration/include/configuration.hpp index 85b4019..22ae6c3 100644 --- a/implementation/configuration/include/configuration.hpp +++ b/implementation/configuration/include/configuration.hpp @@ -63,9 +63,9 @@ public: virtual bool is_someip(service_t _service, instance_t _instance) const = 0; - virtual bool get_client_port( - service_t _service, instance_t _instance, bool _reliable, - std::map<bool, std::set<uint16_t> > &_used, uint16_t &_port) const = 0; + virtual bool get_client_port(service_t _service, instance_t _instance, + uint16_t _remote_port, bool _reliable, + std::map<bool, std::set<uint16_t> > &_used_client_ports, uint16_t &_client_port) const = 0; virtual std::set<std::pair<service_t, instance_t> > get_remote_services() const = 0; diff --git a/implementation/configuration/include/configuration_impl.hpp b/implementation/configuration/include/configuration_impl.hpp index 1775f00..4f8886a 100644 --- a/implementation/configuration/include/configuration_impl.hpp +++ b/implementation/configuration/include/configuration_impl.hpp @@ -11,6 +11,7 @@ #include <mutex> #include <vector> #include <unordered_set> +#include <list> #include <boost/property_tree/ptree.hpp> @@ -76,8 +77,9 @@ public: VSOMEIP_EXPORT bool is_someip(service_t _service, instance_t _instance) const; - VSOMEIP_EXPORT bool get_client_port(service_t _service, instance_t _instance, bool _reliable, - std::map<bool, std::set<uint16_t> > &_used, uint16_t &_port) const; + VSOMEIP_EXPORT bool get_client_port(service_t _service, instance_t _instance, + uint16_t _remote_port, bool _reliable, + std::map<bool, std::set<uint16_t> > &_used_client_ports, uint16_t &_client_port) const; VSOMEIP_EXPORT const std::string & get_routing_host() const; @@ -201,7 +203,9 @@ private: void load_clients(const element &_element); void load_client(const boost::property_tree::ptree &_tree); + std::set<uint16_t> load_client_ports(const boost::property_tree::ptree &_tree); + std::pair<uint16_t, uint16_t> load_client_port_range(const boost::property_tree::ptree &_tree); void load_watchdog(const element &_element); @@ -212,7 +216,8 @@ private: void load_policy(const boost::property_tree::ptree &_tree); servicegroup *find_servicegroup(const std::string &_name) const; - std::shared_ptr<client> find_client(service_t _service, instance_t _instance) const; + std::shared_ptr<client> find_client(service_t _service, + instance_t _instance, uint16_t _remote_port, bool _reliable) const; std::shared_ptr<service> find_service(service_t _service, instance_t _instance) const; std::shared_ptr<eventgroup> find_eventgroup(service_t _service, instance_t _instance, eventgroup_t _eventgroup) const; @@ -222,6 +227,7 @@ private: bool is_mandatory(const std::string &_name) const; bool is_remote(std::shared_ptr<service> _service) const; bool is_internal_service(service_t _service, instance_t _instance) const; + bool is_in_port_range(uint16_t _port, std::pair<uint16_t, uint16_t> _port_range) const; void set_mandatory(const std::string &_input); void trim(std::string &_s); @@ -256,9 +262,7 @@ protected: std::map<instance_t, std::shared_ptr<service> > > services_; - std::map<service_t, - std::map<instance_t, - std::shared_ptr<client> > > clients_; + std::list< std::shared_ptr<client> > clients_; std::string routing_host_; diff --git a/implementation/configuration/include/e2e.hpp b/implementation/configuration/include/e2e.hpp index 349c6f4..6f7bcda 100644 --- a/implementation/configuration/include/e2e.hpp +++ b/implementation/configuration/include/e2e.hpp @@ -3,8 +3,8 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef CONFIGURATION_INCLUDE_E2E_HPP_ -#define CONFIGURATION_INCLUDE_E2E_HPP_ +#ifndef VSOMEIP_CFG_E2E_HPP_ +#define VSOMEIP_CFG_E2E_HPP_ #include <string> #include <vector> @@ -14,7 +14,6 @@ namespace vsomeip { namespace cfg { - struct e2e { e2e() : @@ -64,8 +63,7 @@ struct e2e { uint16_t counter_offset; }; +} // namespace cfg +} // namespace vsomeip - -} -} -#endif /* CONFIGURATION_INCLUDE_E2E_HPP_ */ +#endif // VSOMEIP_CFG_E2E_HPP_ diff --git a/implementation/configuration/include/internal.hpp.in b/implementation/configuration/include/internal.hpp.in index 6972759..682dbcc 100644 --- a/implementation/configuration/include/internal.hpp.in +++ b/implementation/configuration/include/internal.hpp.in @@ -106,9 +106,9 @@ #define VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE 17 #define VSOMEIP_RELEASE_SERVICE_COMMAND_SIZE 11 #define VSOMEIP_STOP_OFFER_SERVICE_COMMAND_SIZE 16 -#define VSOMEIP_SUBSCRIBE_COMMAND_SIZE 18 -#define VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE 17 -#define VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE 17 +#define VSOMEIP_SUBSCRIBE_COMMAND_SIZE 19 +#define VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE 19 +#define VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE 19 #define VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE 16 #define VSOMEIP_REGISTER_EVENT_COMMAND_SIZE 15 #define VSOMEIP_UNREGISTER_EVENT_COMMAND_SIZE 14 diff --git a/implementation/configuration/src/configuration_impl.cpp b/implementation/configuration/src/configuration_impl.cpp index 706f389..ea14755 100644 --- a/implementation/configuration/src/configuration_impl.cpp +++ b/implementation/configuration/src/configuration_impl.cpp @@ -245,6 +245,9 @@ bool configuration_impl::load(const std::string &_name) { if (policy_enabled_ && check_credentials_) VSOMEIP_INFO << "Security configuration is active."; + if (policy_enabled_ && !check_credentials_) + VSOMEIP_INFO << "Security configuration is active but in audit mode (allow all)"; + is_loaded_ = true; return is_loaded_; @@ -1249,16 +1252,26 @@ void configuration_impl::load_clients(const element &_element) { void configuration_impl::load_client(const boost::property_tree::ptree &_tree) { try { - bool is_loaded(true); - std::shared_ptr<client> its_client(std::make_shared<client>()); + its_client->remote_ports_[true] = std::make_pair(ILLEGAL_PORT, ILLEGAL_PORT); + its_client->remote_ports_[false] = std::make_pair(ILLEGAL_PORT, ILLEGAL_PORT); + its_client->client_ports_[true] = std::make_pair(ILLEGAL_PORT, ILLEGAL_PORT); + its_client->client_ports_[false] = std::make_pair(ILLEGAL_PORT, ILLEGAL_PORT); for (auto i = _tree.begin(); i != _tree.end(); ++i) { std::string its_key(i->first); std::string its_value(i->second.data()); std::stringstream its_converter; - if (its_key == "reliable") { + if (its_key == "reliable_remote_ports") { + its_client->remote_ports_[true] = load_client_port_range(i->second); + } else if (its_key == "unreliable_remote_ports") { + its_client->remote_ports_[false] = load_client_port_range(i->second); + } else if (its_key == "reliable_client_ports") { + its_client->client_ports_[true] = load_client_port_range(i->second); + } else if (its_key == "unreliable_client_ports") { + its_client->client_ports_[false] = load_client_port_range(i->second); + } else if (its_key == "reliable") { its_client->ports_[true] = load_client_ports(i->second); } else if (its_key == "unreliable") { its_client->ports_[false] = load_client_ports(i->second); @@ -1277,22 +1290,7 @@ void configuration_impl::load_client(const boost::property_tree::ptree &_tree) { } } } - - auto found_service = clients_.find(its_client->service_); - if (found_service != clients_.end()) { - auto found_instance = found_service->second.find( - its_client->instance_); - if (found_instance != found_service->second.end()) { - VSOMEIP_ERROR << "Multiple client configurations for service [" - << std::hex << its_client->service_ << "." - << its_client->instance_ << "]"; - is_loaded = false; - } - } - - if (is_loaded) { - clients_[its_client->service_][its_client->instance_] = its_client; - } + clients_.push_back(its_client); } catch (...) { } } @@ -1316,6 +1314,40 @@ std::set<uint16_t> configuration_impl::load_client_ports( return its_ports; } +std::pair<uint16_t,uint16_t> configuration_impl::load_client_port_range( + const boost::property_tree::ptree &_tree) { + std::pair<uint16_t,uint16_t> its_port_range; + uint16_t its_first_port = ILLEGAL_PORT; + uint16_t its_last_port = ILLEGAL_PORT; + + for (auto i = _tree.begin(); i != _tree.end(); ++i) { + std::string its_key(i->first); + std::string its_value(i->second.data()); + std::stringstream its_converter; + + if (its_value.size() > 1 && its_value[0] == '0' && its_value[1] == 'x') { + its_converter << std::hex << its_value; + } else { + its_converter << std::dec << its_value; + } + + if (its_key == "first") { + its_converter >> its_first_port; + } else if (its_key == "last") { + its_converter >> its_last_port; + } + } + + if (its_last_port < its_first_port) { + VSOMEIP_WARNING << "Port range invalid: first: " << std::dec << its_first_port << " last: " << its_last_port; + its_port_range = std::make_pair(ILLEGAL_PORT, ILLEGAL_PORT); + } else { + its_port_range = std::make_pair(its_first_port, its_last_port); + } + + return its_port_range; +} + void configuration_impl::load_watchdog(const element &_element) { try { auto its_service_discovery = _element.tree_.get_child("watchdog"); @@ -1763,6 +1795,16 @@ bool configuration_impl::is_internal_service(service_t _service, return false; } +bool configuration_impl::is_in_port_range(uint16_t _port, + std::pair<uint16_t, uint16_t> _port_range) const { + + if (_port >= _port_range.first && + _port <= _port_range.second ) { + return true; + } + return false; +} + /////////////////////////////////////////////////////////////////////////////// // Public interface /////////////////////////////////////////////////////////////////////////////// @@ -1850,27 +1892,51 @@ bool configuration_impl::is_someip(service_t _service, } bool configuration_impl::get_client_port( - service_t _service, instance_t _instance, bool _reliable, - std::map<bool, std::set<uint16_t> > &_used, - uint16_t &_port) const { - _port = ILLEGAL_PORT; - auto its_client = find_client(_service, _instance); + service_t _service, instance_t _instance, + uint16_t _remote_port, bool _reliable, + std::map<bool, std::set<uint16_t> > &_used_client_ports, + uint16_t &_client_port) const { + + _client_port = ILLEGAL_PORT; + auto its_client = find_client(_service, _instance, _remote_port, _reliable); // If no client ports are configured, return true - if (!its_client || its_client->ports_[_reliable].empty()) { + if (!its_client || + (its_client->ports_[_reliable].empty() + && its_client->client_ports_[_reliable].first == ILLEGAL_PORT)) { return true; } - for (auto its_port : its_client->ports_[_reliable]) { - // Found free configured port - if (_used[_reliable].find(its_port) == _used[_reliable].end()) { - _port = its_port; - return true; + // specific ports to service / instance are prioritized + if (!its_client->ports_[_reliable].empty()) { + for (auto its_port : its_client->ports_[_reliable]) { + // Found free configured port + if (_used_client_ports[_reliable].find(its_port) == _used_client_ports[_reliable].end()) { + _client_port = its_port; + return true; + } + } + } + + // If no client port ranges are configured use auto port assignment + if (its_client->client_ports_[_reliable].first != ILLEGAL_PORT + && its_client->client_ports_[_reliable].second != ILLEGAL_PORT) { + // look for free port in configured client range + for (uint16_t its_port = its_client->client_ports_[_reliable].first; + its_port <= its_client->client_ports_[_reliable].second; its_port++ ) { + if (_used_client_ports[_reliable].find(its_port) == _used_client_ports[_reliable].end()) { + _client_port = its_port; + return true; + } } } // Configured ports do exist, but they are all in use - VSOMEIP_ERROR << "Cannot find free client port!"; + VSOMEIP_ERROR << "Cannot find free client port for communication to service: " + << _service << " instance: " + << _instance << " remote_port: " + << _remote_port << " reliable: " + << _reliable; return false; } @@ -2053,13 +2119,19 @@ uint8_t configuration_impl::get_threshold(service_t _service, } std::shared_ptr<client> configuration_impl::find_client(service_t _service, - instance_t _instance) const { + instance_t _instance, uint16_t _remote_port, bool _reliable) const { std::shared_ptr<client> its_client; - auto find_service = clients_.find(_service); - if (find_service != clients_.end()) { - auto find_instance = find_service->second.find(_instance); - if (find_instance != find_service->second.end()) { - its_client = find_instance->second; + std::list<std::shared_ptr<client>>::const_iterator it; + + for (it = clients_.begin(); it != clients_.end(); ++it){ + // client was configured for specific service / instance + if ((*it)->service_ == _service + && (*it)->instance_ == _instance) { + its_client = *it; + break; + } else if (is_in_port_range(_remote_port, (*it)->remote_ports_[_reliable])) { + its_client = *it; + break; } } return its_client; @@ -2274,6 +2346,13 @@ bool configuration_impl::check_credentials(client_t _client, uint32_t _uid, return true; } } + + if (!check_credentials_) { + VSOMEIP_INFO << "vSomeIP Security: Check credentials failed for client 0x" + << std::hex << _client << " with UID/GID=" << std::dec << _uid + << "/" << _gid << " but will be allowed due to audit mode is active!"; + } + return !check_credentials_; } @@ -2284,6 +2363,12 @@ bool configuration_impl::is_client_allowed(client_t _client, service_t _service, } auto its_client = policies_.find(_client); if (its_client == policies_.end()) { + if (!check_credentials_) { + VSOMEIP_INFO << "vSomeIP Security: Client 0x" << std::hex << _client + << " isn't allowed to communicate with service/instance " + << _service << "/" << _instance + << " but will be allowed due to audit mode is active!"; + } return !check_credentials_; } @@ -2301,6 +2386,13 @@ bool configuration_impl::is_client_allowed(client_t _client, service_t _service, } } + if (!check_credentials_) { + VSOMEIP_INFO << "vSomeIP Security: Client 0x" << std::hex << _client + << " isn't allowed to communicate with service/instance " + << _service << "/" << _instance + << " but will be allowed due to audit mode is active!"; + } + return !check_credentials_; } @@ -2311,6 +2403,12 @@ bool configuration_impl::is_offer_allowed(client_t _client, service_t _service, } auto its_client = policies_.find(_client); if (its_client == policies_.end()) { + if (!check_credentials_) { + VSOMEIP_INFO << "vSomeIP Security: Client 0x" << std::hex << _client + << " isn't allowed to offer service/instance " + << _service << "/" << _instance + << " but will be allowed due to audit mode is active!"; + } return !check_credentials_; } @@ -2328,6 +2426,13 @@ bool configuration_impl::is_offer_allowed(client_t _client, service_t _service, } } + if (!check_credentials_) { + VSOMEIP_INFO << "vSomeIP Security: Client 0x" << std::hex << _client + << " isn't allowed to offer service/instance " + << _service << "/" << _instance + << " but will be allowed due to audit mode is active!"; + } + return !check_credentials_; } diff --git a/implementation/e2e_protection/include/buffer/buffer.hpp b/implementation/e2e_protection/include/buffer/buffer.hpp index b92d409..5ddcf2e 100644 --- a/implementation/e2e_protection/include/buffer/buffer.hpp +++ b/implementation/e2e_protection/include/buffer/buffer.hpp @@ -3,45 +3,46 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef BUFFER_BUFFER_HPP -#define BUFFER_BUFFER_HPP +#ifndef VSOMEIP_E2E_BUFFER_HPP +#define VSOMEIP_E2E_BUFFER_HPP #include <stdexcept> #include <cstdint> #include <ostream> #include <vector> -namespace buffer { +namespace vsomeip { using e2e_buffer = std::vector<uint8_t>; class buffer_view { public: - buffer_view(const uint8_t *_data_ptr, size_t _data_length) : data_ptr(_data_ptr), data_length(_data_length) { - + buffer_view(const uint8_t *_data_ptr, size_t _data_length) + : data_ptr_(_data_ptr), data_length_(_data_length) { } - buffer_view(const buffer::e2e_buffer &_buffer) : data_ptr(_buffer.data()), data_length(_buffer.size()) {} - - buffer_view(const buffer::e2e_buffer &_buffer, size_t _length) : data_ptr(_buffer.data()), data_length(_length) { + buffer_view(const e2e_buffer &_buffer) + : data_ptr_(_buffer.data()), data_length_(_buffer.size()) {} + buffer_view(const e2e_buffer &_buffer, size_t _length) + : data_ptr_(_buffer.data()), data_length_(_length) { } - buffer_view(const buffer::e2e_buffer &_buffer, size_t _begin, size_t _end) - : data_ptr(_buffer.data() + _begin), data_length(_end - _begin) { - + buffer_view(const e2e_buffer &_buffer, size_t _begin, size_t _end) + : data_ptr_(_buffer.data() + _begin), data_length_(_end - _begin) { } - const uint8_t *begin(void) const { return data_ptr; } + const uint8_t *begin(void) const { return data_ptr_; } - const uint8_t *end(void) const { return data_ptr + data_length; } + const uint8_t *end(void) const { return data_ptr_ + data_length_; } private: - const uint8_t *data_ptr; - size_t data_length; + const uint8_t *data_ptr_; + size_t data_length_; }; -} -std::ostream &operator<<(std::ostream &_os, const buffer::e2e_buffer &_buffer); +std::ostream &operator<<(std::ostream &_os, const e2e_buffer &_buffer); + +} // namespace vsomeip -#endif +#endif // VSOMEIP_E2E_BUFFER_HPP diff --git a/implementation/e2e_protection/include/crc/crc.hpp b/implementation/e2e_protection/include/crc/crc.hpp index 8e393ea..1a54312 100644 --- a/implementation/e2e_protection/include/crc/crc.hpp +++ b/implementation/e2e_protection/include/crc/crc.hpp @@ -3,28 +3,30 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef CRC_CRC_HPP -#define CRC_CRC_HPP +#ifndef VSOMEIP_E2E_CRC_HPP +#define VSOMEIP_E2E_CRC_HPP #include <cstdint> #include "../buffer/buffer.hpp" -namespace crc { +namespace vsomeip { + class e2e_crc { public: - static uint8_t calculate_profile_01(buffer::buffer_view _buffer_view, + static uint8_t calculate_profile_01(buffer_view _buffer_view, const uint8_t _start_value = 0x00U); - static uint32_t calculate_profile_04(buffer::buffer_view _buffer_view, + static uint32_t calculate_profile_04(buffer_view _buffer_view, const uint32_t _start_value = 0x00000000U); - static uint32_t calculate_profile_custom(buffer::buffer_view _buffer_view); + static uint32_t calculate_profile_custom(buffer_view _buffer_view); private: - static const uint8_t lookup_table_profile_01[256]; - static const uint32_t lookup_table_profile_04[256]; - static const uint32_t lookup_table_profile_custom[256]; + static const uint8_t lookup_table_profile_01_[256]; + static const uint32_t lookup_table_profile_04_[256]; + static const uint32_t lookup_table_profile_custom_[256]; }; -} -#endif +} // namespace vsomeip + +#endif // VSOMEIP_E2E_CRC_HPP diff --git a/implementation/e2e_protection/include/e2e/profile/profile01/checker.hpp b/implementation/e2e_protection/include/e2e/profile/profile01/checker.hpp index bc97f0d..5fa243a 100644 --- a/implementation/e2e_protection/include/e2e/profile/profile01/checker.hpp +++ b/implementation/e2e_protection/include/e2e/profile/profile01/checker.hpp @@ -3,35 +3,36 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef E2E_PROFILE_PROFILE01_CHECKER_HPP -#define E2E_PROFILE_PROFILE01_CHECKER_HPP +#ifndef VSOMEIP_E2E_PROFILE01_CHECKER_HPP +#define VSOMEIP_E2E_PROFILE01_CHECKER_HPP #include "../profile01/profile_01.hpp" #include "../profile_interface/checker.hpp" +namespace vsomeip { namespace e2e { -namespace profile { namespace profile01 { -class profile_01_checker final : public e2e::profile::profile_interface::checker { +class profile_01_checker final : public e2e::profile_interface::checker { public: profile_01_checker(void) = delete; // [SWS_E2E_00389] initialize state - explicit profile_01_checker(const Config &_config) : - config(_config) {} + explicit profile_01_checker(const profile_config &_config) : + config_(_config) {} - virtual void check(const buffer::e2e_buffer &_buffer, - e2e::profile::profile_interface::generic_check_status &_generic_check_status) override final; + virtual void check(const e2e_buffer &_buffer, + e2e::profile_interface::generic_check_status &_generic_check_status) override final; private: - Config config; - std::mutex check_mutex; + profile_config config_; + std::mutex check_mutex_; }; -} -} -} -#endif +} // namespace profile01 +} // namespace e2e +} // namespace vsomeip + +#endif // VSOMEIP_E2E_PROFILE01_CHECKER_HPP diff --git a/implementation/e2e_protection/include/e2e/profile/profile01/profile_01.hpp b/implementation/e2e_protection/include/e2e/profile/profile01/profile_01.hpp index 569a20d..810ec56 100644 --- a/implementation/e2e_protection/include/e2e/profile/profile01/profile_01.hpp +++ b/implementation/e2e_protection/include/e2e/profile/profile01/profile_01.hpp @@ -3,52 +3,54 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef E2E_PROFILE_PROFILE01_PROFILE01_HPP -#define E2E_PROFILE_PROFILE01_PROFILE01_HPP +#ifndef VSOMEIP_E2E_PROFILE01_PROFILE01_HPP +#define VSOMEIP_E2E_PROFILE01_PROFILE01_HPP #include <cstdint> #include "../../../buffer/buffer.hpp" +namespace vsomeip { namespace e2e { -namespace profile { namespace profile01 { -struct Config; +struct profile_config; class profile_01 { public: - static uint8_t compute_crc(const Config &_config, const buffer::e2e_buffer &_buffer); + static uint8_t compute_crc(const profile_config &_config, const e2e_buffer &_buffer); - static bool is_buffer_length_valid(const Config &_config, const buffer::e2e_buffer &_buffer); + static bool is_buffer_length_valid(const profile_config &_config, const e2e_buffer &_buffer); }; // [SWS_E2E_00200] -enum class p01_data_id_mode : uint8_t { E2E_P01_DATAID_BOTH, E2E_P01_DATAID_ALT, E2E_P01_DATAID_LOW, E2E_P01_DATAID_NIBBLE}; +enum class p01_data_id_mode : uint8_t {E2E_P01_DATAID_BOTH, E2E_P01_DATAID_ALT, E2E_P01_DATAID_LOW, E2E_P01_DATAID_NIBBLE}; -struct Config { +struct profile_config { // [SWS_E2E_00018] - uint16_t crc_offset; - uint16_t data_id; - p01_data_id_mode data_id_mode; - uint16_t data_length; - uint16_t counter_offset; - uint16_t data_id_nibble_offset; - -#ifndef E2E_DEVELOPMENT - Config() = delete; -#else - Config() = default; -#endif - Config(uint16_t _crc_offset, uint16_t _data_id, p01_data_id_mode _data_id_mode, uint16_t _data_length, uint16_t _counter_offset, uint16_t _data_id_nibble_offset) - - : crc_offset(_crc_offset), data_id(_data_id), - data_id_mode(_data_id_mode), data_length(_data_length), counter_offset(_counter_offset), data_id_nibble_offset(_data_id_nibble_offset) { + uint16_t crc_offset_; + uint16_t data_id_; + p01_data_id_mode data_id_mode_; + uint16_t data_length_; + uint16_t counter_offset_; + uint16_t data_id_nibble_offset_; + + profile_config() = delete; + + profile_config(uint16_t _crc_offset, uint16_t _data_id, + p01_data_id_mode _data_id_mode, uint16_t _data_length, + uint16_t _counter_offset, uint16_t _data_id_nibble_offset) + + : crc_offset_(_crc_offset), data_id_(_data_id), + data_id_mode_(_data_id_mode), data_length_(_data_length), + counter_offset_(_counter_offset), + data_id_nibble_offset_(_data_id_nibble_offset) { } - Config(const Config &_config) = default; - Config &operator=(const Config &_config) = default; + profile_config(const profile_config &_config) = default; + profile_config &operator=(const profile_config &_config) = default; }; -} -} -} -#endif +} // namespace profile01 +} // namespace e2e +} // namespace vsomeip + +#endif // VSOMEIP_E2E_PROFILE01_PROFILE01_HPP diff --git a/implementation/e2e_protection/include/e2e/profile/profile01/protector.hpp b/implementation/e2e_protection/include/e2e/profile/profile01/protector.hpp index f5a129d..204c7a1 100644 --- a/implementation/e2e_protection/include/e2e/profile/profile01/protector.hpp +++ b/implementation/e2e_protection/include/e2e/profile/profile01/protector.hpp @@ -3,42 +3,44 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef E2E_PROFILE_PROFILE01_PROTECTOR_HPP -#define E2E_PROFILE_PROFILE01_PROTECTOR_HPP +#ifndef VSOMEIP_E2E_PROFILE01_PROTECTOR_HPP +#define VSOMEIP_E2E_PROFILE01_PROTECTOR_HPP #include <mutex> #include "../profile01/profile_01.hpp" #include "../profile_interface/protector.hpp" +namespace vsomeip { namespace e2e { -namespace profile { namespace profile01 { -class protector final : public e2e::profile::profile_interface::protector { +class protector final : public e2e::profile_interface::protector { public: protector(void) = delete; - explicit protector(const Config &_config) : config(_config), counter(0){}; + explicit protector(const profile_config &_config) : config_(_config), counter_(0){}; - void protect(buffer::e2e_buffer &_buffer) override final; + void protect(e2e_buffer &_buffer) override final; private: - void write_counter(buffer::e2e_buffer &_buffer); + void write_counter(e2e_buffer &_buffer); - void write_data_id(buffer::e2e_buffer &_buffer); + void write_data_id(e2e_buffer &_buffer); - void write_crc(buffer::e2e_buffer &_buffer, uint8_t _computed_crc); + void write_crc(e2e_buffer &_buffer, uint8_t _computed_crc); void increment_counter(void); private: - Config config; - uint8_t counter; - std::mutex protect_mutex; + profile_config config_; + uint8_t counter_; + std::mutex protect_mutex_; }; -} -} -} -#endif + +} // namespace profile01 +} // namespace e2e +} // namespace vsomeip + +#endif // VSOMEIP_E2E_PROFILE01_PROTECTOR_HPP diff --git a/implementation/e2e_protection/include/e2e/profile/profile_custom/checker.hpp b/implementation/e2e_protection/include/e2e/profile/profile_custom/checker.hpp index 00ea47c..7080329 100644 --- a/implementation/e2e_protection/include/e2e/profile/profile_custom/checker.hpp +++ b/implementation/e2e_protection/include/e2e/profile/profile_custom/checker.hpp @@ -3,38 +3,39 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef E2E_PROFILE_PROFILE_CUSTOM_CHECKER_HPP -#define E2E_PROFILE_PROFILE_CUSTOM_CHECKER_HPP +#ifndef VSOMEIP_E2E_PROFILE_CUSTOM_CHECKER_HPP +#define VSOMEIP_E2E_PROFILE_CUSTOM_CHECKER_HPP #include "../profile_custom/profile_custom.hpp" #include "../profile_interface/checker.hpp" #include <mutex> +namespace vsomeip { namespace e2e { -namespace profile { namespace profile_custom { -class profile_custom_checker final : public e2e::profile::profile_interface::checker { +class profile_custom_checker final : public vsomeip::e2e::profile_interface::checker { public: profile_custom_checker(void) = delete; - explicit profile_custom_checker(const Config &_config) : - config(_config) {} + explicit profile_custom_checker(const vsomeip::e2e::profile_custom::profile_config &_config) : + config_(_config) {} - virtual void check(const buffer::e2e_buffer &_buffer, - e2e::profile::profile_interface::generic_check_status &_generic_check_status); + virtual void check(const e2e_buffer &_buffer, + vsomeip::e2e::profile_interface::generic_check_status &_generic_check_status); private: - uint32_t read_crc(const buffer::e2e_buffer &_buffer) const; + uint32_t read_crc(const e2e_buffer &_buffer) const; private: - Config config; - std::mutex check_mutex; + profile_config config_; + std::mutex check_mutex_; }; -} -} -} -#endif +} // namespace profile_custom +} // namespace e2e +} // namespace vsomeip + +#endif // VSOMEIP_E2E_PROFILE_CUSTOM_CHECKER_HPP diff --git a/implementation/e2e_protection/include/e2e/profile/profile_custom/profile_custom.hpp b/implementation/e2e_protection/include/e2e/profile/profile_custom/profile_custom.hpp index da0e57f..f41942c 100644 --- a/implementation/e2e_protection/include/e2e/profile/profile_custom/profile_custom.hpp +++ b/implementation/e2e_protection/include/e2e/profile/profile_custom/profile_custom.hpp @@ -3,42 +3,39 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef E2E_PROFILE_PROFILE_CUSTOM_PROFILE_CUSTOM_HPP -#define E2E_PROFILE_PROFILE_CUSTOM_PROFILE_CUSTOM_HPP +#ifndef VSOMEIP_E2E_PROFILE_CUSTOM_PROFILE_CUSTOM_HPP +#define VSOMEIP_E2E_PROFILE_CUSTOM_PROFILE_CUSTOM_HPP #include <cstdint> #include "../../../buffer/buffer.hpp" +namespace vsomeip { namespace e2e { -namespace profile { namespace profile_custom { -struct Config; +struct profile_config; class profile_custom { public: - static uint32_t compute_crc(const Config &_config, const buffer::e2e_buffer &_buffer); + static uint32_t compute_crc(const profile_config &_config, const e2e_buffer &_buffer); - static bool is_buffer_length_valid(const Config &_config, const buffer::e2e_buffer &_buffer); + static bool is_buffer_length_valid(const profile_config &_config, const e2e_buffer &_buffer); }; -struct Config { - uint16_t crc_offset; +struct profile_config { + uint16_t crc_offset_; -#ifndef E2E_DEVELOPMENT - Config() = delete; -#else - Config() = default; -#endif - Config(uint16_t _crc_offset) + profile_config() = delete; - : crc_offset(_crc_offset) { + profile_config(uint16_t _crc_offset) + : crc_offset_(_crc_offset) { } - Config(const Config &_config) = default; - Config &operator=(const Config &_config) = default; + profile_config(const profile_config &_config) = default; + profile_config &operator=(const profile_config &_config) = default; }; -} -} -} -#endif +} // namespace profile_custom +} // namespace e2e +} // namespace vsomeip + +#endif // VSOMEIP_E2E_PROFILE_CUSTOM_PROFILE_CUSTOM_HPP diff --git a/implementation/e2e_protection/include/e2e/profile/profile_custom/protector.hpp b/implementation/e2e_protection/include/e2e/profile/profile_custom/protector.hpp index 40307c4..4b6c3b4 100644 --- a/implementation/e2e_protection/include/e2e/profile/profile_custom/protector.hpp +++ b/implementation/e2e_protection/include/e2e/profile/profile_custom/protector.hpp @@ -3,34 +3,36 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef E2E_PROFILE_PROFILE_CUSTOM_PROTECTOR_HPP -#define E2E_PROFILE_PROFILE_CUSTOM_PROTECTOR_HPP +#ifndef VSOMEIP_E2E_PROFILE_CUSTOM_PROTECTOR_HPP +#define VSOMEIP_E2E_PROFILE_CUSTOM_PROTECTOR_HPP #include <mutex> #include "../profile_custom/profile_custom.hpp" #include "../profile_interface/protector.hpp" +namespace vsomeip { namespace e2e { -namespace profile { namespace profile_custom { -class protector final : public e2e::profile::profile_interface::protector { +class protector final : public vsomeip::e2e::profile_interface::protector { public: protector(void) = delete; - explicit protector(const Config &_config) : config(_config){}; + explicit protector(const profile_config &_config) : config_(_config){}; - void protect(buffer::e2e_buffer &_buffer) override final; + void protect(e2e_buffer &_buffer) override final; private: - void write_crc(buffer::e2e_buffer &_buffer, uint32_t _computed_crc); + void write_crc(e2e_buffer &_buffer, uint32_t _computed_crc); private: - Config config; - std::mutex protect_mutex; + profile_config config_; + std::mutex protect_mutex_; }; -} -} -} -#endif + +} // namespace profile_custom +} // namespace e2e +} // namespace vsomeip + +#endif // VSOMEIP_E2E_PROFILE_CUSTOM_PROTECTOR_HPP diff --git a/implementation/e2e_protection/include/e2e/profile/profile_interface/checker.hpp b/implementation/e2e_protection/include/e2e/profile/profile_interface/checker.hpp index ed102a5..c66d1b0 100644 --- a/implementation/e2e_protection/include/e2e/profile/profile_interface/checker.hpp +++ b/implementation/e2e_protection/include/e2e/profile/profile_interface/checker.hpp @@ -3,26 +3,25 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. - -#ifndef E2E_PROFILE_INTERFACE_CHECKER_HPP -#define E2E_PROFILE_INTERFACE_CHECKER_HPP +#ifndef VSOMEIP_E2E_PROFILE_INTERFACE_CHECKER_HPP +#define VSOMEIP_E2E_PROFILE_INTERFACE_CHECKER_HPP #include "../profile_interface/profile_interface.hpp" #include "../../../buffer/buffer.hpp" #include <mutex> - +namespace vsomeip { namespace e2e { -namespace profile { namespace profile_interface { class checker : public profile_interface { public: - virtual void check(const buffer::e2e_buffer &_buffer, - e2e::profile::profile_interface::generic_check_status &_generic_check_status) = 0; + virtual void check(const e2e_buffer &_buffer, + vsomeip::e2e::profile_interface::generic_check_status &_generic_check_status) = 0; }; -} -} -} -#endif +} // namespace profile_interface +} // namespace e2e +} // namespace vsomeip + +#endif // VSOMEIP_E2E_PROFILE_INTERFACE_CHECKER_HPP diff --git a/implementation/e2e_protection/include/e2e/profile/profile_interface/profile_interface.hpp b/implementation/e2e_protection/include/e2e/profile/profile_interface/profile_interface.hpp index 996a86d..377bbee 100644 --- a/implementation/e2e_protection/include/e2e/profile/profile_interface/profile_interface.hpp +++ b/implementation/e2e_protection/include/e2e/profile/profile_interface/profile_interface.hpp @@ -3,22 +3,25 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. - -#ifndef E2E_PROFILE_INTERFACE_INTERFACE_HPP -#define E2E_PROFILE_INTERFACE_INTERFACE_HPP +#ifndef VSOMEIP_E2E_PROFILE_INTERFACE_PROFILE_INTERFACE_HPP +#define VSOMEIP_E2E_PROFILE_INTERFACE_PROFILE_INTERFACE_HPP #include <cstdint> +namespace vsomeip { namespace e2e { -namespace profile { namespace profile_interface { enum class generic_check_status : uint8_t { E2E_OK, E2E_WRONG_CRC, E2E_ERROR}; class profile_interface { - +public: + virtual ~profile_interface() { + } }; -} -} -} -#endif + +} // namespace profile_interface +} // namespace e2e +} // namespace vsomeip + +#endif // VSOMEIP_E2E_PROFILE_INTERFACE_PROFILE_INTERFACE_HPP diff --git a/implementation/e2e_protection/include/e2e/profile/profile_interface/protector.hpp b/implementation/e2e_protection/include/e2e/profile/profile_interface/protector.hpp index a904585..a031a9e 100644 --- a/implementation/e2e_protection/include/e2e/profile/profile_interface/protector.hpp +++ b/implementation/e2e_protection/include/e2e/profile/profile_interface/protector.hpp @@ -3,22 +3,23 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef E2E_PROFILE_INTERFACE_PROTECTOR_HPP -#define E2E_PROFILE_INTERFACE_PROTECTOR_HPP +#ifndef VSOMEIP_E2E_PROFILE_INTERFACE_PROTECTOR_HPP +#define VSOMEIP_E2E_PROFILE_INTERFACE_PROTECTOR_HPP #include "../../../buffer/buffer.hpp" #include "../profile_interface/profile_interface.hpp" +namespace vsomeip { namespace e2e { -namespace profile { namespace profile_interface { class protector : public profile_interface { public: - virtual void protect(buffer::e2e_buffer &_buffer) = 0; + virtual void protect(e2e_buffer &_buffer) = 0; }; -} -} -} -#endif +} // namespace profile_interface +} // namespace e2e +} // namespace vsomeip + +#endif // VSOMEIP_E2E_PROFILE_INTERFACE_PROTECTOR_HPP diff --git a/implementation/e2e_protection/include/e2exf/config.hpp b/implementation/e2e_protection/include/e2exf/config.hpp index 4945a7b..3667211 100644 --- a/implementation/e2e_protection/include/e2exf/config.hpp +++ b/implementation/e2e_protection/include/e2exf/config.hpp @@ -3,8 +3,8 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef E2EXF_CONFIG_HPP -#define E2EXF_CONFIG_HPP +#ifndef VSOMEIP_E2EXF_CONFIG_HPP +#define VSOMEIP_E2EXF_CONFIG_HPP #include "../e2e/profile/profile_interface/checker.hpp" #include "../e2e/profile/profile_interface/protector.hpp" @@ -12,6 +12,7 @@ #include <memory> #include <map> +namespace vsomeip { namespace e2exf { using session_id = uint16_t; @@ -19,8 +20,9 @@ using instance_id = uint16_t; using data_identifier = std::pair<session_id, instance_id>; -} - std::ostream &operator<<(std::ostream &_os, const e2exf::data_identifier &_data_identifier); -#endif +} // namespace e2exf +} // namespace vsomeip + +#endif // VSOMEIP_E2EXF_CONFIG_HPP diff --git a/implementation/e2e_protection/src/buffer/buffer.cpp b/implementation/e2e_protection/src/buffer/buffer.cpp index ad7cf69..e133d42 100644 --- a/implementation/e2e_protection/src/buffer/buffer.cpp +++ b/implementation/e2e_protection/src/buffer/buffer.cpp @@ -6,7 +6,9 @@ #include "../../../e2e_protection/include/buffer/buffer.hpp" #include <iomanip> -std::ostream &operator<<(std::ostream &_os, const buffer::e2e_buffer &_buffer) { +namespace vsomeip { + +std::ostream &operator<<(std::ostream &_os, const e2e_buffer &_buffer) { for (auto b : _buffer) { if (isupper(b)) { _os << b; @@ -16,3 +18,5 @@ std::ostream &operator<<(std::ostream &_os, const buffer::e2e_buffer &_buffer) { } return _os; } + +} // namespace vsomeip diff --git a/implementation/e2e_protection/src/crc/crc.cpp b/implementation/e2e_protection/src/crc/crc.cpp index a3e6758..d9f1f99 100644 --- a/implementation/e2e_protection/src/crc/crc.cpp +++ b/implementation/e2e_protection/src/crc/crc.cpp @@ -8,7 +8,7 @@ #include <string> #include <iomanip> -namespace crc { +namespace vsomeip { /** * Calculates the crc over the provided range. @@ -27,16 +27,16 @@ namespace crc { * - ReflectOut = false * - Algorithm = table-driven */ -uint8_t e2e_crc::calculate_profile_01(buffer::buffer_view _buffer_view, const uint8_t _start_value) { +uint8_t e2e_crc::calculate_profile_01(buffer_view _buffer_view, const uint8_t _start_value) { uint8_t crc = _start_value ^ 0xFFU; for (uint8_t byte : _buffer_view) { - crc = static_cast<uint8_t>(lookup_table_profile_01[static_cast<uint8_t>((byte) ^ crc)] ^ (crc >> 8U)); + crc = static_cast<uint8_t>(lookup_table_profile_01_[static_cast<uint8_t>((byte) ^ crc)] ^ (crc >> 8U)); } crc = crc ^ 0xFFU; return crc; } -const uint8_t e2e_crc::lookup_table_profile_01[256] = { +const uint8_t e2e_crc::lookup_table_profile_01_[256] = { 0x00U, 0x1DU, 0x3AU, 0x27U, 0x74U, 0x69U, 0x4EU, 0x53U, 0xE8U, 0xF5U, 0xD2U, 0xCFU, 0x9CU, 0x81U, 0xA6U, 0xBBU, 0xCDU, 0xD0U, 0xF7U, 0xEAU, 0xB9U, 0xA4U, 0x83U, 0x9EU, 0x25U, 0x38U, 0x1FU, 0x02U, 0x51U, 0x4CU, 0x6BU, 0x76U, 0x87U, 0x9AU, 0xBDU, 0xA0U, 0xF3U, 0xEEU, 0xC9U, 0xD4U, 0x6FU, 0x72U, 0x55U, 0x48U, 0x1BU, 0x06U, 0x21U, 0x3CU, @@ -71,10 +71,10 @@ const uint8_t e2e_crc::lookup_table_profile_01[256] = { * - XorOut = 0xFFFFFFFF * - ReflectOut = true */ -uint32_t e2e_crc::calculate_profile_04(buffer::buffer_view _buffer_view, const uint32_t _start_value) { +uint32_t e2e_crc::calculate_profile_04(buffer_view _buffer_view, const uint32_t _start_value) { uint32_t crc = _start_value ^ 0xFFFFFFFFU; for (uint8_t byte : _buffer_view) { - crc = lookup_table_profile_04[static_cast<uint8_t>(byte ^ crc)] ^ (crc >> 8U); + crc = lookup_table_profile_04_[static_cast<uint8_t>(byte ^ crc)] ^ (crc >> 8U); } crc = crc ^ 0xFFFFFFFFU; @@ -82,7 +82,7 @@ uint32_t e2e_crc::calculate_profile_04(buffer::buffer_view _buffer_view, const u return crc; } -const uint32_t e2e_crc::lookup_table_profile_04[256] = { +const uint32_t e2e_crc::lookup_table_profile_04_[256] = { 0x00000000U, 0x30850FF5U, 0x610A1FEAU, 0x518F101FU, 0xC2143FD4U, 0xF2913021U, 0xA31E203EU, 0x939B2FCBU, 0x159615F7U, 0x25131A02U, 0x749C0A1DU, 0x441905E8U, 0xD7822A23U, 0xE70725D6U, 0xB68835C9U, 0x860D3A3CU, 0x2B2C2BEEU, 0x1BA9241BU, 0x4A263404U, 0x7AA33BF1U, 0xE938143AU, 0xD9BD1BCFU, 0x88320BD0U, 0xB8B70425U, 0x3EBA3E19U, 0x0E3F31ECU, 0x5FB021F3U, @@ -114,9 +114,6 @@ const uint32_t e2e_crc::lookup_table_profile_04[256] = { 0xCE99CC86U, 0xFE1CC373U, 0xAF93D36CU, 0x9F16DC99U }; - - - /** * Calculates the CRC over the provided range. * @@ -133,12 +130,12 @@ const uint32_t e2e_crc::lookup_table_profile_04[256] = { * - XorOut = 0xFFFFFFFF * - ReflectOut = true */ -uint32_t e2e_crc::calculate_profile_custom(buffer::buffer_view _buffer_view) { +uint32_t e2e_crc::calculate_profile_custom(buffer_view _buffer_view) { // InitValue uint32_t crc = 0xFFFFFFFFU; for (uint8_t byte : _buffer_view) { - crc = lookup_table_profile_custom[static_cast<uint8_t>(byte ^ crc)] ^ (crc >> 8U); + crc = lookup_table_profile_custom_[static_cast<uint8_t>(byte ^ crc)] ^ (crc >> 8U); } // XorOut @@ -146,8 +143,7 @@ uint32_t e2e_crc::calculate_profile_custom(buffer::buffer_view _buffer_view) { return crc; } - -const uint32_t e2e_crc::lookup_table_profile_custom[256] = { +const uint32_t e2e_crc::lookup_table_profile_custom_[256] = { 0x00000000U, 0x77073096U, 0xEE0E612CU, 0x990951BAU, 0x076DC419U, 0x706AF48FU, 0xE963A535U, 0x9E6495A3U, 0x0EDB8832U, 0x79DCB8A4U, 0xE0D5E91EU, 0x97D2D988U, 0x09B64C2BU, 0x7EB17CBDU, 0xE7B82D07U, 0x90BF1D91U, 0x1DB71064U, 0x6AB020F2U, 0xF3B97148U, 0x84BE41DEU, 0x1ADAD47DU, 0x6DDDE4EBU, 0xF4D4B551U, 0x83D385C7U, @@ -182,5 +178,5 @@ const uint32_t e2e_crc::lookup_table_profile_custom[256] = { 0xB3667A2EU, 0xC4614AB8U, 0x5D681B02U, 0x2A6F2B94U, 0xB40BBE37U, 0xC30C8EA1U, 0x5A05DF1BU, 0x2D02EF8DU }; -} +} // namespace vsomeip diff --git a/implementation/e2e_protection/src/e2e/profile/profile01/checker.cpp b/implementation/e2e_protection/src/e2e/profile/profile01/checker.cpp index 7379445..e4c0504 100644 --- a/implementation/e2e_protection/src/e2e/profile/profile01/checker.cpp +++ b/implementation/e2e_protection/src/e2e/profile/profile01/checker.cpp @@ -11,32 +11,32 @@ #include <iomanip> #include <algorithm> +namespace vsomeip { namespace e2e { -namespace profile { namespace profile01 { // [SWS_E2E_00196] -void profile_01_checker::check(const buffer::e2e_buffer &_buffer, - e2e::profile::profile_interface::generic_check_status &_generic_check_status) { - std::lock_guard<std::mutex> lock(check_mutex); - _generic_check_status = e2e::profile::profile_interface::generic_check_status::E2E_ERROR; +void profile_01_checker::check(const e2e_buffer &_buffer, + e2e::profile_interface::generic_check_status &_generic_check_status) { + std::lock_guard<std::mutex> lock(check_mutex_); + _generic_check_status = e2e::profile_interface::generic_check_status::E2E_ERROR; - - if(profile_01::is_buffer_length_valid(config, _buffer)) { + if (profile_01::is_buffer_length_valid(config_, _buffer)) { uint8_t received_crc(0); uint8_t calculated_crc(0); - received_crc = _buffer[config.crc_offset]; - calculated_crc = profile_01::compute_crc(config, _buffer); - if(received_crc == calculated_crc) { - _generic_check_status = e2e::profile::profile_interface::generic_check_status::E2E_OK; + received_crc = _buffer[config_.crc_offset_]; + calculated_crc = profile_01::compute_crc(config_, _buffer); + if (received_crc == calculated_crc) { + _generic_check_status = e2e::profile_interface::generic_check_status::E2E_OK; } else { - _generic_check_status = e2e::profile::profile_interface::generic_check_status::E2E_WRONG_CRC; - VSOMEIP_INFO << std::hex << "E2E protection: CRC8 does not match: calculated CRC: " << (uint32_t) calculated_crc << " received CRC: " << (uint32_t) received_crc; + _generic_check_status = e2e::profile_interface::generic_check_status::E2E_WRONG_CRC; + VSOMEIP_INFO << std::hex << "E2E protection: CRC8 does not match: calculated CRC: " + << (uint32_t) calculated_crc << " received CRC: " << (uint32_t) received_crc; } } return; } -} -} -} +} // namespace profile01 +} // namespace e2e +} // namespace vsomeip diff --git a/implementation/e2e_protection/src/e2e/profile/profile01/profile_01.cpp b/implementation/e2e_protection/src/e2e/profile/profile01/profile_01.cpp index d28d17c..72bd14c 100644 --- a/implementation/e2e_protection/src/e2e/profile/profile01/profile_01.cpp +++ b/implementation/e2e_protection/src/e2e/profile/profile01/profile_01.cpp @@ -10,26 +10,26 @@ #include <string> #include <iomanip> +namespace vsomeip { namespace e2e { -namespace profile { namespace profile01 { -uint8_t profile_01::compute_crc(const Config &_config, const buffer::e2e_buffer &_buffer) { +uint8_t profile_01::compute_crc(const profile_config &_config, const e2e_buffer &_buffer) { uint8_t computed_crc = 0xFF; - buffer::e2e_buffer data_id_buffer; //(_data, _data+_size); - data_id_buffer.push_back((uint8_t) (_config.data_id >> 8)); // insert MSB - data_id_buffer.push_back((uint8_t) _config.data_id); // insert LSB + e2e_buffer data_id_buffer; //(_data, _data+_size); + data_id_buffer.push_back((uint8_t) (_config.data_id_ >> 8)); // insert MSB + data_id_buffer.push_back((uint8_t) _config.data_id_); // insert LSB - switch (_config.data_id_mode) { + switch (_config.data_id_mode_) { case p01_data_id_mode::E2E_P01_DATAID_BOTH: // CRC over 2 bytes /* * Two bytes are included in the CRC (double ID configuration) This is used in E2E variant 1A. */ // CRC = Crc_CalculateCRC8(Config->DataID, 1, 0xFF, FALSE) - computed_crc = crc::e2e_crc::calculate_profile_01(buffer::buffer_view(data_id_buffer, 1, 2), 0xFF); //CRC over low byte of Data ID (LSB) + computed_crc = e2e_crc::calculate_profile_01(buffer_view(data_id_buffer, 1, 2), 0xFF); //CRC over low byte of Data ID (LSB) // CRC = Crc_CalculateCRC8(Config->DataID >> 8, 1, CRC, FALSE) - computed_crc = crc::e2e_crc::calculate_profile_01(buffer::buffer_view(data_id_buffer, 0, 1), computed_crc); //CRC over high byte of Data ID (MSB) + computed_crc = e2e_crc::calculate_profile_01(buffer_view(data_id_buffer, 0, 1), computed_crc); //CRC over high byte of Data ID (MSB) break; case p01_data_id_mode::E2E_P01_DATAID_LOW: // CRC over low byte only @@ -38,7 +38,7 @@ uint8_t profile_01::compute_crc(const Config &_config, const buffer::e2e_buffer * This is applicable if the IDs in a particular system are 8 bits */ // CRC = Crc_CalculateCRC8(Config->DataID, 1, 0xFF, FALSE) - computed_crc = crc::e2e_crc::calculate_profile_01(buffer::buffer_view(data_id_buffer, 1, 2), 0xFF); //CRC over low byte of Data ID (LSB) + computed_crc = e2e_crc::calculate_profile_01(buffer_view(data_id_buffer, 1, 2), 0xFF); //CRC over low byte of Data ID (LSB) break; case p01_data_id_mode::E2E_P01_DATAID_ALT: @@ -67,12 +67,12 @@ uint8_t profile_01::compute_crc(const Config &_config, const buffer::e2e_buffer * up to 12 bits. This is used in E2E variant 1C. */ // CRC = Crc_CalculateCRC8(Config->DataID, 1, 0xFF, FALSE) - computed_crc = crc::e2e_crc::calculate_profile_01(buffer::buffer_view(data_id_buffer, 1, 2), 0xFF); //CRC over low byte of Data ID (LSB) + computed_crc = e2e_crc::calculate_profile_01(buffer_view(data_id_buffer, 1, 2), 0xFF); //CRC over low byte of Data ID (LSB) // CRC = Crc_CalculateCRC8 (0, 1, CRC, FALSE) data_id_buffer.clear(); data_id_buffer.push_back(0x00); - computed_crc = crc::e2e_crc::calculate_profile_01(buffer::buffer_view(data_id_buffer, 0, 1), computed_crc); // CRC with 0x00 + computed_crc = e2e_crc::calculate_profile_01(buffer_view(data_id_buffer, 0, 1), computed_crc); // CRC with 0x00 break; default: @@ -80,15 +80,15 @@ uint8_t profile_01::compute_crc(const Config &_config, const buffer::e2e_buffer } // Compute CRC over the area before the CRC (if CRC is not the first byte) - if(_config.crc_offset >= 1) { + if (_config.crc_offset_ >= 1) { // CRC = Crc_CalculateCRC8 (Data, (Config->CRCOffset / 8), CRC, FALSE) - computed_crc = crc::e2e_crc::calculate_profile_01(buffer::buffer_view(_buffer, 0, _config.crc_offset), computed_crc); + computed_crc = e2e_crc::calculate_profile_01(buffer_view(_buffer, 0, _config.crc_offset_), computed_crc); } // Compute the area after CRC, if CRC is not the last byte. Start with the byte after CRC, finish with the last byte of Data. - if((_config.crc_offset) < (_config.data_length / 8) - 1) { + if ((_config.crc_offset_) < (_config.data_length_ / 8) - 1) { // CRC = Crc_CalculateCRC8 (& Data[Config->CRCOffset/8 + 1], (Config->DataLength / 8 - Config->CRCOffset / 8 - 1), CRC, FALSE) - computed_crc = crc::e2e_crc::calculate_profile_01(buffer::buffer_view(_buffer, _config.crc_offset + 1, _buffer.size()), computed_crc); + computed_crc = e2e_crc::calculate_profile_01(buffer_view(_buffer, _config.crc_offset_ + 1, _buffer.size()), computed_crc); } // CRC = CRC ^ 0xFF @@ -98,12 +98,13 @@ uint8_t profile_01::compute_crc(const Config &_config, const buffer::e2e_buffer } /** @req [SWS_E2E_00356] */ -bool profile_01::is_buffer_length_valid(const Config &_config, const buffer::e2e_buffer &_buffer) { - return (((_config.data_length / 8) + 1U <= _buffer.size()) - && _config.crc_offset <= _buffer.size() - && _config.counter_offset / 8 <= _buffer.size() - && _config.data_id_nibble_offset / 8 <= _buffer.size()); -} -} -} +bool profile_01::is_buffer_length_valid(const profile_config &_config, const e2e_buffer &_buffer) { + return (((_config.data_length_ / 8) + 1U <= _buffer.size()) + && _config.crc_offset_ <= _buffer.size() + && _config.counter_offset_ / 8 <= _buffer.size() + && _config.data_id_nibble_offset_ / 8 <= _buffer.size()); } + +} // namespace profile01 +} // namespace e2e +} // namespace vsomeip diff --git a/implementation/e2e_protection/src/e2e/profile/profile01/protector.cpp b/implementation/e2e_protection/src/e2e/profile/profile01/protector.cpp index 639eb4c..0bf0199 100644 --- a/implementation/e2e_protection/src/e2e/profile/profile01/protector.cpp +++ b/implementation/e2e_protection/src/e2e/profile/profile01/protector.cpp @@ -11,16 +11,15 @@ #include <string> #include <iomanip> - +namespace vsomeip { namespace e2e { -namespace profile { namespace profile01 { /** @req [SWS_E2E_00195] */ -void protector::protect(buffer::e2e_buffer &_buffer) { - std::lock_guard<std::mutex> lock(protect_mutex); +void protector::protect(e2e_buffer &_buffer) { + std::lock_guard<std::mutex> lock(protect_mutex_); - if(profile_01::is_buffer_length_valid(config, _buffer)) { + if (profile_01::is_buffer_length_valid(config_, _buffer)) { // write the current Counter value in Data write_counter(_buffer); @@ -28,7 +27,7 @@ void protector::protect(buffer::e2e_buffer &_buffer) { write_data_id(_buffer); // compute the CRC over DataID and Data - uint8_t computed_crc = profile_01::compute_crc(config, _buffer); + uint8_t computed_crc = profile_01::compute_crc(config_, _buffer); // write CRC in Data write_crc(_buffer, computed_crc); @@ -38,40 +37,43 @@ void protector::protect(buffer::e2e_buffer &_buffer) { } /** @req [SRS_E2E_08528] */ -void protector::write_counter(buffer::e2e_buffer &_buffer) { - if(config.counter_offset % 8 == 0) { +void protector::write_counter(e2e_buffer &_buffer) { + if (config_.counter_offset_ % 8 == 0) { // write write counter value into low nibble - _buffer[config.counter_offset / 8] = static_cast<uint8_t>((_buffer[config.counter_offset / 8] & 0xF0) | (counter & 0x0F)); + _buffer[config_.counter_offset_ / 8] = + static_cast<uint8_t>((_buffer[config_.counter_offset_ / 8] & 0xF0) | (counter_ & 0x0F)); } else { // write counter into high nibble - _buffer[config.counter_offset / 8] = static_cast<uint8_t>((_buffer[config.counter_offset / 8] & 0x0F) | ((counter << 4) & 0xF0)); + _buffer[config_.counter_offset_ / 8] = + static_cast<uint8_t>((_buffer[config_.counter_offset_ / 8] & 0x0F) | ((counter_ << 4) & 0xF0)); } } /** @req [SRS_E2E_08528] */ -void protector::write_data_id(buffer::e2e_buffer &_buffer) { - if(config.data_id_mode == p01_data_id_mode::E2E_P01_DATAID_NIBBLE) { - if(config.data_id_nibble_offset % 8 == 0) { +void protector::write_data_id(e2e_buffer &_buffer) { + if (config_.data_id_mode_ == p01_data_id_mode::E2E_P01_DATAID_NIBBLE) { + if (config_.data_id_nibble_offset_ % 8 == 0) { // write low nibble of high byte of Data ID - _buffer[config.data_id_nibble_offset / 8] = static_cast<uint8_t>((_buffer[config.data_id_nibble_offset / 8] & 0xF0) | ((config.data_id >> 8) & 0x0F)); + _buffer[config_.data_id_nibble_offset_ / 8] = + static_cast<uint8_t>((_buffer[config_.data_id_nibble_offset_ / 8] & 0xF0) | ((config_.data_id_ >> 8) & 0x0F)); } else { // write low nibble of high byte of Data ID - _buffer[config.data_id_nibble_offset / 8] = static_cast<uint8_t>((_buffer[config.data_id_nibble_offset / 8] & 0x0F) | ((config.data_id >> 4) & 0xF0)); + _buffer[config_.data_id_nibble_offset_ / 8] = + static_cast<uint8_t>((_buffer[config_.data_id_nibble_offset_ / 8] & 0x0F) | ((config_.data_id_ >> 4) & 0xF0)); } } } /** @req [SRS_E2E_08528] */ -void protector::write_crc(buffer::e2e_buffer &_buffer, uint8_t _computed_crc) { - _buffer[config.crc_offset] = _computed_crc; +void protector::write_crc(e2e_buffer &_buffer, uint8_t _computed_crc) { + _buffer[config_.crc_offset_] = _computed_crc; } /** @req [SWS_E2E_00075] */ void protector::increment_counter(void) { - counter = static_cast<uint8_t>((counter + 1U) % 15); + counter_ = static_cast<uint8_t>((counter_ + 1U) % 15); } - -} -} -} +} // namespace profile01 +} // namespace e2e +} // namespace vsomeip diff --git a/implementation/e2e_protection/src/e2e/profile/profile_custom/checker.cpp b/implementation/e2e_protection/src/e2e/profile/profile_custom/checker.cpp index d2608d3..eb6f557 100644 --- a/implementation/e2e_protection/src/e2e/profile/profile_custom/checker.cpp +++ b/implementation/e2e_protection/src/e2e/profile/profile_custom/checker.cpp @@ -11,39 +11,39 @@ #include <iomanip> #include <algorithm> +namespace vsomeip { namespace e2e { -namespace profile { namespace profile_custom { -void profile_custom_checker::check(const buffer::e2e_buffer &_buffer, - e2e::profile::profile_interface::generic_check_status &_generic_check_status) { - std::lock_guard<std::mutex> lock(check_mutex); - _generic_check_status = e2e::profile::profile_interface::generic_check_status::E2E_ERROR; +void profile_custom_checker::check(const e2e_buffer &_buffer, + e2e::profile_interface::generic_check_status &_generic_check_status) { + std::lock_guard<std::mutex> lock(check_mutex_); + _generic_check_status = e2e::profile_interface::generic_check_status::E2E_ERROR; - if(profile_custom::is_buffer_length_valid(config, _buffer)) { + if (profile_custom::is_buffer_length_valid(config_, _buffer)) { uint32_t received_crc(0); uint32_t calculated_crc(0); received_crc = read_crc(_buffer); - calculated_crc = profile_custom::compute_crc(config, _buffer); - if(received_crc == calculated_crc) { - _generic_check_status = e2e::profile::profile_interface::generic_check_status::E2E_OK; + calculated_crc = profile_custom::compute_crc(config_, _buffer); + if (received_crc == calculated_crc) { + _generic_check_status = e2e::profile_interface::generic_check_status::E2E_OK; } else { - _generic_check_status = e2e::profile::profile_interface::generic_check_status::E2E_WRONG_CRC; - VSOMEIP_INFO << std::hex << "E2E protection: CRC32 does not match: calculated CRC: " << (uint32_t) calculated_crc << " received CRC: " << (uint32_t) received_crc; + _generic_check_status = e2e::profile_interface::generic_check_status::E2E_WRONG_CRC; + VSOMEIP_INFO << std::hex << "E2E protection: CRC32 does not match: calculated CRC: " + << (uint32_t) calculated_crc << " received CRC: " << (uint32_t) received_crc; } } return; } -uint32_t profile_custom_checker::read_crc(const buffer::e2e_buffer &_buffer) const { - return (static_cast<uint32_t>(_buffer[config.crc_offset ]) << 24U) | - (static_cast<uint32_t>(_buffer[config.crc_offset + 1U]) << 16U) | - (static_cast<uint32_t>(_buffer[config.crc_offset + 2U]) << 8U) | - static_cast<uint32_t>(_buffer[config.crc_offset + 3U]); +uint32_t profile_custom_checker::read_crc(const e2e_buffer &_buffer) const { + return (static_cast<uint32_t>(_buffer[config_.crc_offset_ ]) << 24U) | + (static_cast<uint32_t>(_buffer[config_.crc_offset_ + 1U]) << 16U) | + (static_cast<uint32_t>(_buffer[config_.crc_offset_ + 2U]) << 8U) | + static_cast<uint32_t>(_buffer[config_.crc_offset_ + 3U]); } - -} -} -} +} // namespace profile_custom +} // namespace e2e +} // namespace vsomeip diff --git a/implementation/e2e_protection/src/e2e/profile/profile_custom/profile_custom.cpp b/implementation/e2e_protection/src/e2e/profile/profile_custom/profile_custom.cpp index bc39842..d7de34b 100644 --- a/implementation/e2e_protection/src/e2e/profile/profile_custom/profile_custom.cpp +++ b/implementation/e2e_protection/src/e2e/profile/profile_custom/profile_custom.cpp @@ -10,20 +10,19 @@ #include <string> #include <iomanip> +namespace vsomeip { namespace e2e { -namespace profile { namespace profile_custom { -uint32_t profile_custom::compute_crc(const Config &_config, const buffer::e2e_buffer &_buffer) { - uint32_t computed_crc = crc::e2e_crc::calculate_profile_custom(buffer::buffer_view(_buffer, _config.crc_offset + 4, _buffer.size())); +uint32_t profile_custom::compute_crc(const profile_config &_config, const e2e_buffer &_buffer) { + uint32_t computed_crc = e2e_crc::calculate_profile_custom(buffer_view(_buffer, _config.crc_offset_ + 4, _buffer.size())); return computed_crc; } -bool profile_custom::is_buffer_length_valid(const Config &_config, const buffer::e2e_buffer &_buffer) { - - return ( (_config.crc_offset + 4U) <=_buffer.size()); +bool profile_custom::is_buffer_length_valid(const profile_config &_config, const e2e_buffer &_buffer) { + return ((_config.crc_offset_ + 4U) <=_buffer.size()); } -} -} -} +} // namespace profile_custom +} // namespace e2e +} // namespace vsomeip diff --git a/implementation/e2e_protection/src/e2e/profile/profile_custom/protector.cpp b/implementation/e2e_protection/src/e2e/profile/profile_custom/protector.cpp index 3c77116..f061e55 100644 --- a/implementation/e2e_protection/src/e2e/profile/profile_custom/protector.cpp +++ b/implementation/e2e_protection/src/e2e/profile/profile_custom/protector.cpp @@ -13,29 +13,28 @@ #include <iostream> #include <sstream> - +namespace vsomeip { namespace e2e { -namespace profile { namespace profile_custom { -void protector::protect(buffer::e2e_buffer &_buffer) { - std::lock_guard<std::mutex> lock(protect_mutex); +void protector::protect(e2e_buffer &_buffer) { + std::lock_guard<std::mutex> lock(protect_mutex_); - if(profile_custom::is_buffer_length_valid(config, _buffer)) { + if (profile_custom::is_buffer_length_valid(config_, _buffer)) { // compute the CRC over DataID and Data - uint32_t computed_crc = profile_custom::compute_crc(config, _buffer); + uint32_t computed_crc = profile_custom::compute_crc(config_, _buffer); // write CRC in Data write_crc(_buffer, computed_crc); } } -void protector::write_crc(buffer::e2e_buffer &_buffer, uint32_t _computed_crc) { - _buffer[config.crc_offset] = static_cast<uint8_t>(_computed_crc >> 24U); - _buffer[config.crc_offset + 1U] = static_cast<uint8_t>(_computed_crc >> 16U); - _buffer[config.crc_offset + 2U] = static_cast<uint8_t>(_computed_crc >> 8U); - _buffer[config.crc_offset + 3U] = static_cast<uint8_t>(_computed_crc); +void protector::write_crc(e2e_buffer &_buffer, uint32_t _computed_crc) { + _buffer[config_.crc_offset_] = static_cast<uint8_t>(_computed_crc >> 24U); + _buffer[config_.crc_offset_ + 1U] = static_cast<uint8_t>(_computed_crc >> 16U); + _buffer[config_.crc_offset_ + 2U] = static_cast<uint8_t>(_computed_crc >> 8U); + _buffer[config_.crc_offset_ + 3U] = static_cast<uint8_t>(_computed_crc); } -} -} -} +} // namespace profile_custom +} // namespace e2e +} // namespace vsomeip diff --git a/implementation/e2e_protection/src/e2exf/config.cpp b/implementation/e2e_protection/src/e2exf/config.cpp index 6aa9223..e210faf 100644 --- a/implementation/e2e_protection/src/e2exf/config.cpp +++ b/implementation/e2e_protection/src/e2exf/config.cpp @@ -3,15 +3,14 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -#ifndef PROJECT_CONFIG_CPP_HPP -#define PROJECT_CONFIG_CPP_HPP - #include <ostream> #include "../../../e2e_protection/include/e2exf/config.hpp" +namespace vsomeip { + std::ostream &operator<<(std::ostream &_os, const e2exf::data_identifier &_data_identifier) { _os << _data_identifier.first << _data_identifier.second; return _os; } -#endif // PROJECT_CONFIG_CPP_HPP +} // namespace vsomeip diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp index 57a3084..b656f8b 100644 --- a/implementation/endpoints/include/client_endpoint_impl.hpp +++ b/implementation/endpoints/include/client_endpoint_impl.hpp @@ -45,7 +45,7 @@ public: const byte_t *_data, uint32_t _size, bool _flush = true);
bool flush();
- void stop();
+ virtual void stop();
virtual void restart() = 0;
bool is_client() const;
diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp index eb1b310..7ff96ff 100644 --- a/implementation/endpoints/include/local_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp @@ -39,6 +39,7 @@ public: virtual ~local_client_endpoint_impl(); void start(); + void stop(); bool is_local() const; diff --git a/implementation/endpoints/include/local_server_endpoint_impl.hpp b/implementation/endpoints/include/local_server_endpoint_impl.hpp index f9fa0ab..33658fb 100644 --- a/implementation/endpoints/include/local_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/local_server_endpoint_impl.hpp @@ -60,7 +60,7 @@ public: bool send_to(const std::shared_ptr<endpoint_definition>, const byte_t *_data, uint32_t _size, bool _flush); - void send_queued(queue_iterator_type _queue_iterator); + void send_queued(const queue_iterator_type _queue_iterator); bool get_default_target(service_t, endpoint_type &) const; @@ -84,7 +84,7 @@ private: void start(); void stop(); - void send_queued(queue_iterator_type _queue_iterator); + void send_queued(const queue_iterator_type _queue_iterator); void set_bound_client(client_t _client); @@ -95,7 +95,6 @@ private: std::uint32_t _buffer_shrink_threshold, boost::asio::io_service &_io_service); - void send_magic_cookie(); void receive_cbk(boost::system::error_code const &_error, std::size_t _bytes); void calculate_shrink_count(); diff --git a/implementation/endpoints/include/server_endpoint_impl.hpp b/implementation/endpoints/include/server_endpoint_impl.hpp index 710df35..ce89c6b 100644 --- a/implementation/endpoints/include/server_endpoint_impl.hpp +++ b/implementation/endpoints/include/server_endpoint_impl.hpp @@ -46,7 +46,7 @@ public: public:
void connect_cbk(boost::system::error_code const &_error);
- void send_cbk(queue_iterator_type _queue_iterator,
+ void send_cbk(const queue_iterator_type _queue_iterator,
boost::system::error_code const &_error, std::size_t _bytes);
void flush_cbk(endpoint_type _target,
const boost::system::error_code &_error);
@@ -54,7 +54,7 @@ public: public:
virtual bool send_intern(endpoint_type _target, const byte_t *_data,
uint32_t _port, bool _flush);
- virtual void send_queued(queue_iterator_type _queue_iterator) = 0;
+ virtual void send_queued(const queue_iterator_type _queue_iterator) = 0;
virtual bool get_default_target(service_t _service,
endpoint_type &_target) const = 0;
diff --git a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp index fd725d2..09c2ae5 100644 --- a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp @@ -38,7 +38,7 @@ public: bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush);
- void send_queued(queue_iterator_type _queue_iterator);
+ void send_queued(const queue_iterator_type _queue_iterator);
VSOMEIP_EXPORT bool is_established(std::shared_ptr<endpoint_definition> _endpoint);
@@ -72,7 +72,7 @@ private: void stop();
void receive();
- void send_queued(queue_iterator_type _queue_iterator);
+ void send_queued(const queue_iterator_type _queue_iterator);
void set_remote_info(const endpoint_type &_remote);
diff --git a/implementation/endpoints/include/udp_server_endpoint_impl.hpp b/implementation/endpoints/include/udp_server_endpoint_impl.hpp index 787a824..aae7a6b 100644 --- a/implementation/endpoints/include/udp_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/udp_server_endpoint_impl.hpp @@ -35,7 +35,7 @@ public: bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush);
- void send_queued(queue_iterator_type _queue_iterator);
+ void send_queued(const queue_iterator_type _queue_iterator);
void join(const std::string &_address);
void leave(const std::string &_address);
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index 9744a22..3783add 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -60,6 +60,8 @@ void client_endpoint_impl<Protocol>::stop() { {
std::lock_guard<std::mutex> its_lock(mutex_);
endpoint_impl<Protocol>::sending_blocked_ = true;
+ // delete unsent messages
+ queue_.clear();
}
{
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
@@ -67,28 +69,6 @@ void client_endpoint_impl<Protocol>::stop() { connect_timer_.cancel(ec);
}
connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
-
- bool is_open(false);
- {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- is_open = socket_->is_open();
- }
- if (is_open) {
- bool send_queue_empty(false);
- std::uint32_t times_slept(0);
-
- while (times_slept <= 50) {
- mutex_.lock();
- send_queue_empty = (queue_.size() == 0);
- mutex_.unlock();
- if (send_queue_empty) {
- break;
- } else {
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- times_slept++;
- }
- }
- }
shutdown_and_close_socket();
}
@@ -253,6 +233,10 @@ void client_endpoint_impl<Protocol>::send_cbk( std::lock_guard<std::mutex> its_lock(mutex_);
if (endpoint_impl<Protocol>::sending_blocked_) {
queue_.clear();
+ } else {
+ VSOMEIP_WARNING << "cei::send_cbk received error: "
+ << _error.message() << " (" << std::dec
+ << _error.value() << ") " << std::dec << queue_.size();
}
}
shutdown_and_close_socket();
@@ -264,6 +248,9 @@ void client_endpoint_impl<Protocol>::send_cbk( } else if (_error == boost::asio::error::operation_aborted) {
// endpoint was stopped
shutdown_and_close_socket();
+ } else {
+ VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ")" ;
}
}
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp index f6e1b5c..52970ca 100644 --- a/implementation/endpoints/src/local_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp @@ -62,6 +62,42 @@ void local_client_endpoint_impl::start() { connect();
}
+void local_client_endpoint_impl::stop() {
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ sending_blocked_ = true;
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
+ boost::system::error_code ec;
+ connect_timer_.cancel(ec);
+ }
+ connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
+
+ bool is_open(false);
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ is_open = socket_->is_open();
+ }
+ if (is_open) {
+ bool send_queue_empty(false);
+ std::uint32_t times_slept(0);
+
+ while (times_slept <= 50) {
+ mutex_.lock();
+ send_queue_empty = (queue_.size() == 0);
+ mutex_.unlock();
+ if (send_queue_empty) {
+ break;
+ } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ times_slept++;
+ }
+ }
+ }
+ shutdown_and_close_socket();
+}
+
void local_client_endpoint_impl::connect() {
boost::system::error_code its_connect_error;
{
@@ -87,6 +123,11 @@ void local_client_endpoint_impl::connect() { its_host->get_client());
}
}
+ } else {
+ VSOMEIP_WARNING << "local_client_endpoint::connect: Couldn't "
+ << "connect to: " << remote_.path() << " ("
+ << its_connect_error.message() << " / " << std::dec
+ << its_connect_error.value() << ")";
}
#endif
diff --git a/implementation/endpoints/src/local_server_endpoint_impl.cpp b/implementation/endpoints/src/local_server_endpoint_impl.cpp index 5a44f2f..460fe40 100644 --- a/implementation/endpoints/src/local_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_server_endpoint_impl.cpp @@ -136,11 +136,29 @@ bool local_server_endpoint_impl::send_to( } void local_server_endpoint_impl::send_queued( - queue_iterator_type _queue_iterator) { - std::lock_guard<std::mutex> its_lock(connections_mutex_); - auto connection_iterator = connections_.find(_queue_iterator->first); - if (connection_iterator != connections_.end()) - connection_iterator->second->send_queued(_queue_iterator); + const queue_iterator_type _queue_iterator) { + connection::ptr its_connection; + { + std::lock_guard<std::mutex> its_lock(connections_mutex_); + auto connection_iterator = connections_.find(_queue_iterator->first); + if (connection_iterator != connections_.end()) { + connection_iterator->second->send_queued(_queue_iterator); + } else { + VSOMEIP_INFO << "Didn't find connection: " +#ifdef _WIN32 + << _queue_iterator->first.address().to_string() << ":" << std::dec + << static_cast<std::uint16_t>(_queue_iterator->first.port()) +#else + << _queue_iterator->first.path() +#endif + << " dropping outstanding messages (" << std::dec + << _queue_iterator->second.size() << ")."; + _queue_iterator->second.clear(); + } + } + if (its_connection) { + its_connection->send_queued(_queue_iterator); + } } void local_server_endpoint_impl::receive() { @@ -155,13 +173,25 @@ bool local_server_endpoint_impl::get_default_target( void local_server_endpoint_impl::remove_connection( local_server_endpoint_impl::connection *_connection) { - std::lock_guard<std::mutex> its_lock(connections_mutex_); - for (auto it = connections_.begin(); it != connections_.end();) { - if (it->second.get() == _connection) { - it = connections_.erase(it); - break; - } else { - ++it; + endpoint_type its_target; + { + std::lock_guard<std::mutex> its_lock(connections_mutex_); + for (auto it = connections_.begin(); it != connections_.end();) { + if (it->second.get() == _connection) { + its_target = it->first; + it = connections_.erase(it); + break; + } else { + ++it; + } + } + } + { + // delete outstanding responses for this connection as well + std::lock_guard<std::mutex> its_lock(mutex_); + const auto found_target = queues_.find(its_target); + if (found_target != queues_.end()) { + found_target->second.clear(); } } } @@ -318,7 +348,7 @@ void local_server_endpoint_impl::connection::stop() { } void local_server_endpoint_impl::connection::send_queued( - queue_iterator_type _queue_iterator) { + const queue_iterator_type _queue_iterator) { // TODO: We currently do _not_ use the send method of the local server // endpoints. If we ever need it, we need to add the "start tag", "data", @@ -355,9 +385,6 @@ void local_server_endpoint_impl::connection::send_queued( } } -void local_server_endpoint_impl::connection::send_magic_cookie() { -} - void local_server_endpoint_impl::connection::receive_cbk( boost::system::error_code const &_error, std::size_t _bytes) { diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp index 3d00ca4..39c6be2 100644 --- a/implementation/endpoints/src/server_endpoint_impl.cpp +++ b/implementation/endpoints/src/server_endpoint_impl.cpp @@ -199,16 +199,20 @@ void server_endpoint_impl<Protocol>::connect_cbk( template<typename Protocol>
void server_endpoint_impl<Protocol>::send_cbk(
- queue_iterator_type _queue_iterator, boost::system::error_code const &_error,
- std::size_t _bytes) {
+ const queue_iterator_type _queue_iterator,
+ boost::system::error_code const &_error, std::size_t _bytes) {
(void)_bytes;
+ std::lock_guard<std::mutex> its_lock(mutex_);
if (!_error) {
- std::lock_guard<std::mutex> its_lock(mutex_);
_queue_iterator->second.pop_front();
if (_queue_iterator->second.size() > 0) {
send_queued(_queue_iterator);
}
+ } else {
+ // error: sending of outstanding responses isn't started again
+ // delete remaining outstanding responses
+ _queue_iterator->second.clear();
}
}
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp index f91f7f6..e39589a 100644 --- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp @@ -95,7 +95,7 @@ bool tcp_server_endpoint_impl::send_to( return send_intern(its_target, _data, _size, _flush); } -void tcp_server_endpoint_impl::send_queued(queue_iterator_type _queue_iterator) { +void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iterator) { connection::ptr its_connection; { std::lock_guard<std::mutex> its_lock(connections_mutex_); @@ -106,8 +106,9 @@ void tcp_server_endpoint_impl::send_queued(queue_iterator_type _queue_iterator) VSOMEIP_INFO << "Didn't find connection: " << _queue_iterator->first.address().to_string() << ":" << std::dec << static_cast<std::uint16_t>(_queue_iterator->first.port()) - << " dropping message."; - _queue_iterator->second.pop_front(); + << " dropping outstanding messages (" << std::dec + << _queue_iterator->second.size() << ")."; + _queue_iterator->second.clear(); } } if (its_connection) { @@ -139,13 +140,25 @@ bool tcp_server_endpoint_impl::get_default_target(service_t, void tcp_server_endpoint_impl::remove_connection( tcp_server_endpoint_impl::connection *_connection) { - std::lock_guard<std::mutex> its_lock(connections_mutex_); - for (auto it = connections_.begin(); it != connections_.end();) { - if (it->second.get() == _connection) { - it = connections_.erase(it); - break; - } else { - ++it; + endpoint_type its_target; + { + std::lock_guard<std::mutex> its_lock(connections_mutex_); + for (auto it = connections_.begin(); it != connections_.end();) { + if (it->second.get() == _connection) { + its_target = it->first; + it = connections_.erase(it); + break; + } else { + ++it; + } + } + } + { + // delete outstanding responses for this connection as well + std::lock_guard<std::mutex> its_lock(mutex_); + const auto found_target = queues_.find(its_target); + if (found_target != queues_.end()) { + found_target->second.clear(); } } } @@ -295,7 +308,7 @@ void tcp_server_endpoint_impl::connection::stop() { } void tcp_server_endpoint_impl::connection::send_queued( - queue_iterator_type _queue_iterator) { + const queue_iterator_type _queue_iterator) { std::shared_ptr<tcp_server_endpoint_impl> its_server(server_.lock()); if (!its_server) { VSOMEIP_TRACE << "tcp_server_endpoint_impl::connection::send_queued " @@ -318,7 +331,8 @@ void tcp_server_endpoint_impl::connection::send_queued( boost::asio::async_write(socket_, boost::asio::buffer(*its_buffer), std::bind(&tcp_server_endpoint_base_impl::send_cbk, its_server, - _queue_iterator, std::placeholders::_1, + _queue_iterator, + std::placeholders::_1, std::placeholders::_2)); } } diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp index ce1a276..7de96b4 100644 --- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp @@ -120,7 +120,7 @@ bool udp_server_endpoint_impl::send_to( } void udp_server_endpoint_impl::send_queued( - queue_iterator_type _queue_iterator) { + const queue_iterator_type _queue_iterator) { message_buffer_ptr_t its_buffer = _queue_iterator->second.front(); #if 0 std::stringstream msg; diff --git a/implementation/routing/include/event.hpp b/implementation/routing/include/event.hpp index a403ae8..164f922 100644 --- a/implementation/routing/include/event.hpp +++ b/implementation/routing/include/event.hpp @@ -110,6 +110,9 @@ public: bool is_reliable() const; void set_reliable(bool _is_reliable); + bool get_remote_notification_pending(); + void set_remote_notification_pending(bool _value); + private: void update_cbk(boost::system::error_code const &_error); void notify(bool _flush); @@ -151,6 +154,8 @@ private: epsilon_change_func_t epsilon_change_func_; std::atomic<bool> is_reliable_; + + std::atomic<bool> remote_notification_pending_; }; } // namespace vsomeip diff --git a/implementation/routing/include/eventgroupinfo.hpp b/implementation/routing/include/eventgroupinfo.hpp index 6f3e563..9e50e85 100644 --- a/implementation/routing/include/eventgroupinfo.hpp +++ b/implementation/routing/include/eventgroupinfo.hpp @@ -18,6 +18,8 @@ #include <vsomeip/export.hpp> #include <vsomeip/primitive_types.hpp> +#include "types.hpp" + namespace vsomeip { class endpoint_definition; @@ -77,6 +79,13 @@ public: VSOMEIP_EXPORT std::unique_lock<std::mutex> get_subscription_lock(); + VSOMEIP_EXPORT pending_subscription_id_t add_pending_subscription( + pending_subscription_t _pending_subscription); + + VSOMEIP_EXPORT pending_subscription_t remove_pending_subscription( + pending_subscription_id_t _subscription_id); + + VSOMEIP_EXPORT void clear_pending_subscriptions(); private: std::atomic<major_version_t> major_; std::atomic<ttl_t> ttl_; @@ -97,6 +106,10 @@ private: std::atomic<bool> has_reliable_; std::atomic<bool> has_unreliable_; + + std::mutex pending_subscriptions_mutex_; + std::map<pending_subscription_id_t, pending_subscription_t> pending_subscriptions_; + pending_subscription_id_t subscription_id_; }; } // namespace vsomeip diff --git a/implementation/routing/include/routing_manager_host.hpp b/implementation/routing/include/routing_manager_host.hpp index b282d75..c9f8841 100644 --- a/implementation/routing/include/routing_manager_host.hpp +++ b/implementation/routing/include/routing_manager_host.hpp @@ -32,8 +32,8 @@ public: virtual void on_state(state_type_e _state) = 0; virtual void on_message(const std::shared_ptr<message> &&_message) = 0; virtual void on_error(error_code_e _error) = 0; - virtual bool on_subscription(service_t _service, instance_t _instance, eventgroup_t _eventgroup, - client_t _client, bool _subscribed) = 0; + virtual void on_subscription(service_t _service, instance_t _instance, + eventgroup_t _eventgroup, client_t _client, bool _subscribed, std::function<void(bool)> _accepted_cb) = 0; virtual void on_subscription_error(service_t _service, instance_t _instance, eventgroup_t _eventgroup, uint16_t _error) = 0; virtual void on_subscription_status(service_t _service, instance_t _instance, diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp index c153e5e..6969069 100644 --- a/implementation/routing/include/routing_manager_impl.hpp +++ b/implementation/routing/include/routing_manager_impl.hpp @@ -22,6 +22,7 @@ #include "routing_manager_base.hpp" #include "routing_manager_stub_host.hpp" +#include "types.hpp" #include "../../endpoints/include/netlink_connector.hpp" #include "../../service_discovery/include/service_discovery_host.hpp" @@ -117,10 +118,12 @@ public: client_t _client, bool _force, bool _flush); void on_subscribe_nack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, event_t _event); + instance_t _instance, eventgroup_t _eventgroup, event_t _event, + pending_subscription_id_t _subscription_id); void on_subscribe_ack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, event_t _event); + instance_t _instance, eventgroup_t _eventgroup, event_t _event, + pending_subscription_id_t _subscription_id); void on_identify_response(client_t _client, service_t _service, instance_t _instance, bool _reliable); @@ -188,15 +191,12 @@ public: bool _has_reliable, bool _has_unreliable); std::chrono::milliseconds update_routing_info(std::chrono::milliseconds _elapsed); - void on_subscribe(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, - std::shared_ptr<endpoint_definition> _subscriber, - std::shared_ptr<endpoint_definition> _target, - const std::chrono::steady_clock::time_point &_expiration); - bool on_subscribe_accepted(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, - std::shared_ptr<endpoint_definition> _target, - const std::chrono::steady_clock::time_point &_expiration); + remote_subscription_state_e on_remote_subscription( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition> &_subscriber, + const std::shared_ptr<endpoint_definition> &_target, + ttl_t _ttl, client_t *_client, + const std::shared_ptr<sd_message_identifier_t> &_sd_message_id); void on_unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, std::shared_ptr<endpoint_definition> _target); @@ -222,6 +222,10 @@ public: (void) _offer_type; } + void send_initial_events(service_t _service, instance_t _instance, + eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition> &_subscriber); + private: bool deliver_message(const byte_t *_data, length_t _length, instance_t _instance, bool _reliable, bool _is_valid_crc = true); @@ -347,8 +351,6 @@ private: client_t is_specific_endpoint_client(client_t _client, service_t _service, instance_t _instance); std::unordered_set<client_t> get_specific_endpoint_clients(service_t _service, instance_t _instance); - bool remote_service_offered_via_tcp_and_udp(service_t _service, instance_t _instance) const; - std::shared_ptr<routing_manager_stub> stub_; std::shared_ptr<sd::service_discovery> discovery_; @@ -424,8 +426,8 @@ private: std::map<std::tuple<service_t, instance_t, eventgroup_t, client_t>, subscription_state_e> remote_subscription_state_; - std::map<e2exf::data_identifier, std::shared_ptr<e2e::profile::profile_interface::protector>> custom_protectors; - std::map<e2exf::data_identifier, std::shared_ptr<e2e::profile::profile_interface::checker>> custom_checkers; + std::map<e2exf::data_identifier, std::shared_ptr<e2e::profile_interface::protector>> custom_protectors; + std::map<e2exf::data_identifier, std::shared_ptr<e2e::profile_interface::checker>> custom_checkers; }; } // namespace vsomeip diff --git a/implementation/routing/include/routing_manager_proxy.hpp b/implementation/routing/include/routing_manager_proxy.hpp index 1daeb23..dcf6152 100644 --- a/implementation/routing/include/routing_manager_proxy.hpp +++ b/implementation/routing/include/routing_manager_proxy.hpp @@ -15,6 +15,7 @@ #include <boost/asio/steady_timer.hpp> #include "routing_manager_base.hpp" +#include "types.hpp" #include <vsomeip/enumeration_types.hpp> #include <vsomeip/handler.hpp> @@ -28,7 +29,7 @@ class logger; class routing_manager_proxy: public routing_manager_base { public: - routing_manager_proxy(routing_manager_host *_host); + routing_manager_proxy(routing_manager_host *_host, bool _client_side_logging); virtual ~routing_manager_proxy(); void init(); @@ -125,10 +126,12 @@ private: subscription_type_e _subscription_type); void send_subscribe_nack(client_t _subscriber, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, event_t _event); + instance_t _instance, eventgroup_t _eventgroup, event_t _event, + pending_subscription_id_t _subscription_id); void send_subscribe_ack(client_t _subscriber, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, event_t _event); + instance_t _instance, eventgroup_t _eventgroup, event_t _event, + pending_subscription_id_t _subscription_id); bool is_field(service_t _service, instance_t _instance, event_t _event) const; @@ -232,7 +235,7 @@ private: boost::asio::steady_timer request_debounce_timer_; bool request_debounce_timer_running_; - bool client_side_logging_; + const bool client_side_logging_; }; } // namespace vsomeip diff --git a/implementation/routing/include/routing_manager_stub.hpp b/implementation/routing/include/routing_manager_stub.hpp index 1737d6f..d4eb962 100644 --- a/implementation/routing/include/routing_manager_stub.hpp +++ b/implementation/routing/include/routing_manager_stub.hpp @@ -61,7 +61,7 @@ public: void send_subscribe(std::shared_ptr<vsomeip::endpoint> _target, client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, - event_t _event, bool _is_remote_subscriber); + event_t _event, pending_subscription_id_t _subscription_id); void send_unsubscribe(std::shared_ptr<vsomeip::endpoint> _target, client_t _client, service_t _service, diff --git a/implementation/routing/include/routing_manager_stub_host.hpp b/implementation/routing/include/routing_manager_stub_host.hpp index ec60c37..065be80 100644 --- a/implementation/routing/include/routing_manager_stub_host.hpp +++ b/implementation/routing/include/routing_manager_stub_host.hpp @@ -9,6 +9,8 @@ #include <boost/asio/io_service.hpp> #include <vsomeip/handler.hpp> +#include "types.hpp" + namespace vsomeip { class routing_manager_stub_host { @@ -44,10 +46,12 @@ public: subscription_type_e _subscription_type) = 0; virtual void on_subscribe_nack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0; + instance_t _instance, eventgroup_t _eventgroup, event_t _event, + pending_subscription_id_t _subscription_id) = 0; virtual void on_subscribe_ack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0; + instance_t _instance, eventgroup_t _eventgroup, event_t _event, + pending_subscription_id_t _subscription_id) = 0; virtual void unsubscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0; diff --git a/implementation/routing/include/types.hpp b/implementation/routing/include/types.hpp index 720e937..16bc565 100644 --- a/implementation/routing/include/types.hpp +++ b/implementation/routing/include/types.hpp @@ -8,12 +8,17 @@ #include <map> #include <memory> +#include <boost/asio/ip/address.hpp> #include <vsomeip/primitive_types.hpp> +#include "../../service_discovery/include/message_impl.hpp" + namespace vsomeip { class serviceinfo; +class endpoint_definition; + typedef std::map<service_t, std::map<instance_t, @@ -33,6 +38,77 @@ enum class registration_type_e : std::uint8_t { DEREGISTER_ON_ERROR = 0x3 }; +struct sd_message_identifier_t { + sd_message_identifier_t(session_t _session, + boost::asio::ip::address _sender, + boost::asio::ip::address _destination, + const std::shared_ptr<sd::message_impl> &_response) : + session_(_session), + sender_(_sender), + destination_(_destination), + response_(_response) { + } + + sd_message_identifier_t() : + session_(0), + sender_(boost::asio::ip::address()), + destination_(boost::asio::ip::address()), + response_(std::shared_ptr<sd::message_impl>()) { + } + + bool operator==(const sd_message_identifier_t &_other) const { + return !(session_ != _other.session_ || + sender_ != _other.sender_ || + destination_ != _other.destination_ || + response_ != _other.response_); + } + + bool operator<(const sd_message_identifier_t &_other) const { + return (session_ < _other.session_ + || (session_ == _other.session_ && sender_ < _other.sender_) + || (session_ == _other.session_ && sender_ == _other.sender_ + && destination_ < _other.destination_) + || (session_ == _other.session_ && sender_ == _other.sender_ + && destination_ == _other.destination_ + && response_ < _other.response_)); + } + + session_t session_; + boost::asio::ip::address sender_; + boost::asio::ip::address destination_; + std::shared_ptr<sd::message_impl> response_; +}; + +struct pending_subscription_t { + pending_subscription_t( + std::shared_ptr<sd_message_identifier_t> _sd_message_identifier, + std::shared_ptr<endpoint_definition> _subscriber, + std::shared_ptr<endpoint_definition> _target, + ttl_t _ttl) : + sd_message_identifier_(_sd_message_identifier), + subscriber_(_subscriber), + target_(_target), + ttl_(_ttl) { + } + pending_subscription_t () : + sd_message_identifier_(std::shared_ptr<sd_message_identifier_t>()), + subscriber_(std::shared_ptr<endpoint_definition>()), + target_(std::shared_ptr<endpoint_definition>()), + ttl_(0) { + } + std::shared_ptr<sd_message_identifier_t> sd_message_identifier_; + std::shared_ptr<endpoint_definition> subscriber_; + std::shared_ptr<endpoint_definition> target_; + ttl_t ttl_; +}; + +enum remote_subscription_state_e : std::uint8_t { + SUBSCRIPTION_ACKED, + SUBSCRIPTION_NACKED, + SUBSCRIPTION_PENDING, + SUBSCRIPTION_ERROR +}; + } // namespace vsomeip diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp index 287c50c..e3dde2c 100644 --- a/implementation/routing/src/event.cpp +++ b/implementation/routing/src/event.cpp @@ -30,7 +30,9 @@ event::event(routing_manager *_routing, bool _is_shadow) : is_shadow_(_is_shadow), is_cache_placeholder_(false), epsilon_change_func_(std::bind(&event::compare, this, - std::placeholders::_1, std::placeholders::_2)) { + std::placeholders::_1, std::placeholders::_2)), + is_reliable_(false), + remote_notification_pending_(false) { } service_t event::get_service() const { @@ -459,4 +461,12 @@ void event::set_reliable(bool _is_reliable) { is_reliable_ = _is_reliable; } +bool event::get_remote_notification_pending() { + return remote_notification_pending_; +} + +void event::set_remote_notification_pending(bool _value) { + remote_notification_pending_ = _value; +} + } // namespace vsomeip diff --git a/implementation/routing/src/eventgroupinfo.cpp b/implementation/routing/src/eventgroupinfo.cpp index c6635dc..2d54516 100644 --- a/implementation/routing/src/eventgroupinfo.cpp +++ b/implementation/routing/src/eventgroupinfo.cpp @@ -11,17 +11,29 @@ #include "../include/eventgroupinfo.hpp" #include "../include/event.hpp" #include "../../endpoints/include/endpoint_definition.hpp" +#include "../../logging/include/logger.hpp" +#include "../../configuration/include/internal.hpp" namespace vsomeip { -eventgroupinfo::eventgroupinfo() - : major_(DEFAULT_MAJOR), ttl_(DEFAULT_TTL), port_(ILLEGAL_PORT), threshold_(0), - has_reliable_(false), has_unreliable_(false) { +eventgroupinfo::eventgroupinfo() : + major_(DEFAULT_MAJOR), + ttl_(DEFAULT_TTL), + port_(ILLEGAL_PORT), + threshold_(0), + has_reliable_(false), + has_unreliable_(false), + subscription_id_(DEFAULT_SUBSCRIPTION) { } -eventgroupinfo::eventgroupinfo(major_version_t _major, ttl_t _ttl) - : major_(_major), ttl_(_ttl), port_(ILLEGAL_PORT), threshold_(0), - has_reliable_(false), has_unreliable_(false) { +eventgroupinfo::eventgroupinfo(major_version_t _major, ttl_t _ttl) : + major_(_major), + ttl_(_ttl), + port_(ILLEGAL_PORT), + threshold_(0), + has_reliable_(false), + has_unreliable_(false), + subscription_id_(DEFAULT_SUBSCRIPTION) { } eventgroupinfo::~eventgroupinfo() { @@ -202,4 +214,35 @@ std::unique_lock<std::mutex> eventgroupinfo::get_subscription_lock() { return std::unique_lock<std::mutex>(subscription_mutex_); } +pending_subscription_id_t eventgroupinfo::add_pending_subscription( + pending_subscription_t _pending_subscription) { + std::lock_guard<std::mutex> its_lock(pending_subscriptions_mutex_); + if (++subscription_id_ == DEFAULT_SUBSCRIPTION) { + subscription_id_++; + } + pending_subscriptions_[subscription_id_] = _pending_subscription; + return subscription_id_; +} + +pending_subscription_t eventgroupinfo::remove_pending_subscription( + pending_subscription_id_t _subscription_id) { + std::lock_guard<std::mutex> its_lock(pending_subscriptions_mutex_); + pending_subscription_t its_pending_subscription; + const auto found_pending_subscription = pending_subscriptions_.find( + _subscription_id); + if (found_pending_subscription != pending_subscriptions_.end()) { + its_pending_subscription = found_pending_subscription->second; + pending_subscriptions_.erase(found_pending_subscription); + } else { + VSOMEIP_ERROR << __func__ << " didn't find pending_subscription: " + << _subscription_id;; + } + return its_pending_subscription; +} + +void eventgroupinfo::clear_pending_subscriptions() { + std::lock_guard<std::mutex> its_lock(pending_subscriptions_mutex_); + pending_subscriptions_.clear(); +} + } // namespace vsomeip diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp index 3aa33f5..7b45889 100644 --- a/implementation/routing/src/routing_manager_base.cpp +++ b/implementation/routing/src/routing_manager_base.cpp @@ -65,6 +65,14 @@ bool routing_manager_base::offer_service(client_t _client, service_t _service, its_info->set_ttl(DEFAULT_TTL); } else { host_->on_error(error_code_e::SERVICE_PROPERTY_MISMATCH); + VSOMEIP_ERROR << "rm_base::offer_service service property mismatch (" + << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << ":" + << std::dec << static_cast<std::uint32_t>(its_info->get_major()) << ":" + << std::dec << its_info->get_minor() << "] passed: " + << std::dec << static_cast<std::uint32_t>(_major) << ":" + << std::dec << _minor; return false; } } else { @@ -127,6 +135,14 @@ void routing_manager_base::request_service(client_t _client, service_t _service, its_info->add_client(_client); } else { host_->on_error(error_code_e::SERVICE_PROPERTY_MISMATCH); + VSOMEIP_ERROR << "rm_base::request_service service property mismatch (" + << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << ":" + << std::dec << static_cast<std::uint32_t>(its_info->get_major()) << ":" + << std::dec << its_info->get_minor() << "] passed: " + << std::dec << static_cast<std::uint32_t>(_major) << ":" + << std::dec << _minor; } } } @@ -701,7 +717,7 @@ void routing_manager_base::remove_local(client_t _client) { auto subscriptions = get_subscriptions(_client); for (auto its_subscription : subscriptions) { host_->on_subscription(std::get<0>(its_subscription), std::get<1>(its_subscription), - std::get<2>(its_subscription), _client, false); + std::get<2>(its_subscription), _client, false, [](const bool _subscription_accepted){ (void)_subscription_accepted; }); routing_manager_base::unsubscribe(_client, std::get<0>(its_subscription), std::get<1>(its_subscription), std::get<2>(its_subscription), ANY_EVENT); } diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 594d687..2405b8a 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -108,23 +108,23 @@ void routing_manager_impl::init() { auto its_cfg = identifier.second; if(its_cfg->profile == "CRC8") { e2exf::data_identifier its_data_identifier = {its_cfg->service_id, its_cfg->event_id}; - e2e::profile::profile01::Config its_profile_config = e2e::profile::profile01::Config(its_cfg->crc_offset, its_cfg->data_id, - (e2e::profile::profile01::p01_data_id_mode) its_cfg->data_id_mode, its_cfg->data_length, its_cfg->counter_offset, its_cfg->data_id_nibble_offset); + e2e::profile01::profile_config its_profile_config = e2e::profile01::profile_config(its_cfg->crc_offset, its_cfg->data_id, + (e2e::profile01::p01_data_id_mode) its_cfg->data_id_mode, its_cfg->data_length, its_cfg->counter_offset, its_cfg->data_id_nibble_offset); if ((its_cfg->variant == "protector") || (its_cfg->variant == "both")) { - custom_protectors[its_data_identifier] = std::make_shared<e2e::profile::profile01::protector>(its_profile_config); + custom_protectors[its_data_identifier] = std::make_shared<e2e::profile01::protector>(its_profile_config); } if ((its_cfg->variant == "checker") || (its_cfg->variant == "both")) { - custom_checkers[its_data_identifier] = std::make_shared<e2e::profile::profile01::profile_01_checker>(its_profile_config); + custom_checkers[its_data_identifier] = std::make_shared<e2e::profile01::profile_01_checker>(its_profile_config); } } else if(its_cfg->profile == "CRC32") { e2exf::data_identifier its_data_identifier = {its_cfg->service_id, its_cfg->event_id}; - e2e::profile::profile_custom::Config its_profile_config = e2e::profile::profile_custom::Config(its_cfg->crc_offset); + e2e::profile_custom::profile_config its_profile_config = e2e::profile_custom::profile_config(its_cfg->crc_offset); if ((its_cfg->variant == "protector") || (its_cfg->variant == "both")) { - custom_protectors[its_data_identifier] = std::make_shared<e2e::profile::profile_custom::protector>(its_profile_config); + custom_protectors[its_data_identifier] = std::make_shared<e2e::profile_custom::protector>(its_profile_config); } if ((its_cfg->variant == "checker") || (its_cfg->variant == "both")) { - custom_checkers[its_data_identifier] = std::make_shared<e2e::profile::profile_custom::profile_custom_checker>(its_profile_config); + custom_checkers[its_data_identifier] = std::make_shared<e2e::profile_custom::profile_custom_checker>(its_profile_config); } } } @@ -264,7 +264,6 @@ void routing_manager_impl::stop_offer_service(client_t _client, } } - routing_manager_base::stop_offer_service(_client, _service, _instance, _major, _minor); on_stop_offer_service(_client, _service, _instance, _major, _minor); stub_->on_stop_offer_service(_client, _service, _instance, _major, _minor); on_availability(_service, _instance, false, _major, _minor); @@ -391,19 +390,24 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, << std::dec << (uint16_t)_major << "]"; const client_t its_local_client = find_local_client(_service, _instance); if (get_client() == its_local_client) { - bool subscription_accepted = host_->on_subscription(_service, _instance, _eventgroup, _client, true); - (void) find_or_create_local(_client); - if (!subscription_accepted) { - stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup, _event); - VSOMEIP_INFO << "Subscription request from client: 0x" << std::hex - << _client << std::dec << " for eventgroup: 0x" << _eventgroup - << " rejected from application handler."; - return; - } else { - stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event); - } - routing_manager_base::subscribe(_client, _service, _instance, _eventgroup, _major, _event, _subscription_type); - send_pending_notify_ones(_service, _instance, _eventgroup, _client); + auto self = shared_from_this(); + host_->on_subscription(_service, _instance, _eventgroup, _client, true, + [this, self, _client, _service, _instance, _eventgroup, + _event, _major, _subscription_type] + (const bool _subscription_accepted) { + (void) find_or_create_local(_client); + if (!_subscription_accepted) { + stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup, _event); + VSOMEIP_INFO << "Subscription request from client: 0x" << std::hex + << _client << std::dec << " for eventgroup: 0x" << _eventgroup + << " rejected from application handler."; + return; + } else { + stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event); + } + routing_manager_base::subscribe(_client, _service, _instance, _eventgroup, _major, _event, _subscription_type); + send_pending_notify_ones(_service, _instance, _eventgroup, _client); + }); } else { if (discovery_) { client_t subscriber = VSOMEIP_ROUTING_CLIENT; @@ -441,7 +445,7 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, } else { if (is_available(_service, _instance, _major)) { stub_->send_subscribe(find_local(_service, _instance), - _client, _service, _instance, _eventgroup, _major, _event, false); + _client, _service, _instance, _eventgroup, _major, _event, DEFAULT_SUBSCRIPTION); } } } else if (its_eventgroup) { @@ -486,7 +490,7 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service, } if (discovery_) { - host_->on_subscription(_service, _instance, _eventgroup, _client, false); + host_->on_subscription(_service, _instance, _eventgroup, _client, false, [](const bool _subscription_accepted){ (void)_subscription_accepted; }); if (0 == find_local_client(_service, _instance)) { client_t subscriber = is_specific_endpoint_client(_client, _service, _instance); if (last_subscriber_removed) { @@ -600,7 +604,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, // TODO: find out how to handle session id here is_sent = deliver_message(_data, _size, _instance, _reliable, _is_valid_crc); } else { - buffer::e2e_buffer outputBuffer; + e2e_buffer outputBuffer; if( configuration_->is_e2e_enabled()) { if ( !is_service_discovery) { service_t its_service = VSOMEIP_BYTES_TO_WORD( @@ -609,7 +613,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); if( custom_protectors.count({its_service, its_method})) { outputBuffer.assign(_data, _data + VSOMEIP_PAYLOAD_POS); - buffer::e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data +_size); + e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data +_size); custom_protectors[{its_service, its_method}]->protect( inputBuffer); outputBuffer.resize(inputBuffer.size() + VSOMEIP_PAYLOAD_POS); std::copy(inputBuffer.begin(), inputBuffer.end(), outputBuffer.begin() + VSOMEIP_PAYLOAD_POS); @@ -632,7 +636,15 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, #endif is_sent = its_target->send(_data, _size, _flush); } else { - VSOMEIP_ERROR<< "Routing error. Client from remote service could not be found!"; + const session_t its_session = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_SESSION_POS_MIN], + _data[VSOMEIP_SESSION_POS_MAX]); + VSOMEIP_ERROR<< "Routing info for remote service could not be found! (" + << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << "] " + << std::hex << std::setw(4) << std::setfill('0') << its_session; } } else { std::shared_ptr<serviceinfo> its_info(find_service(its_service, _instance)); @@ -735,16 +747,29 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, #endif is_sent = its_target->send(_data, _size, _flush); } else { - VSOMEIP_ERROR << "Routing error. Endpoint for service [" - << std::hex << its_service << "." << _instance - << "] could not be found!"; + const session_t its_session = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_SESSION_POS_MIN], + _data[VSOMEIP_SESSION_POS_MAX]); + VSOMEIP_ERROR << "Routing error. Endpoint for service (" + << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << "] " + << std::hex << std::setw(4) << std::setfill('0') << its_session + << " could not be found!"; } } } else { if (!is_notification) { - VSOMEIP_ERROR << "Routing error. Not hosting service [" - << std::hex << its_service << "." << _instance - << "]"; + const session_t its_session = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_SESSION_POS_MIN], + _data[VSOMEIP_SESSION_POS_MAX]); + VSOMEIP_ERROR << "Routing error. Not hosting service (" + << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << "] " + << std::hex << std::setw(4) << std::setfill('0') << its_session; } } } @@ -763,7 +788,7 @@ bool routing_manager_impl::send_to( if (serializer_->serialize(_message.get())) { const byte_t *_data = serializer_->get_data(); length_t _size = serializer_->get_size(); - buffer::e2e_buffer outputBuffer; + e2e_buffer outputBuffer; if( configuration_->is_e2e_enabled()) { service_t its_service = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); @@ -771,7 +796,7 @@ bool routing_manager_impl::send_to( _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); if( custom_protectors.count({its_service, its_method})) { outputBuffer.assign(_data, _data + VSOMEIP_PAYLOAD_POS); - buffer::e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data +_size); + e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data +_size); custom_protectors[{its_service, its_method}]->protect( inputBuffer); outputBuffer.resize(inputBuffer.size() + VSOMEIP_PAYLOAD_POS); std::copy(inputBuffer.begin(), inputBuffer.end(), outputBuffer.begin() + VSOMEIP_PAYLOAD_POS); @@ -1041,11 +1066,11 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); if( custom_checkers.count({its_service, its_method})) { - buffer::e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data + _size); - e2e::profile::profile_interface::generic_check_status check_status; + e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data + _size); + e2e::profile_interface::generic_check_status check_status; custom_checkers[{its_service, its_method}]->check( inputBuffer, check_status); - if ( check_status != e2e::profile::profile_interface::generic_check_status::E2E_OK ) { + if ( check_status != e2e::profile_interface::generic_check_status::E2E_OK ) { VSOMEIP_INFO << std::hex << "E2E protection: CRC check failed for service: " << its_service << " method: " << its_method; its_is_crc_valid = false; } @@ -1138,42 +1163,48 @@ void routing_manager_impl::on_notification(client_t _client, if (its_event->is_set()) { its_event->set_payload(its_payload, false, true); } else { - // Set payload first time ~> notify all remote subscriber per unicast (initial field) - std::vector<std::unique_lock<std::mutex>> its_locks; - std::vector<std::shared_ptr<eventgroupinfo>> its_eventgroupinfos; - for (auto its_group : its_event->get_eventgroups()) { - auto its_eventgroup = find_eventgroup(_service, _instance, its_group); - if (its_eventgroup) { - its_locks.push_back(its_eventgroup->get_subscription_lock()); - its_eventgroupinfos.push_back(its_eventgroup); + // new subscribers will be notified by SD via sent_initiale_events; + if (its_event->get_remote_notification_pending()) { + // Set payload first time ~> notify all remote subscriber per unicast (initial field) + std::vector<std::unique_lock<std::mutex>> its_locks; + std::vector<std::shared_ptr<eventgroupinfo>> its_eventgroupinfos; + for (auto its_group : its_event->get_eventgroups()) { + auto its_eventgroup = find_eventgroup(_service, _instance, its_group); + if (its_eventgroup) { + its_locks.push_back(its_eventgroup->get_subscription_lock()); + its_eventgroupinfos.push_back(its_eventgroup); + } } - } - bool is_offered_both(false); - if (configuration_->get_reliable_port(_service, _instance) != ILLEGAL_PORT && - configuration_->get_unreliable_port(_service, _instance) != ILLEGAL_PORT) { - is_offered_both = true; - } - for (const auto &its_eventgroup : its_eventgroupinfos) { - //Unicast targets - for (auto its_remote : its_eventgroup->get_targets()) { - if (!is_offered_both) { - its_event->set_payload(its_payload, its_remote.endpoint_, true, true); - } else { - bool was_set(false); - if (its_event->is_reliable() && its_remote.endpoint_->is_reliable()) { - its_event->set_payload(its_payload, its_remote.endpoint_, true, true); - was_set = true; - } - if (!its_event->is_reliable() && !its_remote.endpoint_->is_reliable()) { + bool is_offered_both(false); + if (configuration_->get_reliable_port(_service, _instance) != ILLEGAL_PORT && + configuration_->get_unreliable_port(_service, _instance) != ILLEGAL_PORT) { + is_offered_both = true; + } + for (const auto &its_eventgroup : its_eventgroupinfos) { + //Unicast targets + for (auto its_remote : its_eventgroup->get_targets()) { + if (!is_offered_both) { its_event->set_payload(its_payload, its_remote.endpoint_, true, true); - was_set = true; - } - if (!was_set) { - its_event->set_payload_dont_notify(its_payload); + } else { + bool was_set(false); + if (its_event->is_reliable() && its_remote.endpoint_->is_reliable()) { + its_event->set_payload(its_payload, its_remote.endpoint_, true, true); + was_set = true; + } + if (!its_event->is_reliable() && !its_remote.endpoint_->is_reliable()) { + its_event->set_payload(its_payload, its_remote.endpoint_, true, true); + was_set = true; + } + if (!was_set) { + its_event->set_payload_dont_notify(its_payload); + } } } } + its_event->set_remote_notification_pending(false); + } else { + its_event->set_payload_dont_notify(its_payload); } } } else { @@ -1335,23 +1366,9 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se } } } - std::map<event_t, std::shared_ptr<event> > events; - { - std::unique_lock<std::mutex> its_lock(events_mutex_); - auto its_events_service = events_.find(_service); - if (its_events_service != events_.end()) { - auto its_events_instance = its_events_service->second.find(_instance); - if (its_events_instance != its_events_service->second.end()) { - for (auto &e : its_events_instance->second) { - events[e.first] = e.second; - } - } - } - } - for (auto &e : events) { - e.second->unset_payload(); - e.second->clear_subscribers(); - } + + routing_manager_base::stop_offer_service(_client, _service, _instance, + _major, _minor); /** * Hold reliable & unreliable server-endpoints from service info @@ -1813,38 +1830,48 @@ std::shared_ptr<endpoint> routing_manager_impl::create_remote_client( service_t _service, instance_t _instance, bool _reliable, client_t _client) { std::shared_ptr<endpoint> its_endpoint; std::shared_ptr<endpoint_definition> its_endpoint_def; - uint16_t its_local_port; - if (configuration_->get_client_port(_service, _instance, _reliable, - used_client_ports_, its_local_port)) { - auto found_service = remote_service_info_.find(_service); - if (found_service != remote_service_info_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_reliability = found_instance->second.find(_reliable); - if (found_reliability != found_instance->second.end()) { - its_endpoint_def = found_reliability->second; - its_endpoint = create_client_endpoint( - its_endpoint_def->get_address(), - its_local_port, - its_endpoint_def->get_port(), - _reliable, _client - ); - } + uint16_t its_remote_port = ILLEGAL_PORT; + + auto found_service = remote_service_info_.find(_service); + if (found_service != remote_service_info_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + auto found_reliability = found_instance->second.find(_reliable); + if (found_reliability != found_instance->second.end()) { + its_endpoint_def = found_reliability->second; + its_remote_port = its_endpoint_def->get_port(); } } - if (its_endpoint) { - used_client_ports_[_reliable].insert(its_local_port); - service_instances_[_service][its_endpoint.get()] = _instance; - remote_services_[_service][_instance][_client][_reliable] = its_endpoint; - if (_client == VSOMEIP_ROUTING_CLIENT) { - client_endpoints_by_ip_[its_endpoint_def->get_address()] - [its_endpoint_def->get_port()] - [_reliable] = its_endpoint; - // Set the basic route to the service in the service info - auto found_service_info = find_service(_service, _instance); - if (found_service_info) { - found_service_info->set_endpoint(its_endpoint, _reliable); + } + + if( its_remote_port != ILLEGAL_PORT) { + // if client port range for remote service port range is configured + // and remote port is in range, determine unused client port + if (configuration_->get_client_port(_service, _instance, its_remote_port, _reliable, + used_client_ports_, its_local_port)) { + if(its_endpoint_def) { + its_endpoint = create_client_endpoint( + its_endpoint_def->get_address(), + its_local_port, + its_endpoint_def->get_port(), + _reliable, _client + ); + } + + if (its_endpoint) { + used_client_ports_[_reliable].insert(its_local_port); + service_instances_[_service][its_endpoint.get()] = _instance; + remote_services_[_service][_instance][_client][_reliable] = its_endpoint; + if (_client == VSOMEIP_ROUTING_CLIENT) { + client_endpoints_by_ip_[its_endpoint_def->get_address()] + [its_endpoint_def->get_port()] + [_reliable] = its_endpoint; + // Set the basic route to the service in the service info + auto found_service_info = find_service(_service, _instance); + if (found_service_info) { + found_service_info->set_endpoint(its_endpoint, _reliable); + } } } } @@ -2203,6 +2230,7 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst if (found_instance != found_service->second.end()) { for (auto &its_eventgroup : found_instance->second) { its_eventgroup.second->clear_targets(); + its_eventgroup.second->clear_pending_subscriptions(); } } } @@ -2215,9 +2243,9 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); auto found_service = remote_subscribers_.find(_service); if (found_service != remote_subscribers_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - found_instance->second.clear(); + if (found_service->second.erase(_instance) > 0 && + !found_service->second.size()) { + remote_subscribers_.erase(found_service); } } } @@ -2337,6 +2365,12 @@ void routing_manager_impl::expire_subscriptions(const boost::asio::ip::address & std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); for (auto &its_service : eventgroups_) { for (auto &its_instance : its_service.second) { + const client_t its_hosting_client = find_local_client( + its_service.first, its_instance.first); + const bool service_offered_by_host = (its_hosting_client + == host_->get_client()); + auto target = find_local(its_hosting_client); + for (auto &its_eventgroup : its_instance.second) { std::set<std::shared_ptr<endpoint_definition>> its_invalid_endpoints; for (auto &its_target : its_eventgroup.second->get_targets()) { @@ -2346,11 +2380,24 @@ void routing_manager_impl::expire_subscriptions(const boost::asio::ip::address & for (auto &its_endpoint : its_invalid_endpoints) { its_eventgroup.second->remove_target(its_endpoint); - auto target = find_local(its_service.first, its_instance.first); + client_t its_client = find_client(its_service.first, + its_instance.first, its_eventgroup.second, + its_endpoint); + clear_remote_subscriber(its_service.first, + its_instance.first, its_client, its_endpoint); + if (target) { - stub_->send_unsubscribe(target, VSOMEIP_ROUTING_CLIENT, its_service.first, + stub_->send_unsubscribe(target, its_client, its_service.first, its_instance.first, its_eventgroup.first, ANY_EVENT, true); } + if (service_offered_by_host) { + host_->on_subscription(its_service.first, + its_instance.first, its_eventgroup.first, + its_client, false, + [](const bool _subscription_accepted){ + (void)_subscription_accepted; + }); + } } if(its_eventgroup.second->is_multicast() && its_invalid_endpoints.size() && 0 == its_eventgroup.second->get_unreliable_target_count() ) { @@ -2391,143 +2438,89 @@ void routing_manager_impl::init_routing_info() { } } -bool routing_manager_impl::on_subscribe_accepted(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, const std::shared_ptr<endpoint_definition> _target, - const std::chrono::steady_clock::time_point &_expiration) { +remote_subscription_state_e routing_manager_impl::on_remote_subscription( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition> &_subscriber, + const std::shared_ptr<endpoint_definition> &_target, + ttl_t _ttl, client_t *_client, + const std::shared_ptr<sd_message_identifier_t> &_sd_message_id) { std::shared_ptr<eventgroupinfo> its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); - if (its_eventgroup) { - client_t client = VSOMEIP_ROUTING_CLIENT; - if (!_target->is_reliable()) { - if (!its_eventgroup->is_multicast()) { - uint16_t unreliable_port = configuration_->get_unreliable_port(_service, _instance); - _target->set_remote_port(unreliable_port); - auto endpoint = find_server_endpoint(unreliable_port, false); - if (endpoint) { - client = std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint)-> - get_client(_target); - } - } - } - else { - uint16_t reliable_port = configuration_->get_reliable_port(_service, _instance); - _target->set_remote_port(reliable_port); - auto endpoint = find_server_endpoint(reliable_port, true); - if (endpoint) { - client = std::dynamic_pointer_cast<tcp_server_endpoint_impl>(endpoint)-> - get_client(_target); - } - } - - if (its_eventgroup->update_target(_target, _expiration)) { - return true; - } - - if (!host_->on_subscription(_service, _instance, _eventgroup, client, true)) { - VSOMEIP_INFO << "Subscription request from client: 0x" << std::hex - << client << " for eventgroup: 0x" << _eventgroup << std::dec - << " rejected from application handler."; - return false; - } - - VSOMEIP_INFO << "REMOTE SUBSCRIBE(" - << std::hex << std::setw(4) << std::setfill('0') << client <<"): [" + if (!its_eventgroup) { + VSOMEIP_ERROR << "REMOTE SUBSCRIBE: attempt to subscribe to unknown eventgroup [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" - << " from " << _target->get_address().to_string() - << ":" << std::dec <<_target->get_port() - << (_target->is_reliable() ? " reliable" : " unreliable"); - - { - std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); - remote_subscribers_[_service][_instance][client].insert(_target); + << " from " << _subscriber->get_address().to_string() + << ":" << std::dec << _subscriber->get_port() + << (_subscriber->is_reliable() ? " reliable" : " unreliable"); + return remote_subscription_state_e::SUBSCRIPTION_ERROR; + } + // find out client id for selective subscriber + if (!_subscriber->is_reliable()) { + uint16_t unreliable_port = configuration_->get_unreliable_port(_service, _instance); + _subscriber->set_remote_port(unreliable_port); + if (!its_eventgroup->is_multicast()) { + auto endpoint = find_server_endpoint(unreliable_port, false); + if (endpoint) { + *_client = std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint)-> + get_client(_subscriber); + } } } else { - VSOMEIP_ERROR << "REMOTE SUBSCRIBE: attempt to subscribe to unknown eventgroup [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" - << " from " << _target->get_address().to_string() - << ":" << std::dec <<_target->get_port() - << (_target->is_reliable() ? " reliable" : " unreliable"); - return false; + uint16_t reliable_port = configuration_->get_reliable_port(_service, _instance); + _subscriber->set_remote_port(reliable_port); + auto endpoint = find_server_endpoint(reliable_port, true); + if (endpoint) { + *_client = std::dynamic_pointer_cast<tcp_server_endpoint_impl>(endpoint)-> + get_client(_subscriber); + } } - return true; -} - -void routing_manager_impl::on_subscribe( - service_t _service, instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr<endpoint_definition> _subscriber, - std::shared_ptr<endpoint_definition> _target, - const std::chrono::steady_clock::time_point &_expiration) { - - std::shared_ptr<eventgroupinfo> its_eventgroup - = find_eventgroup(_service, _instance, _eventgroup); - if (its_eventgroup) { - std::unique_lock<std::mutex> its_subscriptions_lock(its_eventgroup->get_subscription_lock()); - // IP address of target is a multicast address if the event is in a multicast eventgroup - bool target_added(false); - if (its_eventgroup->is_multicast() && !_subscriber->is_reliable()) { - // Event is in multicast eventgroup and subscribe for UDP - target_added = its_eventgroup->add_target({ _target, _expiration }, {_subscriber, _expiration}); - - // If the target is multicast, we need to set the remote port - // of the unicast(!) here, as its only done in on_subscribe_accepted - // for unicast subscribes and it needs to be done before calling - // notify_one on the events. - uint16_t unreliable_port = - configuration_->get_unreliable_port(_service, _instance); - _subscriber->set_remote_port(unreliable_port); - } - else { - // subscribe for TCP or UDP - target_added = its_eventgroup->add_target({ _target, _expiration }); - } - - if (target_added) { // unicast or multicast - client_t client = VSOMEIP_ROUTING_CLIENT; - if (!_subscriber->is_reliable()) { - if (!its_eventgroup->is_multicast()) { - uint16_t unreliable_port = configuration_->get_unreliable_port(_service, _instance); - auto endpoint = find_server_endpoint(unreliable_port, false); - if (endpoint) { - client = std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint)-> - get_client(_subscriber); - } - } - } else { - uint16_t reliable_port = configuration_->get_reliable_port(_service, _instance); - auto endpoint = find_server_endpoint(reliable_port, true); - if (endpoint) { - client = std::dynamic_pointer_cast<tcp_server_endpoint_impl>(endpoint)-> - get_client(_subscriber); - } - } - // send initial events if we already have a cached field (is_set) - for (auto its_event : its_eventgroup->get_events()) { - bool is_offered_both(false); - if (configuration_->get_reliable_port(_service, _instance) != ILLEGAL_PORT && - configuration_->get_unreliable_port(_service, _instance) != ILLEGAL_PORT) { - is_offered_both = true; - } - if (its_event->is_field() && its_event->is_set()) { - if (!is_offered_both) { - its_event->notify_one(_subscriber, true); + const std::chrono::steady_clock::time_point its_expiration = + std::chrono::steady_clock::now() + std::chrono::seconds(_ttl); + if (its_eventgroup->update_target(_subscriber, its_expiration)) { + return remote_subscription_state_e::SUBSCRIPTION_ACKED; + } else { + const pending_subscription_id_t its_subscription_id = + its_eventgroup->add_pending_subscription( + pending_subscription_t(_sd_message_id, + _subscriber, _target, _ttl)); + if (host_->get_client() == find_local_client(_service, _instance)) { + auto self = shared_from_this(); + const client_t its_client = *_client; + host_->on_subscription(_service, _instance, _eventgroup, *_client, true, + [this, self, _service, _instance, + _eventgroup, its_client, its_subscription_id] + (const bool _subscription_accepted) { + try { + if (!_subscription_accepted) { + const auto its_callback = std::bind( + &routing_manager_stub_host::on_subscribe_nack, + std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), + its_client, _service, _instance, + _eventgroup, ANY_EVENT, its_subscription_id); + io_.post(its_callback); } else { - if (its_event->is_reliable() && _subscriber->is_reliable()) { - its_event->notify_one(_subscriber, true); - } - if (!its_event->is_reliable() && !_subscriber->is_reliable()) { - its_event->notify_one(_subscriber, true); - } + const auto its_callback = std::bind( + &routing_manager_stub_host::on_subscribe_ack, + std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()), + its_client, _service, _instance, + _eventgroup, ANY_EVENT, its_subscription_id); + io_.post(its_callback); } + } catch (const std::exception &e) { + VSOMEIP_ERROR << __func__ << e.what(); } - } - stub_->send_subscribe(find_local(_service, _instance), - client, _service, _instance, _eventgroup, its_eventgroup->get_major(), ANY_EVENT, true); + }); + return remote_subscription_state_e::SUBSCRIPTION_PENDING; + } else { // service hosted by local client + stub_->send_subscribe(find_local(_service, _instance), *_client, + _service, _instance, _eventgroup, + its_eventgroup->get_major(), ANY_EVENT, its_subscription_id); + return remote_subscription_state_e::SUBSCRIPTION_PENDING; } } + return remote_subscription_state_e::SUBSCRIPTION_ERROR; } void routing_manager_impl::on_unsubscribe(service_t _service, @@ -2544,7 +2537,7 @@ void routing_manager_impl::on_unsubscribe(service_t _service, stub_->send_unsubscribe(find_local(_service, _instance), its_client, _service, _instance, _eventgroup, ANY_EVENT, true); - host_->on_subscription(_service, _instance, _eventgroup, its_client, false); + host_->on_subscription(_service, _instance, _eventgroup, its_client, false, [](const bool _subscription_accepted){ (void)_subscription_accepted; }); if (its_eventgroup->get_targets().size() == 0) { std::set<std::shared_ptr<event> > its_events @@ -2621,13 +2614,15 @@ void routing_manager_impl::on_subscribe_ack(service_t _service, void routing_manager_impl::on_subscribe_ack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event) { + event_t _event, pending_subscription_id_t _subscription_id) { client_t its_client = is_specific_endpoint_client(_client, _service, _instance); bool specific_endpoint_client = its_client != VSOMEIP_ROUTING_CLIENT; auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); if (its_eventgroup) { std::unique_lock<std::mutex> eventgroup_lock(its_eventgroup->get_subscription_lock()); - { + if (_subscription_id == DEFAULT_SUBSCRIPTION) { + // ACK coming in via SD from remote or as answer to a subscription + // of the application hosting the rm_impl to a local service auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, its_client); std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); auto its_state = remote_subscription_state_.find(its_tuple); @@ -2638,6 +2633,57 @@ void routing_manager_impl::on_subscribe_ack(client_t _client, } } remote_subscription_state_[its_tuple] = subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED; + } else { // ACK sent back from local client as answer to a remote subscription + if (find_local_client(_service, _instance) == VSOMEIP_ROUTING_CLIENT) { + // service was stopped while subscription was pending + // send subscribe_nack back instead + on_subscribe_nack(_client, _service, _instance, _eventgroup, + _event, _subscription_id); + return; + } + if (discovery_) { + pending_subscription_t its_sd_message_id = + its_eventgroup->remove_pending_subscription( + _subscription_id); + if (its_sd_message_id.sd_message_identifier_ + && its_sd_message_id.subscriber_ + && its_sd_message_id.target_) { + { + std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_); + remote_subscribers_[_service][_instance][_client].insert(its_sd_message_id.subscriber_); + } + const std::chrono::steady_clock::time_point its_expiration = + std::chrono::steady_clock::now() + + std::chrono::seconds( + its_sd_message_id.ttl_); + // IP address of target is a multicast address if the event is in a multicast eventgroup + if (its_eventgroup->is_multicast() + && !its_sd_message_id.subscriber_->is_reliable()) { + // Event is in multicast eventgroup and subscribe for UDP + its_eventgroup->add_target( + { its_sd_message_id.target_, its_expiration }, + { its_sd_message_id.subscriber_, its_expiration }); + } else { + // subscribe for TCP or UDP + its_eventgroup->add_target( + { its_sd_message_id.subscriber_, its_expiration }); + } + discovery_->remote_subscription_acknowledge(_service, + _instance, _eventgroup, _client, true, + its_sd_message_id.sd_message_identifier_); + + VSOMEIP_INFO << "REMOTE SUBSCRIBE(" + << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" + << " from " << its_sd_message_id.subscriber_->get_address().to_string() + << ":" << std::dec << its_sd_message_id.subscriber_->get_port() + << (its_sd_message_id.subscriber_->is_reliable() ? " reliable" : " unreliable") + << " was accepted"; + } + } + return; } if (specific_endpoint_client) { @@ -2675,13 +2721,15 @@ void routing_manager_impl::on_subscribe_ack(client_t _client, void routing_manager_impl::on_subscribe_nack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event) { + event_t _event, pending_subscription_id_t _subscription_id) { client_t its_client = is_specific_endpoint_client(_client, _service, _instance); bool specific_endpoint_client = its_client != VSOMEIP_ROUTING_CLIENT; auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); if (its_eventgroup) { std::unique_lock<std::mutex> eventgroup_lock(its_eventgroup->get_subscription_lock()); - { + if (_subscription_id == DEFAULT_SUBSCRIPTION) { + // NACK coming in via SD from remote or as answer to a subscription + // of the application hosting the rm_impl to a local service auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, its_client); std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_); auto its_state = remote_subscription_state_.find(its_tuple); @@ -2692,6 +2740,25 @@ void routing_manager_impl::on_subscribe_nack(client_t _client, } } remote_subscription_state_[its_tuple] = subscription_state_e::SUBSCRIPTION_NOT_ACKNOWLEDGED; + } else { // NACK sent back from local client as answer to a remote subscription + if (discovery_) { + pending_subscription_t its_sd_message_id = + its_eventgroup->remove_pending_subscription(_subscription_id); + if (its_sd_message_id.sd_message_identifier_ && its_sd_message_id.subscriber_) { + discovery_->remote_subscription_acknowledge(_service, _instance, + _eventgroup, _client, false, its_sd_message_id.sd_message_identifier_); + VSOMEIP_INFO << "REMOTE SUBSCRIBE(" + << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" + << " from " << its_sd_message_id.subscriber_->get_address().to_string() + << ":" << std::dec <<its_sd_message_id.subscriber_->get_port() + << (its_sd_message_id.subscriber_->is_reliable() ? " reliable" : " unreliable") + << " was not accepted"; + } + } + return; } if (specific_endpoint_client) { if (_client == get_client()) { @@ -3204,6 +3271,8 @@ routing_manager_impl::expire_subscriptions() { for (auto &its_service : eventgroups_) { for (auto &its_instance : its_service.second) { + const client_t its_hosting_client = find_local_client( + its_service.first, its_instance.first); for (auto &its_eventgroup : its_instance.second) { std::set<std::shared_ptr<endpoint_definition>> its_expired_endpoints; for (auto &its_target : its_eventgroup.second->get_targets()) { @@ -3213,6 +3282,12 @@ routing_manager_impl::expire_subscriptions() { next_expiration = its_target.expiration_; } } + std::shared_ptr<endpoint> target; + bool service_offered_by_host(false); + if (its_expired_endpoints.size()) { + target = find_local(its_hosting_client); + service_offered_by_host = (its_hosting_client == host_->get_client()); + } for (auto its_endpoint : its_expired_endpoints) { its_eventgroup.second->remove_target(its_endpoint); @@ -3223,11 +3298,18 @@ routing_manager_impl::expire_subscriptions() { clear_remote_subscriber(its_service.first, its_instance.first, its_client, its_endpoint); - auto target = find_local(its_service.first, its_instance.first); if (target) { - stub_->send_unsubscribe(target, VSOMEIP_ROUTING_CLIENT, its_service.first, + stub_->send_unsubscribe(target, its_client, its_service.first, its_instance.first, its_eventgroup.first, ANY_EVENT, true); } + if (service_offered_by_host) { + host_->on_subscription(its_service.first, + its_instance.first, its_eventgroup.first, + its_client, false, + [](const bool _subscription_accepted){ + (void)_subscription_accepted; + }); + } VSOMEIP_INFO << "Expired subscription (" << std::hex << its_service.first << "." @@ -3671,7 +3753,7 @@ void routing_manager_impl::send_subscribe(client_t _client, service_t _service, auto endpoint = find_local(_service, _instance); if (endpoint) { stub_->send_subscribe(endpoint, _client, - _service, _instance, _eventgroup, _major, _event, false); + _service, _instance, _eventgroup, _major, _event, DEFAULT_SUBSCRIPTION); } } @@ -4008,21 +4090,39 @@ std::unordered_set<client_t> routing_manager_impl::get_specific_endpoint_clients return result; } -bool routing_manager_impl::remote_service_offered_via_tcp_and_udp( - service_t _service, instance_t _instance) const { - bool ret(false); - std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); - const auto found_service = remote_service_info_.find(_service); - if (found_service != remote_service_info_.end()) { - const auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - if (found_instance->second.find(false) != found_instance->second.end() && - found_instance->second.find(true) != found_instance->second.end()) { - ret = true; +void routing_manager_impl::send_initial_events( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition> &_subscriber) { + std::shared_ptr<eventgroupinfo> its_eventgroup = find_eventgroup(_service, + _instance, _eventgroup); + if (!its_eventgroup) { + return; + } + bool is_offered_both(false); + if (configuration_->get_reliable_port(_service, _instance) != ILLEGAL_PORT && + configuration_->get_unreliable_port(_service, _instance) != ILLEGAL_PORT) { + is_offered_both = true; + } + // send initial events if we already have a cached field (is_set) + for (const auto &its_event : its_eventgroup->get_events()) { + if (its_event->is_field()) { + if (its_event->is_set()) { + if (!is_offered_both) { + its_event->notify_one(_subscriber, true); + } else { + if (its_event->is_reliable() && _subscriber->is_reliable()) { + its_event->notify_one(_subscriber, true); + } + if (!its_event->is_reliable() && !_subscriber->is_reliable()) { + its_event->notify_one(_subscriber, true); + } + } + } else { + // received a subscription but can't notify due to missing payload + its_event->set_remote_notification_pending(true); } } } - return ret; } } // namespace vsomeip diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index 7eba42b..4224b57 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -35,7 +35,8 @@ namespace vsomeip { -routing_manager_proxy::routing_manager_proxy(routing_manager_host *_host) : +routing_manager_proxy::routing_manager_proxy(routing_manager_host *_host, + bool _client_side_logging) : routing_manager_base(_host), is_connected_(false), is_started_(false), @@ -46,7 +47,7 @@ routing_manager_proxy::routing_manager_proxy(routing_manager_host *_host) : logger_(logger::get()), request_debounce_timer_ (io_), request_debounce_timer_running_(false), - client_side_logging_(false) + client_side_logging_(_client_side_logging) { } @@ -55,11 +56,6 @@ routing_manager_proxy::~routing_manager_proxy() { void routing_manager_proxy::init() { routing_manager_base::init(); - const char *client_side_logging = getenv(VSOMEIP_ENV_CLIENTSIDELOGGING); - if (client_side_logging != nullptr) { - client_side_logging_ = true; - VSOMEIP_INFO << "Client side logging is enabled"; - } { std::lock_guard<std::mutex> its_lock(sender_mutex_); sender_ = create_local(VSOMEIP_ROUTING_CLIENT); @@ -469,9 +465,10 @@ void routing_manager_proxy::send_subscribe(client_t _client, service_t _service, its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6] = _major; std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 7], &_event, sizeof(_event)); - its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9] = 0; // local subscriber - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 10], &_subscription_type, + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9], &_subscription_type, sizeof(_subscription_type)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 10], &DEFAULT_SUBSCRIPTION, + sizeof(DEFAULT_SUBSCRIPTION)); // local subscription client_t target_client = find_local_client(_service, _instance); if (target_client != VSOMEIP_ROUTING_CLIENT) { @@ -487,7 +484,7 @@ void routing_manager_proxy::send_subscribe(client_t _client, service_t _service, void routing_manager_proxy::send_subscribe_nack(client_t _subscriber, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event) { + event_t _event, pending_subscription_id_t _subscription_id) { byte_t its_command[VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE]; uint32_t its_size = VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE; @@ -508,11 +505,18 @@ void routing_manager_proxy::send_subscribe_nack(client_t _subscriber, sizeof(_subscriber)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_event, sizeof(_event)); - - auto its_target = find_local(_subscriber); - if (its_target) { - its_target->send(its_command, sizeof(its_command)); - } else { + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 10], + &_subscription_id, sizeof(_subscription_id)); + + if (_subscriber != VSOMEIP_ROUTING_CLIENT + && _subscription_id == DEFAULT_SUBSCRIPTION) { + auto its_target = find_local(_subscriber); + if (its_target) { + its_target->send(its_command, sizeof(its_command)); + return; + } + } + { std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); @@ -522,7 +526,7 @@ void routing_manager_proxy::send_subscribe_nack(client_t _subscriber, void routing_manager_proxy::send_subscribe_ack(client_t _subscriber, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event) { + event_t _event, pending_subscription_id_t _subscription_id) { byte_t its_command[VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE]; uint32_t its_size = VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE; @@ -543,11 +547,18 @@ void routing_manager_proxy::send_subscribe_ack(client_t _subscriber, sizeof(_subscriber)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_event, sizeof(_event)); - - auto its_target = find_local(_subscriber); - if (its_target) { - its_target->send(its_command, sizeof(its_command)); - } else { + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 10], + &_subscription_id, sizeof(_subscription_id)); + + if (_subscriber != VSOMEIP_ROUTING_CLIENT + && _subscription_id == DEFAULT_SUBSCRIPTION) { + auto its_target = find_local(_subscriber); + if (its_target) { + its_target->send(its_command, sizeof(its_command)); + return; + } + } + { std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); @@ -609,6 +620,35 @@ bool routing_manager_proxy::send(client_t _client, const byte_t *_data, return false; } } + if (client_side_logging_) { + if (_size > VSOMEIP_MESSAGE_TYPE_POS) { + service_t its_service = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_SERVICE_POS_MIN], + _data[VSOMEIP_SERVICE_POS_MAX]); + method_t its_method = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_METHOD_POS_MIN], + _data[VSOMEIP_METHOD_POS_MAX]); + session_t its_session = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_SESSION_POS_MIN], + _data[VSOMEIP_SESSION_POS_MAX]); + client_t its_client = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_CLIENT_POS_MIN], + _data[VSOMEIP_CLIENT_POS_MAX]); + VSOMEIP_INFO << "routing_manager_proxy::send: (" + << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << ":" + << std::hex << std::setw(4) << std::setfill('0') << its_session << ":" + << std::hex << std::setw(4) << std::setfill('0') << its_client << "] " + << "type=" << std::hex << static_cast<std::uint32_t>(_data[VSOMEIP_MESSAGE_TYPE_POS]) + << " thread=" << std::hex << std::this_thread::get_id(); + } else { + VSOMEIP_ERROR << "routing_manager_proxy::send: (" + << std::hex << std::setw(4) << std::setfill('0') << client_ + <<"): message too short to log: " << std::dec << _size; + } + } if (_size > VSOMEIP_MESSAGE_TYPE_POS) { std::shared_ptr<endpoint> its_target; if (utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])) { @@ -803,6 +843,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, client_t routing_host_id = configuration_->get_id(configuration_->get_routing_host()); client_t its_subscriber; bool its_reliable; + pending_subscription_id_t its_subscription_id(DEFAULT_SUBSCRIPTION); if (_size > VSOMEIP_COMMAND_SIZE_POS_MAX) { its_command = _data[VSOMEIP_COMMAND_TYPE_POS]; @@ -930,24 +971,35 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, sizeof(its_major)); std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 7], sizeof(its_event)); - std::memcpy(&is_remote_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 9], - sizeof(is_remote_subscriber)); - + std::memcpy(&its_subscription_id, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 10], + sizeof(its_subscription_id)); { std::unique_lock<std::mutex> its_lock(incoming_subscripitons_mutex_); - if (is_remote_subscriber) { + if (its_subscription_id != DEFAULT_SUBSCRIPTION) { its_lock.unlock(); // Remote subscriber: Notify routing manager initially + count subscribes - (void)host_->on_subscription(its_service, its_instance, - its_eventgroup, its_client, true); - std::set<event_t> its_already_subscribed_events; - bool inserted = insert_subscription(its_service, its_instance, its_eventgroup, - its_event, VSOMEIP_ROUTING_CLIENT, &its_already_subscribed_events); - if (inserted) { - notify_remote_initially(its_service, its_instance, its_eventgroup, - its_already_subscribed_events); - } - (void)get_remote_subscriber_count(its_service, its_instance, its_eventgroup, true); + auto self = shared_from_this(); + host_->on_subscription(its_service, its_instance, its_eventgroup, + its_client, true, + [this, self, its_client, its_service, its_instance, + its_eventgroup, its_event, its_subscription_id] + (const bool _subscription_accepted){ + if(_subscription_accepted) { + send_subscribe_ack(its_client, its_service, its_instance, + its_eventgroup, its_event, its_subscription_id); + } else { + send_subscribe_nack(its_client, its_service, its_instance, + its_eventgroup, its_event, its_subscription_id); + } + std::set<event_t> its_already_subscribed_events; + bool inserted = insert_subscription(its_service, its_instance, its_eventgroup, + its_event, VSOMEIP_ROUTING_CLIENT, &its_already_subscribed_events); + if (inserted) { + notify_remote_initially(its_service, its_instance, its_eventgroup, + its_already_subscribed_events); + } + (void)get_remote_subscriber_count(its_service, its_instance, its_eventgroup, true); + }); } else if (is_client_known(its_client)) { its_lock.unlock(); if (!configuration_->is_client_allowed(its_client, @@ -961,19 +1013,24 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, // Local & already known subscriber: create endpoint + send (N)ACK + insert subscription (void) find_or_create_local(its_client); - bool subscription_accepted = host_->on_subscription(its_service, its_instance, - its_eventgroup, its_client, true); - if (!subscription_accepted) { - send_subscribe_nack(its_client, its_service, - its_instance, its_eventgroup, its_event); - } else { - send_subscribe_ack(its_client, its_service, its_instance, - its_eventgroup, its_event); - routing_manager_base::subscribe(its_client, its_service, its_instance, - its_eventgroup, its_major, its_event, - subscription_type_e::SU_RELIABLE_AND_UNRELIABLE); - send_pending_notify_ones(its_service, its_instance, its_eventgroup, its_client); - } + auto self = shared_from_this(); + host_->on_subscription(its_service, its_instance, + its_eventgroup, its_client, true, + [this, self, its_client, its_service, its_instance, its_eventgroup, + its_event, its_major] + (const bool _subscription_accepted) { + if (!_subscription_accepted) { + send_subscribe_nack(its_client, its_service, + its_instance, its_eventgroup, its_event, DEFAULT_SUBSCRIPTION); + } else { + send_subscribe_ack(its_client, its_service, its_instance, + its_eventgroup, its_event, DEFAULT_SUBSCRIPTION); + routing_manager_base::subscribe(its_client, its_service, its_instance, + its_eventgroup, its_major, its_event, + subscription_type_e::SU_RELIABLE_AND_UNRELIABLE); + send_pending_notify_ones(its_service, its_instance, its_eventgroup, its_client); + } + }); } else { // Local & not yet known subscriber ~> set pending until subscriber gets known! subscription_data_t subscription = { its_service, its_instance, @@ -1006,7 +1063,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, sizeof(its_event)); std::memcpy(&is_remote_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8], sizeof(is_remote_subscriber)); - host_->on_subscription(its_service, its_instance, its_eventgroup, its_client, false); + host_->on_subscription(its_service, its_instance, its_eventgroup, its_client, false, [](const bool _subscription_accepted){ (void)_subscription_accepted; }); if (!is_remote_subscriber) { // Local subscriber: withdraw subscription routing_manager_base::unsubscribe(its_client, its_service, its_instance, its_eventgroup, its_event); @@ -1289,23 +1346,26 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data, } for (const subscription_info &si : subscription_actions) { (void) find_or_create_local(si.client_id_); - bool subscription_accepted = host_->on_subscription( + auto self = shared_from_this(); + host_->on_subscription( si.service_id_, si.instance_id_, si.eventgroup_id_, - si.client_id_, true); - if (!subscription_accepted) { - send_subscribe_nack(si.client_id_, si.service_id_, - si.instance_id_, si.eventgroup_id_, si.event_); - } else { - routing_manager_base::subscribe(si.client_id_, - si.service_id_, si.instance_id_, si.eventgroup_id_, - si.major_, si.event_, - subscription_type_e::SU_RELIABLE_AND_UNRELIABLE); - send_subscribe_ack(si.client_id_, si.service_id_, - si.instance_id_, si.eventgroup_id_, si.event_); - send_pending_notify_ones(si.service_id_, - si.instance_id_, si.eventgroup_id_, si.client_id_); - } - pending_incoming_subscripitons_.erase(si.client_id_); + si.client_id_, true, + [this, self, si](const bool _subscription_accepted) { + if (!_subscription_accepted) { + send_subscribe_nack(si.client_id_, si.service_id_, + si.instance_id_, si.eventgroup_id_, si.event_, DEFAULT_SUBSCRIPTION); + } else { + routing_manager_base::subscribe(si.client_id_, + si.service_id_, si.instance_id_, si.eventgroup_id_, + si.major_, si.event_, + subscription_type_e::SU_RELIABLE_AND_UNRELIABLE); + send_subscribe_ack(si.client_id_, si.service_id_, + si.instance_id_, si.eventgroup_id_, si.event_, DEFAULT_SUBSCRIPTION); + send_pending_notify_ones(si.service_id_, + si.instance_id_, si.eventgroup_id_, si.client_id_); + } + pending_incoming_subscripitons_.erase(si.client_id_); + }); } } } diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp index 0f3d0b5..d6099cc 100644 --- a/implementation/routing/src/routing_manager_stub.cpp +++ b/implementation/routing/src/routing_manager_stub.cpp @@ -223,11 +223,11 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, uint32_t its_size; bool its_reliable(false); subscription_type_e its_subscription_type; - bool is_remote_subscriber(false); client_t its_client_from_header; client_t its_target_client; client_t its_subscriber; bool its_is_valid_crc(true); + std::uint16_t its_subscription_id(DEFAULT_SUBSCRIPTION); offer_type_e its_offer_type; its_command = _data[VSOMEIP_COMMAND_TYPE_POS]; @@ -315,9 +315,7 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, sizeof(its_major)); std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 7], sizeof(its_event)); - std::memcpy(&is_remote_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 9], - sizeof(is_remote_subscriber)); - std::memcpy(&its_subscription_type, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 10], + std::memcpy(&its_subscription_type, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 9], sizeof(its_subscription_type)); if (configuration_->is_client_allowed(its_client, its_service, its_instance)) { @@ -364,8 +362,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, sizeof(its_subscriber)); std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8], sizeof(its_event)); + std::memcpy(&its_subscription_id, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 10], + sizeof(its_subscription_id)); host_->on_subscribe_ack(its_subscriber, its_service, - its_instance, its_eventgroup, its_event); + its_instance, its_eventgroup, its_event, its_subscription_id); VSOMEIP_INFO << "SUBSCRIBE ACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." @@ -389,8 +389,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, sizeof(its_subscriber)); std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8], sizeof(its_event)); + std::memcpy(&its_subscription_id, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 10], + sizeof(its_subscription_id)); host_->on_subscribe_nack(its_subscriber, its_service, - its_instance, its_eventgroup, its_event); + its_instance, its_eventgroup, its_event, its_subscription_id); VSOMEIP_INFO << "SUBSCRIBE NACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." @@ -1204,7 +1206,7 @@ void routing_manager_stub::broadcast(const std::vector<byte_t> &_command) const void routing_manager_stub::send_subscribe(std::shared_ptr<vsomeip::endpoint> _target, client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, - event_t _event, bool _is_remote_subscriber) { + event_t _event, pending_subscription_id_t _subscription_id) { if (_target) { byte_t its_command[VSOMEIP_SUBSCRIBE_COMMAND_SIZE]; uint32_t its_size = VSOMEIP_SUBSCRIBE_COMMAND_SIZE @@ -1223,10 +1225,11 @@ void routing_manager_stub::send_subscribe(std::shared_ptr<vsomeip::endpoint> _ta its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6] = _major; std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 7], &_event, sizeof(_event)); - its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9] = _is_remote_subscriber; // set byte for subscription_type to zero. It's only used // in subscribe messages sent from rm_proxies to rm_stub. - its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 10] = 0x0; + its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9] = 0x0; + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 10], + &_subscription_id, sizeof(_subscription_id)); _target->send(its_command, sizeof(its_command)); } diff --git a/implementation/runtime/include/application_impl.hpp b/implementation/runtime/include/application_impl.hpp index 9da6bce..28ce86b 100644 --- a/implementation/runtime/include/application_impl.hpp +++ b/implementation/runtime/include/application_impl.hpp @@ -151,8 +151,8 @@ public: bool _is_available, major_version_t _major, minor_version_t _minor); VSOMEIP_EXPORT void on_message(const std::shared_ptr<message> &&_message); VSOMEIP_EXPORT void on_error(error_code_e _error); - VSOMEIP_EXPORT bool on_subscription(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, client_t _client, bool _subscribed); + VSOMEIP_EXPORT void on_subscription(service_t _service, instance_t _instance, + eventgroup_t _eventgroup, client_t _client, bool _subscribed, std::function<void(bool)> _accepted_cb); VSOMEIP_EXPORT void on_subscription_error(service_t _service, instance_t _instance, eventgroup_t _eventgroup, uint16_t _error); VSOMEIP_EXPORT void on_subscription_status(service_t _service, instance_t _instance, @@ -181,6 +181,9 @@ public: VSOMEIP_EXPORT void set_watchdog_handler(watchdog_handler_t _handler, std::chrono::seconds _interval); + VSOMEIP_EXPORT void register_async_subscription_handler(service_t _service, + instance_t _instance, eventgroup_t _eventgroup, async_subscription_handler_t _handler); + private: // // Types @@ -339,8 +342,10 @@ private: mutable available_t available_; // Subscription handlers - std::map<service_t, std::map<instance_t, std::map<eventgroup_t, subscription_handler_t>>> - subscription_; + std::map<service_t, + std::map<instance_t, + std::map<eventgroup_t, + std::pair<subscription_handler_t, async_subscription_handler_t>>>> subscription_; mutable std::mutex subscription_mutex_; std::map<service_t, std::map<instance_t, std::map<eventgroup_t, @@ -411,6 +416,8 @@ private: boost::asio::steady_timer watchdog_timer_; watchdog_handler_t watchdog_handler_; std::chrono::seconds watchdog_interval_; + + bool client_side_logging_; }; } // namespace vsomeip diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp index bb68e71..9c6629c 100644 --- a/implementation/runtime/src/application_impl.cpp +++ b/implementation/runtime/src/application_impl.cpp @@ -54,7 +54,8 @@ application_impl::application_impl(const std::string &_name) block_stopping_(false), is_routing_manager_host_(false), stopped_called_(false), - watchdog_timer_(io_) { + watchdog_timer_(io_), + client_side_logging_(false) { } application_impl::~application_impl() { @@ -108,6 +109,13 @@ bool application_impl::init() { } } + const char *client_side_logging = getenv(VSOMEIP_ENV_CLIENTSIDELOGGING); + if (client_side_logging != nullptr) { + client_side_logging_ = true; + VSOMEIP_INFO << "Client side logging for application: " << name_ + << " is enabled"; + } + std::shared_ptr<configuration> its_configuration = get_configuration(); if (its_configuration) { VSOMEIP_INFO << "Initializing vsomeip application \"" << name_ << "\"."; @@ -155,7 +163,7 @@ bool application_impl::init() { routing_ = std::make_shared<routing_manager_impl>(this); } else { VSOMEIP_INFO << "Instantiating routing manager [Proxy]."; - routing_ = std::make_shared<routing_manager_proxy>(this); + routing_ = std::make_shared<routing_manager_proxy>(this, client_side_logging_); } routing_->init(); @@ -304,6 +312,10 @@ void application_impl::start() { for (size_t i = 0; i < io_thread_count - 1; i++) { std::shared_ptr<std::thread> its_thread = std::make_shared<std::thread>([this, i] { + VSOMEIP_INFO << "io thread id from application: " + << std::hex << std::setw(4) << std::setfill('0') + << client_ << " (" << name_ << ") is: " << std::hex + << std::this_thread::get_id(); try { io_.run(); } catch (const std::exception &e) { @@ -329,6 +341,9 @@ void application_impl::start() { app_counter_mutex__.lock(); app_counter__++; app_counter_mutex__.unlock(); + VSOMEIP_INFO << "io thread id from application: " + << std::hex << std::setw(4) << std::setfill('0') << client_ << " (" + << name_ << ") is: " << std::hex << std::this_thread::get_id(); try { io_.run(); } catch (const std::exception &e) { @@ -681,9 +696,22 @@ bool application_impl::are_available_unlocked(available_t &_available, void application_impl::send(std::shared_ptr<message> _message, bool _flush) { std::lock_guard<std::mutex> its_lock(session_mutex_); + bool is_request = utility::is_request(_message); + if (client_side_logging_) { + VSOMEIP_INFO << "application_impl::send: (" + << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << _message->get_service() << "." + << std::hex << std::setw(4) << std::setfill('0') << _message->get_instance() << "." + << std::hex << std::setw(4) << std::setfill('0') << _message->get_method() << ":" + << std::hex << std::setw(4) << std::setfill('0') + << ((is_request) ? session_ : _message->get_session()) << ":" + << std::hex << std::setw(4) << std::setfill('0') + << ((is_request) ? client_ : _message->get_client()) << "] " + << "type=" << std::hex << static_cast<std::uint32_t>(_message->get_message_type()) + << " thread=" << std::hex << std::this_thread::get_id(); + } if (routing_) { // in case of requests set the request-id (client-id|session-id) - bool is_request = utility::is_request(_message); if (is_request) { _message->set_client(client_); _message->set_session(session_); @@ -813,21 +841,36 @@ void application_impl::unregister_availability_handler(service_t _service, } } -bool application_impl::on_subscription(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, client_t _client, bool _subscribed) { - - std::lock_guard<std::mutex> its_lock(subscription_mutex_); - auto found_service = subscription_.find(_service); - if (found_service != subscription_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_eventgroup = found_instance->second.find(_eventgroup); - if (found_eventgroup != found_instance->second.end()) { - return found_eventgroup->second(_client, _subscribed); +void application_impl::on_subscription(service_t _service, instance_t _instance, + eventgroup_t _eventgroup, client_t _client, bool _subscribed, std::function<void(bool)> _accepted_cb) { + bool handler_found = false; + std::pair<subscription_handler_t, async_subscription_handler_t> its_handlers; + { + std::lock_guard<std::mutex> its_lock(subscription_mutex_); + auto found_service = subscription_.find(_service); + if (found_service != subscription_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + auto found_eventgroup = found_instance->second.find(_eventgroup); + if (found_eventgroup != found_instance->second.end()) { + its_handlers = found_eventgroup->second; + handler_found = true; + } } } } - return true; + + if(handler_found) { + if(auto its_handler = its_handlers.first) { + // "normal" subscription handler exists + _accepted_cb(its_handler(_client, _subscribed)); + } else if(auto its_handler = its_handlers.second) { + // async subscription handler exists + its_handler(_client, _subscribed, _accepted_cb); + } + } else { + _accepted_cb(true); + } } void application_impl::register_subscription_handler(service_t _service, @@ -835,7 +878,7 @@ void application_impl::register_subscription_handler(service_t _service, subscription_handler_t _handler) { std::lock_guard<std::mutex> its_lock(subscription_mutex_); - subscription_[_service][_instance][_eventgroup] = _handler; + subscription_[_service][_instance][_eventgroup] = std::make_pair(_handler, nullptr); message_handler_t handler([&](const std::shared_ptr<message>& request) { send(runtime_->create_response(request), true); @@ -1456,6 +1499,9 @@ routing_manager * application_impl::get_routing_manager() const { void application_impl::main_dispatch() { const std::thread::id its_id = std::this_thread::get_id(); + VSOMEIP_INFO << "main dispatch thread id from application: " + << std::hex << std::setw(4) << std::setfill('0') << client_ << " (" + << name_ << ") is: " << std::hex << its_id; std::unique_lock<std::mutex> its_lock(handlers_mutex_); while (is_dispatching_) { if (handlers_.empty() || !is_active_dispatcher(its_id)) { @@ -1489,6 +1535,9 @@ void application_impl::main_dispatch() { void application_impl::dispatch() { const std::thread::id its_id = std::this_thread::get_id(); + VSOMEIP_INFO << "dispatch thread id from application: " + << std::hex << std::setw(4) << std::setfill('0') << client_ << " (" + << name_ << ") is: " << std::hex << its_id; while (is_active_dispatcher(its_id)) { std::unique_lock<std::mutex> its_lock(handlers_mutex_); if (is_dispatching_ && handlers_.empty()) { @@ -1514,14 +1563,12 @@ void application_impl::dispatch() { } } } - { - std::lock_guard<std::mutex> its_lock(handlers_mutex_); - dispatcher_condition_.notify_all(); - } + std::lock_guard<std::mutex> its_lock(handlers_mutex_); if (is_dispatching_) { std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); elapsed_dispatchers_.insert(its_id); } + dispatcher_condition_.notify_all(); } void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) { @@ -1539,7 +1586,7 @@ void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) { if (!_error) { print_blocking_call(its_sync_handler); bool active_dispatcher_available(false); - { + if (is_dispatching_) { std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); blocked_dispatchers_.insert(its_id); active_dispatcher_available = has_active_dispatcher(); @@ -1565,7 +1612,16 @@ void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) { } } }); - + if (client_side_logging_) { + VSOMEIP_INFO << "Invoking handler: (" + << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->service_id_ << "." + << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->instance_id_ << "." + << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->method_id_ << ":" + << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->session_id_ << "] " + << "type=" << static_cast<std::uint32_t>(its_sync_handler->handler_type_) + << " thread=" << std::hex << its_id; + } _handler->handler_(); its_dispatcher_timer.cancel(); if (is_dispatching_) { @@ -1645,6 +1701,9 @@ void application_impl::clear_all_handler() { } void application_impl::shutdown() { + VSOMEIP_INFO << "shutdown thread id from application: " + << std::hex << std::setw(4) << std::setfill('0') << client_ << " (" + << name_ << ") is: " << std::hex << std::this_thread::get_id(); #ifndef _WIN32 boost::asio::detail::posix_signal_blocker blocker; #endif @@ -2061,4 +2120,17 @@ void application_impl::set_watchdog_handler(watchdog_handler_t _handler, } } +void application_impl::register_async_subscription_handler(service_t _service, + instance_t _instance, eventgroup_t _eventgroup, + async_subscription_handler_t _handler) { + + std::lock_guard<std::mutex> its_lock(subscription_mutex_); + subscription_[_service][_instance][_eventgroup] = std::make_pair(nullptr, _handler);; + + message_handler_t handler([&](const std::shared_ptr<message>& request) { + send(runtime_->create_response(request), true); + }); + register_message_handler(_service, _instance, ANY_METHOD - 1, handler); +} + } // namespace vsomeip diff --git a/implementation/service_discovery/include/entry_impl.hpp b/implementation/service_discovery/include/entry_impl.hpp index fabca3b..db12a1e 100755 --- a/implementation/service_discovery/include/entry_impl.hpp +++ b/implementation/service_discovery/include/entry_impl.hpp @@ -68,6 +68,8 @@ protected: std::vector<uint8_t> options_[VSOMEIP_MAX_OPTION_RUN];
uint8_t num_options_[VSOMEIP_MAX_OPTION_RUN];
+ std::uint8_t index1_;
+ std::uint8_t index2_;
entry_impl();
entry_impl(const entry_impl &entry_);
diff --git a/implementation/service_discovery/include/eventgroupentry_impl.hpp b/implementation/service_discovery/include/eventgroupentry_impl.hpp index d9a6571..877cdbf 100755 --- a/implementation/service_discovery/include/eventgroupentry_impl.hpp +++ b/implementation/service_discovery/include/eventgroupentry_impl.hpp @@ -7,6 +7,7 @@ #define VSOMEIP_SD_EVENTGROUPENTRY_IMPL_HPP
#include "entry_impl.hpp"
+#include "../../endpoints/include/endpoint_definition.hpp"
namespace vsomeip {
namespace sd {
@@ -29,6 +30,24 @@ public: bool serialize(vsomeip::serializer *_to) const;
bool deserialize(vsomeip::deserializer *_from);
+ bool operator==(const eventgroupentry_impl& _other) const {
+ return !(ttl_ != _other.ttl_ ||
+ service_ != _other.service_ ||
+ instance_ != _other.instance_ ||
+ eventgroup_ != _other.eventgroup_ ||
+ index1_ != _other.index1_ ||
+ index2_ != _other.index2_ ||
+ num_options_[0] != _other.num_options_[0] ||
+ num_options_[1] != _other.num_options_[1] ||
+ major_version_ != _other.major_version_ ||
+ counter_ != _other.counter_);
+ }
+
+ bool is_matching_subscribe(const eventgroupentry_impl& _other) const;
+
+ void add_target(const std::shared_ptr<endpoint_definition> &_target);
+ std::shared_ptr<endpoint_definition> get_target(bool _reliable) const;
+
private:
eventgroup_t eventgroup_;
uint16_t reserved_;
@@ -36,6 +55,9 @@ private: // counter field to differentiate parallel subscriptions on same event group
// 4Bit only (max 16. parralel subscriptions)
uint8_t counter_;
+
+ std::shared_ptr<endpoint_definition> target_reliable_;
+ std::shared_ptr<endpoint_definition> target_unreliable_;
};
} // namespace sd
diff --git a/implementation/service_discovery/include/message_impl.hpp b/implementation/service_discovery/include/message_impl.hpp index 1d658a5..f679e9c 100755 --- a/implementation/service_discovery/include/message_impl.hpp +++ b/implementation/service_discovery/include/message_impl.hpp @@ -8,11 +8,14 @@ #include <memory>
#include <vector>
+#include <atomic>
+#include <mutex>
#include <vsomeip/message.hpp>
#include "../include/primitive_types.hpp"
#include "../../message/include/message_base_impl.hpp"
+#include "../../endpoints/include/endpoint_definition.hpp"
# if _MSC_VER >= 1300
/*
@@ -40,6 +43,14 @@ class protection_option_impl; class message_impl: public vsomeip::message, public vsomeip::message_base_impl {
public:
+ typedef std::vector<std::shared_ptr<entry_impl>> entries_t;
+ typedef std::vector<std::shared_ptr<option_impl>> options_t;
+ struct forced_initial_events_t {
+ std::shared_ptr<vsomeip::endpoint_definition> target_;
+ vsomeip::service_t service_;
+ vsomeip::instance_t instance_;
+ vsomeip::eventgroup_t eventgroup_;
+ };
message_impl();
virtual ~message_impl();
@@ -61,8 +72,8 @@ public: std::shared_ptr<load_balancing_option_impl> create_load_balancing_option();
std::shared_ptr<protection_option_impl> create_protection_option();
- const std::vector<std::shared_ptr<entry_impl> > & get_entries() const;
- const std::vector<std::shared_ptr<option_impl> > & get_options() const;
+ const entries_t & get_entries() const;
+ const options_t & get_options() const;
int16_t get_option_index(const std::shared_ptr<option_impl> &_option) const;
uint32_t get_options_length();
@@ -75,6 +86,18 @@ public: length_t get_someip_length() const;
+ std::uint8_t get_number_required_acks() const;
+ std::uint8_t get_number_contained_acks() const;
+ void set_number_required_acks(std::uint8_t _required_acks);
+ void increase_number_required_acks(std::uint8_t _amount = 1);
+ void decrease_number_required_acks(std::uint8_t _amount = 1);
+ void increase_number_contained_acks();
+ bool all_required_acks_contained() const;
+ std::unique_lock<std::mutex> get_message_lock();
+
+ void forced_initial_events_add(forced_initial_events_t _entry);
+ const std::vector<forced_initial_events_t> forced_initial_events_get();
+
private:
entry_impl * deserialize_entry(vsomeip::deserializer *_from);
option_impl * deserialize_option(vsomeip::deserializer *_from);
@@ -83,8 +106,14 @@ private: flags_t flags_;
uint32_t options_length_;
- std::vector<std::shared_ptr<entry_impl> > entries_;
- std::vector<std::shared_ptr<option_impl> > options_;
+ entries_t entries_;
+ options_t options_;
+ std::atomic<std::uint8_t> number_required_acks_;
+ std::atomic<std::uint8_t> number_contained_acks_;
+ std::mutex message_mutex_;
+
+ std::mutex forced_initial_events_mutex_;
+ std::vector<forced_initial_events_t> forced_initial_events_info_;
};
} // namespace sd
diff --git a/implementation/service_discovery/include/service_discovery.hpp b/implementation/service_discovery/include/service_discovery.hpp index 075129b..caec99e 100644 --- a/implementation/service_discovery/include/service_discovery.hpp +++ b/implementation/service_discovery/include/service_discovery.hpp @@ -65,6 +65,10 @@ public: std::shared_ptr<serviceinfo> _info) = 0; virtual void set_diagnosis_mode(const bool _activate) = 0; + virtual void remote_subscription_acknowledge( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + client_t _client, bool _accepted, + const std::shared_ptr<sd_message_identifier_t> &_sd_message_id) = 0; }; } // namespace sd diff --git a/implementation/service_discovery/include/service_discovery_host.hpp b/implementation/service_discovery/include/service_discovery_host.hpp index 492c920..f6ab65d 100644 --- a/implementation/service_discovery/include/service_discovery_host.hpp +++ b/implementation/service_discovery/include/service_discovery_host.hpp @@ -15,6 +15,8 @@ #include "../../routing/include/types.hpp" +#include <vsomeip/message.hpp> + namespace vsomeip { class configuration; @@ -57,12 +59,6 @@ public: virtual std::chrono::milliseconds update_routing_info( std::chrono::milliseconds _elapsed) = 0; - virtual void on_subscribe(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, - std::shared_ptr<endpoint_definition> _subscriber, - std::shared_ptr<endpoint_definition> _target, - const std::chrono::steady_clock::time_point &_expiration) = 0; - virtual void on_unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, std::shared_ptr<endpoint_definition> _target) = 0; @@ -72,7 +68,7 @@ public: virtual void on_subscribe_ack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event) = 0; + event_t _event, pending_subscription_id_t _subscription_id) = 0; virtual std::shared_ptr<endpoint> find_or_create_remote_client( service_t _service, instance_t _instance, @@ -81,13 +77,16 @@ public: virtual void expire_subscriptions(const boost::asio::ip::address &_address) = 0; virtual void expire_services(const boost::asio::ip::address &_address) = 0; - virtual bool on_subscribe_accepted(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, std::shared_ptr<endpoint_definition> _target, - const std::chrono::steady_clock::time_point &_expiration) = 0; + virtual remote_subscription_state_e on_remote_subscription( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition> &_subscriber, + const std::shared_ptr<endpoint_definition> &_target, + ttl_t _ttl, client_t *_client, + const std::shared_ptr<sd_message_identifier_t> &_sd_message_id) = 0; virtual void on_subscribe_nack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event) = 0; + event_t _event, pending_subscription_id_t _subscription_id) = 0; virtual bool has_identified(client_t _client, service_t _service, instance_t _instance, bool _reliable) = 0; @@ -98,6 +97,10 @@ public: service_t _service, instance_t _instance) const = 0; virtual std::map<instance_t, std::shared_ptr<serviceinfo>> get_offered_service_instances( service_t _service) const = 0; + + virtual void send_initial_events(service_t _service, instance_t _instance, + eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition>& _subscriber) = 0; }; } // namespace sd diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp index 5f28135..68b159f 100644 --- a/implementation/service_discovery/include/service_discovery_impl.hpp +++ b/implementation/service_discovery/include/service_discovery_impl.hpp @@ -12,6 +12,7 @@ #include <set> #include <forward_list> #include <atomic> +#include <tuple> #include <boost/asio/steady_timer.hpp> @@ -35,19 +36,20 @@ class eventgroupentry_impl; class option_impl; class request; class serviceentry_impl; -class service_discovery_fsm; class service_discovery_host; class subscription; typedef std::map<service_t, std::map<instance_t, std::shared_ptr<request> > > requests_t; -struct accepted_subscriber_t { - std::shared_ptr < endpoint_definition > subscriber; - std::shared_ptr < endpoint_definition > target; - std::chrono::steady_clock::time_point its_expiration; - vsomeip::service_t service_id; - vsomeip::instance_t instance_id; - vsomeip::eventgroup_t eventgroup_; +struct subscriber_t { + std::shared_ptr<endpoint_definition> subscriber; + std::shared_ptr<endpoint_definition> target; + std::shared_ptr<sd_message_identifier_t> response_message_id_; + std::shared_ptr<eventgroupinfo> eventgroupinfo_; + vsomeip::ttl_t ttl_; + vsomeip::major_version_t major_; + std::uint16_t reserved_; + std::uint8_t counter_; }; class service_discovery_impl: public service_discovery, @@ -93,6 +95,10 @@ public: void set_diagnosis_mode(const bool _activate); + void remote_subscription_acknowledge( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + client_t _client, bool _acknowledged, + const std::shared_ptr<sd_message_identifier_t> &_sd_message_id); private: std::pair<session_t, bool> get_session(const boost::asio::ip::address &_address); void increment_session(const boost::asio::ip::address &_address); @@ -123,7 +129,9 @@ private: std::shared_ptr<subscription> &_subscription); void insert_subscription_ack(std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, uint8_t _counter, major_version_t _major, uint16_t _reserved); + const std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, + uint8_t _counter, major_version_t _major, uint16_t _reserved, + const std::shared_ptr<endpoint_definition> &_target = nullptr); void insert_subscription_nack(std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, uint8_t _counter, major_version_t _major, uint16_t _reserved); @@ -152,8 +160,10 @@ private: std::shared_ptr<eventgroupentry_impl> &_entry, const std::vector<std::shared_ptr<option_impl> > &_options, std::shared_ptr < message_impl > &its_message_response, - std::vector <accepted_subscriber_t> &accepted_subscribers, - const boost::asio::ip::address &_destination); + const boost::asio::ip::address &_destination, + const std::shared_ptr<sd_message_identifier_t> &_message_id, + bool _is_stop_subscribe_subscribe, + bool _force_initial_events); void handle_eventgroup_subscription(service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, uint8_t _counter, uint16_t _reserved, @@ -162,7 +172,9 @@ private: const boost::asio::ip::address &_second_address, uint16_t _second_port, bool _is_second_reliable, std::shared_ptr < message_impl > &its_message, - std::vector <accepted_subscriber_t> &accepted_subscribers); + const std::shared_ptr<sd_message_identifier_t> &_message_id, + bool _is_stop_subscribe_subscribe, + bool _force_initial_events); void handle_eventgroup_subscription_ack(service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, uint8_t _counter, @@ -182,7 +194,9 @@ private: void check_ttl(const boost::system::error_code &_error); void start_subscription_expiration_timer(); + void start_subscription_expiration_timer_unlocked(); void stop_subscription_expiration_timer(); + void stop_subscription_expiration_timer_unlocked(); void expire_subscriptions(const boost::system::error_code &_error); bool check_ipv4_address(boost::asio::ip::address its_address); @@ -279,6 +293,27 @@ private: bool check_source_address(const boost::asio::ip::address &its_source_address) const; + void update_subscription_expiration_timer(const std::shared_ptr<message_impl> &_message); + + void remote_subscription_acknowledge_subscriber( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<subscriber_t> &_subscriber, bool _acknowledged); + + void remote_subscription_not_acknowledge_subscriber( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<subscriber_t> &_subscriber, bool _acknowledged); + + void remote_subscription_not_acknowledge_all(service_t _service, instance_t _instance); + + void remote_subscription_not_acknowledge_all(); + + void remote_subscription_remove( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition> &_subscriber); + + bool check_stop_subscribe_subscribe(message_impl::entries_t::const_iterator _iter, + message_impl::entries_t::const_iterator _end) const; + private: boost::asio::io_service &io_; service_discovery_host *host_; @@ -317,6 +352,7 @@ private: ttl_t ttl_; // TTL handling for subscriptions done by other hosts + std::mutex subscription_expiration_timer_mutex_; boost::asio::steady_timer subscription_expiration_timer_; std::chrono::steady_clock::time_point next_subscription_expiration_; @@ -358,6 +394,13 @@ private: boost::asio::ip::address current_remote_address_; std::atomic<bool> is_diagnosis_; + + std::mutex pending_remote_subscriptions_mutex_; + std::map<service_t, + std::map<instance_t, + std::map<eventgroup_t, + std::map<client_t, std::vector<std::shared_ptr<subscriber_t>>>>>> pending_remote_subscriptions_; + std::mutex response_mutex_; }; } // namespace sd diff --git a/implementation/service_discovery/src/entry_impl.cpp b/implementation/service_discovery/src/entry_impl.cpp index d647bf5..ecab203 100755 --- a/implementation/service_discovery/src/entry_impl.cpp +++ b/implementation/service_discovery/src/entry_impl.cpp @@ -23,6 +23,8 @@ entry_impl::entry_impl() { ttl_ = 0x0;
num_options_[0] = 0;
num_options_[1] = 0;
+ index1_ = 0;
+ index2_ = 0;
}
entry_impl::entry_impl(const entry_impl &_entry) {
@@ -33,6 +35,8 @@ entry_impl::entry_impl(const entry_impl &_entry) { ttl_ = _entry.ttl_;
num_options_[0] = _entry.num_options_[0];
num_options_[1] = _entry.num_options_[1];
+ index1_ = _entry.index1_;
+ index2_ = _entry.index2_;
}
entry_impl::~entry_impl() {
@@ -139,11 +143,9 @@ bool entry_impl::deserialize(vsomeip::deserializer *_from) { is_successful = is_successful && _from->deserialize(its_type);
type_ = static_cast<entry_type_e>(its_type);
- uint8_t its_index1(0);
- is_successful = is_successful && _from->deserialize(its_index1);
+ is_successful = is_successful && _from->deserialize(index1_);
- uint8_t its_index2(0);
- is_successful = is_successful && _from->deserialize(its_index2);
+ is_successful = is_successful && _from->deserialize(index2_);
uint8_t its_numbers(0);
is_successful = is_successful && _from->deserialize(its_numbers);
@@ -151,10 +153,10 @@ bool entry_impl::deserialize(vsomeip::deserializer *_from) { num_options_[0] = uint8_t(its_numbers >> 4);
num_options_[1] = uint8_t(its_numbers & 0xF);
- for (uint16_t i = its_index1; i < its_index1 + num_options_[0]; ++i)
+ for (uint16_t i = index1_; i < index1_ + num_options_[0]; ++i)
options_[0].push_back((uint8_t)(i));
- for (uint16_t i = its_index2; i < its_index2 + num_options_[1]; ++i)
+ for (uint16_t i = index2_; i < index2_ + num_options_[1]; ++i)
options_[1].push_back((uint8_t)(i));
uint16_t its_id(0);
diff --git a/implementation/service_discovery/src/eventgroupentry_impl.cpp b/implementation/service_discovery/src/eventgroupentry_impl.cpp index 29716d7..1f9dd4a 100755 --- a/implementation/service_discovery/src/eventgroupentry_impl.cpp +++ b/implementation/service_discovery/src/eventgroupentry_impl.cpp @@ -115,5 +115,34 @@ bool eventgroupentry_impl::deserialize(vsomeip::deserializer *_from) { return is_successful;
}
+bool eventgroupentry_impl::is_matching_subscribe(
+ const eventgroupentry_impl& _other) const {
+ return !(ttl_ != 0
+ || _other.ttl_ == 0
+ || service_ != _other.service_
+ || instance_ != _other.instance_
+ || eventgroup_ != _other.eventgroup_
+ || index1_ != _other.index1_
+ || index2_ != _other.index2_
+ || num_options_[0] != _other.num_options_[0]
+ || num_options_[1] != _other.num_options_[1]
+ || major_version_ != _other.major_version_
+ || counter_ != _other.counter_);
+}
+
+void eventgroupentry_impl::add_target(
+ const std::shared_ptr<endpoint_definition> &_target) {
+ if (_target->is_reliable()) {
+ target_reliable_ = _target;
+ } else {
+ target_unreliable_ = _target;
+ }
+}
+
+std::shared_ptr<endpoint_definition> eventgroupentry_impl::get_target(
+ bool _reliable) const {
+ return _reliable ? target_reliable_ : target_unreliable_;
+}
+
} // namespace sd
} // namespace vsomeip
diff --git a/implementation/service_discovery/src/message_impl.cpp b/implementation/service_discovery/src/message_impl.cpp index d4d3931..d23bf31 100755 --- a/implementation/service_discovery/src/message_impl.cpp +++ b/implementation/service_discovery/src/message_impl.cpp @@ -26,12 +26,14 @@ namespace vsomeip {
namespace sd {
-message_impl::message_impl() {
+message_impl::message_impl() :
+ flags_(0x0),
+ options_length_(0x0),
+ number_required_acks_(0x0),
+ number_contained_acks_(0x0) {
header_.service_ = 0xFFFF;
header_.method_ = 0x8100;
header_.protocol_version_ = 0x01;
- flags_ = 0x00;
- options_length_ = 0x0000;
}
message_impl::~message_impl() {
@@ -149,11 +151,11 @@ std::shared_ptr<protection_option_impl> message_impl::create_protection_option() return its_option;
}
-const std::vector<std::shared_ptr<entry_impl> > & message_impl::get_entries() const {
+const message_impl::entries_t & message_impl::get_entries() const {
return entries_;
}
-const std::vector<std::shared_ptr<option_impl> > & message_impl::get_options() const {
+const message_impl::options_t & message_impl::get_options() const {
return options_;
}
@@ -236,7 +238,31 @@ bool message_impl::deserialize(vsomeip::deserializer *_from) { while (is_successful && _from->get_remaining()) {
std::shared_ptr < entry_impl > its_entry(deserialize_entry(_from));
if (its_entry) {
- entries_.push_back(its_entry);
+ if (its_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP) {
+ bool is_unique(true);
+ for (const auto& e : entries_) {
+ if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP &&
+ *(static_cast<eventgroupentry_impl*>(e.get())) ==
+ *(static_cast<eventgroupentry_impl*>(its_entry.get()))) {
+ is_unique = false;
+ break;
+ }
+ }
+ if (is_unique) {
+ entries_.push_back(its_entry);
+ if (its_entry->get_ttl() > 0) {
+ const std::uint8_t num_options =
+ static_cast<std::uint8_t>(
+ its_entry->get_num_options(1) +
+ its_entry->get_num_options(2));
+ number_required_acks_ =
+ static_cast<std::uint8_t>(number_required_acks_
+ + num_options);
+ }
+ }
+ } else {
+ entries_.push_back(its_entry);
+ }
} else {
is_successful = false;
}
@@ -366,5 +392,48 @@ length_t message_impl::get_someip_length() const { return header_.length_;
}
+std::uint8_t message_impl::get_number_required_acks() const {
+ return number_required_acks_;
+}
+
+std::uint8_t message_impl::get_number_contained_acks() const {
+ return number_contained_acks_;
+}
+
+void message_impl::set_number_required_acks(std::uint8_t _required_acks) {
+ number_required_acks_ = _required_acks;
+}
+
+void message_impl::increase_number_required_acks(std::uint8_t _amount) {
+ number_required_acks_ += _amount;
+}
+
+void message_impl::decrease_number_required_acks(std::uint8_t _amount) {
+ number_required_acks_ -= _amount;
+}
+
+void message_impl::increase_number_contained_acks() {
+ number_contained_acks_++;
+}
+
+bool message_impl::all_required_acks_contained() const {
+ return number_contained_acks_ == number_required_acks_;
+}
+
+std::unique_lock<std::mutex> message_impl::get_message_lock() {
+ return std::unique_lock<std::mutex>(message_mutex_);
+}
+
+void message_impl::forced_initial_events_add(forced_initial_events_t _entry) {
+ std::lock_guard<std::mutex> its_lock(forced_initial_events_mutex_);
+ forced_initial_events_info_.push_back(_entry);
+}
+
+const std::vector<message_impl::forced_initial_events_t>
+message_impl::forced_initial_events_get() {
+ std::lock_guard<std::mutex> its_lock(forced_initial_events_mutex_);
+ return forced_initial_events_info_;
+}
+
} // namespace sd
} // namespace vsomeip
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index d4a8964..ca9aa5a 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -1000,8 +1000,11 @@ void service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared void service_discovery_impl::insert_subscription_ack( std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, uint8_t _counter, major_version_t _major, uint16_t _reserved) { - + const std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, + uint8_t _counter, major_version_t _major, uint16_t _reserved, + const std::shared_ptr<endpoint_definition> &_target) { + std::unique_lock<std::mutex> its_lock(_message->get_message_lock()); + _message->increase_number_contained_acks(); for (auto its_entry : _message->get_entries()) { if (its_entry->is_eventgroup_entry()) { std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry = @@ -1015,6 +1018,17 @@ void service_discovery_impl::insert_subscription_ack( && its_eventgroup_entry->get_reserved() == _reserved && its_eventgroup_entry->get_counter() == _counter && its_eventgroup_entry->get_ttl() == _ttl) { + if (_target) { + if (_target->is_reliable()) { + if (!its_eventgroup_entry->get_target(true)) { + its_eventgroup_entry->add_target(_target); + } + } else { + if (!its_eventgroup_entry->get_target(false)) { + its_eventgroup_entry->add_target(_target); + } + } + } return; } } @@ -1031,6 +1045,9 @@ void service_discovery_impl::insert_subscription_ack( its_entry->set_counter(_counter); // SWS_SD_00315 its_entry->set_ttl(_ttl); + if (_target) { + its_entry->add_target(_target); + } boost::asio::ip::address its_address; uint16_t its_port; @@ -1043,6 +1060,7 @@ void service_discovery_impl::insert_subscription_nack( std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, uint8_t _counter, major_version_t _major, uint16_t _reserved) { + std::unique_lock<std::mutex> its_lock(_message->get_message_lock()); std::shared_ptr < eventgroupentry_impl > its_entry = _message->create_eventgroup_entry(); // SWS_SD_00316 and SWS_SD_00385 @@ -1055,6 +1073,7 @@ void service_discovery_impl::insert_subscription_nack( its_entry->set_counter(_counter); // SWS_SD_00432 its_entry->set_ttl(0x0); + _message->increase_number_contained_acks(); } bool service_discovery_impl::send(bool _is_announcing) { @@ -1064,6 +1083,7 @@ bool service_discovery_impl::send(bool _is_announcing) { std::shared_ptr < message_impl > its_message; if(_is_announcing) { + remote_subscription_not_acknowledge_all(); its_message = its_runtime->create_message(); its_messages.push_back(its_message); @@ -1127,32 +1147,62 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, std::shared_ptr < message_impl > its_message_response = its_runtime->create_message(); - std::vector <accepted_subscriber_t> accepted_subscribers; - for (auto its_entry : its_message->get_entries()) { - if (its_entry->is_service_entry()) { + const std::uint8_t its_required_acks = + its_message->get_number_required_acks(); + its_message_response->set_number_required_acks(its_required_acks); + std::shared_ptr<sd_message_identifier_t> its_message_id = + std::make_shared<sd_message_identifier_t>( + its_message->get_session(), _sender, _destination, + its_message_response); + + const message_impl::entries_t& its_entries = its_message->get_entries(); + const message_impl::entries_t::const_iterator its_end = its_entries.end(); + bool is_stop_subscribe_subscribe(false); + + for (auto iter = its_entries.begin(); iter != its_end; iter++) { + if ((*iter)->is_service_entry()) { std::shared_ptr < serviceentry_impl > its_service_entry = std::dynamic_pointer_cast < serviceentry_impl - > (its_entry); + > (*iter); bool its_unicast_flag = its_message->get_unicast_flag(); process_serviceentry(its_service_entry, its_options, its_unicast_flag ); } else { std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry = std::dynamic_pointer_cast < eventgroupentry_impl - > (its_entry); - process_eventgroupentry( its_eventgroup_entry, its_options, its_message_response, accepted_subscribers, _destination); + > (*iter); + bool force_initial_events(false); + if (is_stop_subscribe_subscribe) { + is_stop_subscribe_subscribe = false; + force_initial_events = true; + } + is_stop_subscribe_subscribe = check_stop_subscribe_subscribe(iter, its_end); + process_eventgroupentry(its_eventgroup_entry, its_options, + its_message_response, _destination, + its_message_id, is_stop_subscribe_subscribe, force_initial_events); + } + } + + // send answer directly if SubscribeEventgroup entries were (n)acked + if (its_required_acks || its_message_response->get_number_required_acks() > 0) { + bool sent(false); + { + std::lock_guard<std::mutex> its_lock(response_mutex_); + if (its_message_response->all_required_acks_contained()) { + update_subscription_expiration_timer(its_message_response); + serialize_and_send(its_message_response, _sender); + // set required acks to zero to mark message as sent + its_message_response->set_number_required_acks(0); + sent = true; + } + } + if (sent) { + for (const auto &fie : its_message_response->forced_initial_events_get()) { + host_->send_initial_events(fie.service_, fie.instance_, + fie.eventgroup_, fie.target_); + } } } - - //send ACK / NACK if present - if( 0 < its_message_response->get_entries().size() && its_message_response ) { - serialize_and_send(its_message_response, _sender); - } - - for( const auto &a : accepted_subscribers) { - host_->on_subscribe(a.service_id, a.instance_id, a.eventgroup_, a.subscriber, a.target, a.its_expiration); - } - accepted_subscribers.clear(); start_ttl_timer(); } else { VSOMEIP_ERROR << "service_discovery_impl::on_message: deserialization error."; @@ -1658,8 +1708,10 @@ void service_discovery_impl::process_eventgroupentry( std::shared_ptr<eventgroupentry_impl> &_entry, const std::vector<std::shared_ptr<option_impl> > &_options, std::shared_ptr < message_impl > &its_message_response, - std::vector <accepted_subscriber_t> &accepted_subscribers, - const boost::asio::ip::address &_destination) { + const boost::asio::ip::address &_destination, + const std::shared_ptr<sd_message_identifier_t> &_message_id, + bool _is_stop_subscribe_subscribe, + bool _force_initial_events) { service_t its_service = _entry->get_service(); instance_t its_instance = _entry->get_instance(); eventgroup_t its_eventgroup = _entry->get_eventgroup(); @@ -1691,6 +1743,9 @@ void service_discovery_impl::process_eventgroupentry( && _entry->get_num_options(2) == 0) { VSOMEIP_ERROR << "Invalid number of options in SubscribeEventGroup entry"; if(its_ttl > 0) { + // increase number of required acks by one as number required acks + // is calculated based on the number of referenced options + its_message_response->increase_number_required_acks(); insert_subscription_nack(its_message_response, its_service, its_instance, its_eventgroup, its_counter, its_major, its_reserved); } @@ -1712,6 +1767,14 @@ void service_discovery_impl::process_eventgroupentry( VSOMEIP_ERROR << "Fewer options in SD message than " "referenced in EventGroup entry or malformed option received"; if(its_ttl > 0) { + const std::uint8_t num_overreferenced_options = + static_cast<std::uint8_t>(_entry->get_num_options(1) + + _entry->get_num_options(2) - _options.size()); + if (its_message_response->get_number_required_acks() - + num_overreferenced_options > 0) { + its_message_response->decrease_number_required_acks( + num_overreferenced_options); + } insert_subscription_nack(its_message_response, its_service, its_instance, its_eventgroup, its_counter, its_major, its_reserved); } @@ -1895,12 +1958,14 @@ void service_discovery_impl::process_eventgroupentry( } break; case option_type_e::CONFIGURATION: { + its_message_response->decrease_number_required_acks(); break; } case option_type_e::UNKNOWN: default: VSOMEIP_WARNING << "Unsupported eventgroup option"; if(its_ttl > 0) { + its_message_response->decrease_number_required_acks(); insert_subscription_nack(its_message_response, its_service, its_instance, its_eventgroup, its_counter, its_major, its_reserved); return; @@ -1914,7 +1979,8 @@ void service_discovery_impl::process_eventgroupentry( handle_eventgroup_subscription(its_service, its_instance, its_eventgroup, its_major, its_ttl, its_counter, its_reserved, its_first_address, its_first_port, is_first_reliable, - its_second_address, its_second_port, is_second_reliable, its_message_response, accepted_subscribers); + its_second_address, its_second_port, is_second_reliable, its_message_response, + _message_id, _is_stop_subscribe_subscribe, _force_initial_events); } else { if( entry_type_e::SUBSCRIBE_EVENTGROUP_ACK == its_type) { //this type is used for ACK and NACK messages if(its_ttl > 0) { @@ -1934,7 +2000,8 @@ void service_discovery_impl::handle_eventgroup_subscription(service_t _service, const boost::asio::ip::address &_first_address, uint16_t _first_port, bool _is_first_reliable, const boost::asio::ip::address &_second_address, uint16_t _second_port, bool _is_second_reliable, std::shared_ptr < message_impl > &its_message, - std::vector <accepted_subscriber_t> &accepted_subscribers) { + const std::shared_ptr<sd_message_identifier_t> &_message_id, + bool _is_stop_subscribe_subscribe, bool _force_initial_events) { if (its_message) { bool has_reliable_events(false); @@ -1966,30 +2033,38 @@ void service_discovery_impl::handle_eventgroup_subscription(service_t _service, if (reliablility_nack && _ttl > 0) { insert_subscription_nack(its_message, _service, _instance, _eventgroup, _counter, _major, _reserved); - VSOMEIP_WARNING << "Subscription for service/instance " - << std::hex << _service << "/" << _instance - << " not valid: Event configuration does not match the provided endpoint options"; + boost::system::error_code ec; + VSOMEIP_WARNING << "Subscription for [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" + << " not valid: Event configuration does not match the provided " + << "endpoint options: " + << _first_address.to_string(ec) << ":" << std::dec << _first_port << " " + << _second_address.to_string(ec) << ":" << std::dec << _second_port; return; } std::shared_ptr < eventgroupinfo > its_info = host_->find_eventgroup( _service, _instance, _eventgroup); - bool is_nack(false); - std::shared_ptr < endpoint_definition > its_first_subscriber, - its_second_subscriber; - std::shared_ptr < endpoint_definition > its_first_target, - its_second_target; + struct subscriber_target_t { + std::shared_ptr<endpoint_definition> subscriber_; + std::shared_ptr<endpoint_definition> target_; + }; + std::array<subscriber_target_t, 2> its_targets = + { subscriber_target_t(), subscriber_target_t() }; // Could not find eventgroup or wrong version if (!its_info || _major != its_info->get_major()) { // Create a temporary info object with TTL=0 --> send NACK if( its_info && (_major != its_info->get_major())) { VSOMEIP_ERROR << "Requested major version:[" << (uint32_t) _major - << "] in subscription to service:[" << _service - << "] instance:[" << _instance - << "] eventgroup:[" << _eventgroup - << "], does not match with services major version:[" << (uint32_t) its_info->get_major() + << "] in subscription to service: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" + << " does not match with services major version:[" << (uint32_t) its_info->get_major() << "]"; } else { VSOMEIP_ERROR << "Requested eventgroup:[" << _eventgroup @@ -2006,117 +2081,125 @@ void service_discovery_impl::handle_eventgroup_subscription(service_t _service, boost::asio::ip::address its_first_address, its_second_address; uint16_t its_first_port, its_second_port; if (ILLEGAL_PORT != _first_port) { - its_first_subscriber = endpoint_definition::get( + its_targets[0].subscriber_ = endpoint_definition::get( _first_address, _first_port, _is_first_reliable, _service, _instance); if (!_is_first_reliable && its_info->get_multicast(its_first_address, its_first_port)) { // udp multicast - its_first_target = endpoint_definition::get( + its_targets[0].target_ = endpoint_definition::get( its_first_address, its_first_port, false, _service, _instance); } else if(_is_first_reliable) { // tcp unicast - its_first_target = its_first_subscriber; + its_targets[0].target_ = its_targets[0].subscriber_; // check if TCP connection is established by client - if( !is_tcp_connected(_service, _instance, its_first_target) && _ttl > 0) { + if(_ttl > 0 && !is_tcp_connected(_service, _instance, its_targets[0].target_)) { insert_subscription_nack(its_message, _service, _instance, _eventgroup, _counter, _major, _reserved); - VSOMEIP_ERROR << "TCP connection to target1: [" << its_first_target->get_address().to_string() - << ":" << its_first_target->get_port() - << "] not established for subscription to service:[" << _service - << "] instance:[" << _instance - << "] eventgroup:[" << _eventgroup << "]"; + VSOMEIP_ERROR << "TCP connection to target1: [" + << its_targets[0].target_->get_address().to_string() + << ":" << its_targets[0].target_->get_port() + << "] not established for subscription to: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + if (ILLEGAL_PORT != _second_port) { + its_message->decrease_number_required_acks(); + } return; } } else { // udp unicast - its_first_target = its_first_subscriber; + its_targets[0].target_ = its_targets[0].subscriber_; } } if (ILLEGAL_PORT != _second_port) { - its_second_subscriber = endpoint_definition::get( + its_targets[1].subscriber_ = endpoint_definition::get( _second_address, _second_port, _is_second_reliable, _service, _instance); if (!_is_second_reliable && its_info->get_multicast(its_second_address, its_second_port)) { // udp multicast - its_second_target = endpoint_definition::get( + its_targets[1].target_ = endpoint_definition::get( its_second_address, its_second_port, false, _service, _instance); } else if (_is_second_reliable) { // tcp unicast - its_second_target = its_second_subscriber; + its_targets[1].target_ = its_targets[1].subscriber_; // check if TCP connection is established by client - if(_ttl > 0 && !is_tcp_connected(_service, _instance, its_second_target)) { + if(_ttl > 0 && !is_tcp_connected(_service, _instance, its_targets[1].target_)) { insert_subscription_nack(its_message, _service, _instance, _eventgroup, _counter, _major, _reserved); - VSOMEIP_ERROR << "TCP connection to target2 : [" << its_second_target->get_address().to_string() - << ":" << its_second_target->get_port() - << "] not established for subscription to service:[" << _service - << "] instance:[" << _instance - << "] eventgroup:[" << _eventgroup << "]"; + VSOMEIP_ERROR << "TCP connection to target2 : [" + << its_targets[1].target_->get_address().to_string() + << ":" << its_targets[1].target_->get_port() + << "] not established for subscription to: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + if (ILLEGAL_PORT != _first_port) { + its_message->decrease_number_required_acks(); + } return; } } else { // udp unicast - its_second_target = its_second_subscriber; + its_targets[1].target_ = its_targets[1].subscriber_; } } } if (_ttl == 0) { // --> unsubscribe - if (its_first_subscriber) { - host_->on_unsubscribe(_service, _instance, _eventgroup, its_first_subscriber); - } - if (its_second_subscriber) { - host_->on_unsubscribe(_service, _instance, _eventgroup, its_second_subscriber); + for (const auto &target : its_targets) { + if (target.subscriber_) { + remote_subscription_remove(_service, _instance, _eventgroup, target.subscriber_); + if (!_is_stop_subscribe_subscribe) { + host_->on_unsubscribe(_service, _instance, _eventgroup, target.subscriber_); + } + } } return; } - std::chrono::steady_clock::time_point its_expiration - = std::chrono::steady_clock::now() + std::chrono::seconds(_ttl); - - if (its_first_target) { - if(!host_->on_subscribe_accepted(_service, _instance, _eventgroup, - its_first_subscriber, its_expiration)) { - is_nack = true; - insert_subscription_nack(its_message, _service, _instance, _eventgroup, - _counter, _major, _reserved); - } - } - if (its_second_subscriber) { - if(!host_->on_subscribe_accepted(_service, _instance, _eventgroup, - its_second_subscriber, its_expiration)) { - is_nack = true; - insert_subscription_nack(its_message, _service, _instance, _eventgroup, - _counter, _major, _reserved); - } - } - - if (!is_nack) - { - insert_subscription_ack(its_message, _service, _instance, _eventgroup, - its_info, _ttl, _counter, _major, _reserved); - - if (its_expiration < next_subscription_expiration_) { - stop_subscription_expiration_timer(); - next_subscription_expiration_ = its_expiration; - start_subscription_expiration_timer(); - } - - if (its_first_target && its_first_subscriber) { - accepted_subscriber_t subscriber_; - subscriber_.service_id = _service; - subscriber_.instance_id = _instance; - subscriber_.eventgroup_ = _eventgroup; - subscriber_.subscriber = its_first_subscriber; - subscriber_.target = its_first_target; - subscriber_.its_expiration = its_expiration; - - accepted_subscribers.push_back(subscriber_); - } - if (its_second_target && its_second_subscriber) { - accepted_subscriber_t subscriber_; - subscriber_.service_id = _service; - subscriber_.instance_id = _instance; - subscriber_.eventgroup_ = _eventgroup; - subscriber_.subscriber = its_second_subscriber; - subscriber_.target = its_second_target; - subscriber_.its_expiration = its_expiration; - - accepted_subscribers.push_back(subscriber_); + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + for (const auto &target : its_targets) { + if (target.target_) { + client_t its_subscribing_remote_client = VSOMEIP_ROUTING_CLIENT; + switch (host_->on_remote_subscription(_service, _instance, + _eventgroup, target.subscriber_, target.target_, + _ttl, &its_subscribing_remote_client, + _message_id)) { + case remote_subscription_state_e::SUBSCRIPTION_ACKED: + insert_subscription_ack(its_message, _service, + _instance, _eventgroup, its_info, _ttl, + _counter, _major, _reserved); + if (_force_initial_events) { + // processing subscription of StopSubscribe/Subscribe + // sequence + its_message->forced_initial_events_add( + message_impl::forced_initial_events_t( + { target.subscriber_, _service, _instance, + _eventgroup })); + } + break; + case remote_subscription_state_e::SUBSCRIPTION_NACKED: + case remote_subscription_state_e::SUBSCRIPTION_ERROR: + insert_subscription_nack(its_message, _service, + _instance, _eventgroup, _counter, _major, + _reserved); + break; + case remote_subscription_state_e::SUBSCRIPTION_PENDING: + if (target.target_ && target.subscriber_) { + std::shared_ptr<subscriber_t> subscriber_ = + std::make_shared<subscriber_t>(); + subscriber_->subscriber = target.subscriber_; + subscriber_->target = target.target_; + subscriber_->response_message_id_ = _message_id; + subscriber_->eventgroupinfo_ = its_info; + subscriber_->ttl_ = _ttl; + subscriber_->major_ = _major; + subscriber_->reserved_ = _reserved; + subscriber_->counter_ = _counter; + + pending_remote_subscriptions_[_service] + [_instance] + [_eventgroup] + [its_subscribing_remote_client].push_back(subscriber_); + } + default: + break; + } } } } @@ -2137,7 +2220,7 @@ void service_discovery_impl::handle_eventgroup_subscription_nack(service_t _serv // Deliver nack nackedClient = client.first; host_->on_subscribe_nack(client.first, _service, - _instance, _eventgroup, ANY_EVENT); + _instance, _eventgroup, ANY_EVENT, DEFAULT_SUBSCRIPTION); break; } } @@ -2179,7 +2262,7 @@ void service_discovery_impl::handle_eventgroup_subscription_ack( if (its_client.second->get_counter() == _counter) { its_client.second->set_acknowledged(true); host_->on_subscribe_ack(its_client.first, _service, - _instance, _eventgroup, ANY_EVENT); + _instance, _eventgroup, ANY_EVENT, DEFAULT_SUBSCRIPTION); } if (_address.is_multicast()) { host_->on_subscribe_ack(_service, _instance, _address, @@ -2389,6 +2472,11 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ } void service_discovery_impl::start_subscription_expiration_timer() { + std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_); + start_subscription_expiration_timer_unlocked(); +} + +void service_discovery_impl::start_subscription_expiration_timer_unlocked() { subscription_expiration_timer_.expires_at(next_subscription_expiration_); subscription_expiration_timer_.async_wait( std::bind(&service_discovery_impl::expire_subscriptions, @@ -2397,6 +2485,11 @@ void service_discovery_impl::start_subscription_expiration_timer() { } void service_discovery_impl::stop_subscription_expiration_timer() { + std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_); + stop_subscription_expiration_timer_unlocked(); +} + +void service_discovery_impl::stop_subscription_expiration_timer_unlocked() { subscription_expiration_timer_.cancel(); } @@ -2913,6 +3006,8 @@ void service_discovery_impl::stop_offer_service( if(_info->is_in_mainphase() || stop_offer_required) { send_stop_offer(_service, _instance, _info); } + // sent out NACKs for all pending subscriptions + remote_subscription_not_acknowledge_all(_service, _instance); } bool service_discovery_impl::send_stop_offer( @@ -3014,5 +3109,261 @@ void service_discovery_impl::set_diagnosis_mode(const bool _activate) { is_diagnosis_ = _activate; } +void service_discovery_impl::remote_subscription_acknowledge( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + client_t _client, bool _acknowledged, + const std::shared_ptr<sd_message_identifier_t> &_sd_message_id) { + std::shared_ptr<subscriber_t> its_subscriber; + { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + const auto its_service = pending_remote_subscriptions_.find(_service); + if (its_service != pending_remote_subscriptions_.end()) { + const auto its_instance = its_service->second.find(_instance); + if (its_instance != its_service->second.end()) { + const auto its_eventgroup = its_instance->second.find(_eventgroup); + if (its_eventgroup != its_instance->second.end()) { + const auto its_client = its_eventgroup->second.find(_client); + if (its_client != its_eventgroup->second.end()) { + for (auto iter = its_client->second.begin(); + iter != its_client->second.end();) { + if ((*iter)->response_message_id_ == _sd_message_id) { + its_subscriber = *iter; + iter = its_client->second.erase(iter); + break; + } else { + iter++; + } + } + + // delete if necessary + if (!its_client->second.size()) { + its_eventgroup->second.erase(its_client); + if (!its_eventgroup->second.size()) { + its_instance->second.erase(its_eventgroup); + if (!its_instance->second.size()) { + its_service->second.erase(its_instance); + if (!its_service->second.size()) { + pending_remote_subscriptions_.erase( + its_service); + } + } + } + } + } + } + } + } + } + if (its_subscriber) { + remote_subscription_acknowledge_subscriber(_service, _instance, + _eventgroup, its_subscriber, _acknowledged); + } +} + +void service_discovery_impl::update_subscription_expiration_timer( + const std::shared_ptr<message_impl> &_message) { + std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_); + const std::chrono::steady_clock::time_point now = + std::chrono::steady_clock::now(); + stop_subscription_expiration_timer_unlocked(); + for (const auto &entry : _message->get_entries()) { + if (entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK + && entry->get_ttl()) { + const std::chrono::steady_clock::time_point its_expiration = now + + std::chrono::seconds(entry->get_ttl()); + if (its_expiration < next_subscription_expiration_) { + next_subscription_expiration_ = its_expiration; + } + } + } + start_subscription_expiration_timer_unlocked(); +} + +void service_discovery_impl::remote_subscription_acknowledge_subscriber( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<subscriber_t> &_subscriber, bool _acknowledged) { + std::shared_ptr<message_impl> its_response = _subscriber->response_message_id_->response_; + std::vector<std::tuple<service_t, instance_t, eventgroup_t, + std::shared_ptr<endpoint_definition>>> its_acks; + bool sent(false); + { + std::lock_guard<std::mutex> its_lock(response_mutex_); + if (_acknowledged) { + insert_subscription_ack(its_response, _service, _instance, + _eventgroup, _subscriber->eventgroupinfo_, + _subscriber->ttl_, _subscriber->counter_, + _subscriber->major_, _subscriber->reserved_, _subscriber->subscriber); + } else { + insert_subscription_nack(its_response, _service, _instance, + _eventgroup, _subscriber->counter_, + _subscriber->major_, _subscriber->reserved_); + } + + if (its_response->all_required_acks_contained()) { + update_subscription_expiration_timer(its_response); + serialize_and_send(its_response, _subscriber->response_message_id_->sender_); + // set required acks to zero to mark message as sent + its_response->set_number_required_acks(0); + sent = true; + } + } + if (sent) { + for (const auto &e : its_response->get_entries()) { + if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK + && e->get_ttl() > 0) { + const std::shared_ptr<eventgroupentry_impl> casted_e = + std::static_pointer_cast<eventgroupentry_impl>(e); + const std::shared_ptr<endpoint_definition> its_reliable = + casted_e->get_target(true); + if (its_reliable) { + its_acks.push_back( + std::make_tuple(e->get_service(), e->get_instance(), + casted_e->get_eventgroup(), its_reliable)); + } + const std::shared_ptr<endpoint_definition> its_unreliable = + casted_e->get_target(false); + if (its_unreliable) { + its_acks.push_back( + std::make_tuple(e->get_service(), e->get_instance(), + casted_e->get_eventgroup(), + its_unreliable)); + } + } + } + for (const auto& ack_tuple : its_acks) { + host_->send_initial_events(std::get<0>(ack_tuple), + std::get<1>(ack_tuple), std::get<2>(ack_tuple), + std::get<3>(ack_tuple)); + } + for (const auto &fie : its_response->forced_initial_events_get()) { + host_->send_initial_events(fie.service_, fie.instance_, + fie.eventgroup_, fie.target_); + } + } +} + +void service_discovery_impl::remote_subscription_not_acknowledge_all( + service_t _service, instance_t _instance) { + std::map<eventgroup_t, std::vector<std::shared_ptr<subscriber_t>>>its_pending_subscriptions; + { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + const auto its_service = pending_remote_subscriptions_.find(_service); + if (its_service != pending_remote_subscriptions_.end()) { + const auto its_instance = its_service->second.find(_instance); + if (its_instance != its_service->second.end()) { + for (const auto &its_eventgroup : its_instance->second) { + for (const auto &its_client : its_eventgroup.second) { + its_pending_subscriptions[its_eventgroup.first].insert( + its_pending_subscriptions[its_eventgroup.first].end(), + its_client.second.begin(), + its_client.second.end()); + } + } + // delete everything from this service instance + its_service->second.erase(its_instance); + if (!its_service->second.size()) { + pending_remote_subscriptions_.erase(its_service); + } + } + } + } + for (const auto &eg : its_pending_subscriptions) { + for (const auto &its_subscriber : eg.second) { + remote_subscription_acknowledge_subscriber(_service, _instance, + eg.first, its_subscriber, false); + } + } +} + +void service_discovery_impl::remote_subscription_not_acknowledge_all() { + std::map<service_t, + std::map<instance_t, + std::map<eventgroup_t, std::vector<std::shared_ptr<subscriber_t>>>>> to_be_nacked; + { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + for (const auto &its_service : pending_remote_subscriptions_) { + for (const auto &its_instance : its_service.second) { + for (const auto &its_eventgroup : its_instance.second) { + for (const auto &its_client : its_eventgroup.second) { + to_be_nacked[its_service.first] + [its_instance.first] + [its_eventgroup.first].insert( + to_be_nacked[its_service.first][its_instance.first][its_eventgroup.first].end(), + its_client.second.begin(), + its_client.second.end()); + } + } + } + } + pending_remote_subscriptions_.clear(); + } + for (const auto &s : to_be_nacked) { + for (const auto &i : s.second) { + for (const auto &eg : i.second) { + for (const auto &sub : eg.second) { + remote_subscription_acknowledge_subscriber(s.first, i.first, + eg.first, sub, false); + } + } + } + } +} + +void service_discovery_impl::remote_subscription_remove( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition> &_subscriber) { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + const auto its_service = pending_remote_subscriptions_.find(_service); + if (its_service != pending_remote_subscriptions_.end()) { + const auto its_instance = its_service->second.find(_instance); + if (its_instance != its_service->second.end()) { + const auto its_eventgroup = its_instance->second.find(_eventgroup); + if (its_eventgroup != its_instance->second.end()) { + for (auto client_iter = its_eventgroup->second.begin(); + client_iter != its_eventgroup->second.end(); ) { + for (auto subscriber_iter = client_iter->second.begin(); + subscriber_iter != client_iter->second.end();) { + if ((*subscriber_iter)->subscriber == _subscriber) { + (*subscriber_iter)->response_message_id_->response_->decrease_number_required_acks(); + subscriber_iter = client_iter->second.erase( + subscriber_iter); + } else { + ++subscriber_iter; + } + } + if (!client_iter->second.size()) { + client_iter = its_eventgroup->second.erase(client_iter); + } else { + ++client_iter; + } + } + if (!its_eventgroup->second.size()) { + its_instance->second.erase(its_eventgroup); + if (!its_service->second.size()) { + its_service->second.erase(its_instance); + if (!its_service->second.size()) { + pending_remote_subscriptions_.erase(its_service); + } + } + } + } + } + } +} + +bool service_discovery_impl::check_stop_subscribe_subscribe( + message_impl::entries_t::const_iterator _iter, + message_impl::entries_t::const_iterator _end) const { + const message_impl::entries_t::const_iterator its_next = std::next(_iter); + if ((*_iter)->get_type() != entry_type_e::SUBSCRIBE_EVENTGROUP + || its_next == _end + || (*its_next)->get_type() != entry_type_e::STOP_SUBSCRIBE_EVENTGROUP) { + return false; + } + + return (*static_cast<eventgroupentry_impl*>(_iter->get())).is_matching_subscribe( + *(static_cast<eventgroupentry_impl*>(its_next->get()))); +} + } // namespace sd } // namespace vsomeip diff --git a/interface/vsomeip/application.hpp b/interface/vsomeip/application.hpp index fdf007e..42bd534 100644 --- a/interface/vsomeip/application.hpp +++ b/interface/vsomeip/application.hpp @@ -926,6 +926,25 @@ public: */ virtual void set_watchdog_handler(watchdog_handler_t _handler, std::chrono::seconds _interval) = 0; + /** + * + * \brief Registers a subscription handler. + * + * A subscription handler is called whenever the subscription state of an + * eventgroup changes. The callback is called with the client identifier + * and a boolean that indicates whether the client subscribed or + * unsubscribed. + * + * \param _service Service identifier of service instance whose + * subscription state is to be monitored. + * \param _instance Instance identifier of service instance whose + * subscription state is to be monitored. + * \param _eventgroup Eventgroup identifier of eventgroup whose + * subscription state is to be monitored. + * \param _handler Callback that shall be called. + * + */ + virtual void register_async_subscription_handler(service_t _service, instance_t _instance, eventgroup_t _eventgroup, async_subscription_handler_t _handler) = 0; }; /** @} */ diff --git a/interface/vsomeip/constants.hpp b/interface/vsomeip/constants.hpp index 9aeee09..d45f2c1 100644 --- a/interface/vsomeip/constants.hpp +++ b/interface/vsomeip/constants.hpp @@ -54,6 +54,8 @@ const byte_t SERVICE_COOKIE[] = { 0xFF, 0xFF, 0x80, 0x00, 0x00, 0x00, 0x00, const event_t ANY_EVENT = 0xFFFF; const client_t ANY_CLIENT = 0xFFFF; +const pending_subscription_id_t DEFAULT_SUBSCRIPTION = 0x0; + } // namespace vsomeip #endif // VSOMEIP_CONSTANTS_HPP diff --git a/interface/vsomeip/handler.hpp b/interface/vsomeip/handler.hpp index ab4f118..bcb2aa8 100644 --- a/interface/vsomeip/handler.hpp +++ b/interface/vsomeip/handler.hpp @@ -22,6 +22,7 @@ typedef std::function< bool (client_t, bool) > subscription_handler_t; typedef std::function< void (const uint16_t) > error_handler_t; typedef std::function< void (const service_t, const instance_t, const eventgroup_t, const event_t, const uint16_t) > subscription_status_handler_t; +typedef std::function< void (client_t, bool, std::function< void (const bool) > )> async_subscription_handler_t; typedef std::function< void (const std::vector<std::pair<service_t, instance_t>> &_services) > offered_services_handler_t; typedef std::function< void () > watchdog_handler_t; diff --git a/interface/vsomeip/primitive_types.hpp b/interface/vsomeip/primitive_types.hpp index d6c90de..9f6c84a 100644 --- a/interface/vsomeip/primitive_types.hpp +++ b/interface/vsomeip/primitive_types.hpp @@ -43,6 +43,7 @@ typedef std::string trace_channel_t; typedef std::string trace_filter_type_t; +typedef std::uint16_t pending_subscription_id_t; } // namespace vsomeip #endif // VSOMEIP_PRIMITIVE_TYPES_HPP diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c82123e..b3a29c8 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -67,7 +67,7 @@ endfunction() ############################################################################## # configuration-test ############################################################################## -if(NOT ${TESTS_BAT}) +if(NOT ${TESTS_BAT}) set(TEST_CONFIGURATION configuration-test) add_executable(${TEST_CONFIGURATION} configuration_tests/configuration-test.cpp) @@ -588,6 +588,57 @@ if(NOT ${TESTS_BAT}) ${TEST_RESTART_ROUTING_CLIENT} ) endif() + +############################################################################## +# security test +############################################################################## + +if (${TEST_SECURITY}) + if(NOT ${TESTS_BAT}) + set(TEST_SECURITY_NAME security_test) + + set(TEST_SECURITY_SERVICE security_test_service) + add_executable(${TEST_SECURITY_SERVICE} security_tests/${TEST_SECURITY_SERVICE}.cpp) + target_link_libraries(${TEST_SECURITY_SERVICE} + vsomeip + ${Boost_LIBRARIES} + ${DL_LIBRARY} + ${TEST_LINK_LIBRARIES} + ) + + # Copy config file for service into $BUILDDIR/test + set(TEST_SECURITY_SERVICE_CONFIG_FILE ${TEST_SECURITY_NAME}_config.json) + configure_file( + ${PROJECT_SOURCE_DIR}/test/security_tests/conf/${TEST_SECURITY_SERVICE_CONFIG_FILE}.in + ${PROJECT_SOURCE_DIR}/test/security_tests/${TEST_SECURITY_SERVICE_CONFIG_FILE} + @ONLY) + copy_to_builddir( + ${PROJECT_SOURCE_DIR}/test/security_tests/${TEST_SECURITY_SERVICE_CONFIG_FILE} + ${PROJECT_BINARY_DIR}/test/${TEST_SECURITY_SERVICE_CONFIG_FILE} + ${TEST_SECURITY_SERVICE} + ) + + # Copy bashscript to start service into $BUILDDIR/test + set(TEST_SECURITY_SERVICE_START_SCRIPT ${TEST_SECURITY_NAME}_start.sh) + copy_to_builddir( + ${PROJECT_SOURCE_DIR}/test/security_tests/${TEST_SECURITY_SERVICE_START_SCRIPT} + ${PROJECT_BINARY_DIR}/test/${TEST_SECURITY_SERVICE_START_SCRIPT} + ${TEST_SECURITY_SERVICE} + ) + + set(TEST_SECURITY_CLIENT security_test_client) + add_executable(${TEST_SECURITY_CLIENT} + security_tests/${TEST_SECURITY_CLIENT}.cpp + ) + target_link_libraries(${TEST_SECURITY_CLIENT} + vsomeip + ${Boost_LIBRARIES} + ${DL_LIBRARY} + ${TEST_LINK_LIBRARIES} + ) + endif() +endif() + ############################################################################## # payload-test ############################################################################## @@ -2052,6 +2103,10 @@ if(NOT ${TESTS_BAT}) add_dependencies(build_tests ${TEST_OFFER_EXTERNAL_SD_MESSAGE_SENDER}) add_dependencies(build_tests ${TEST_RESTART_ROUTING_SERVICE}) add_dependencies(build_tests ${TEST_RESTART_ROUTING_CLIENT}) + if (${TEST_SECURITY}) + add_dependencies(build_tests ${TEST_SECURITY_SERVICE}) + add_dependencies(build_tests ${TEST_SECURITY_CLIENT}) + endif() add_dependencies(build_tests ${TEST_OFFERED_SERVICES_INFO_CLIENT}) add_dependencies(build_tests ${TEST_OFFERED_SERVICES_INFO_SERVICE}) else() @@ -2432,7 +2487,13 @@ if(NOT ${TESTS_BAT}) # Restart-Routing tests add_test(NAME ${TEST_RESTART_ROUTING_NAME} COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_RESTART_ROUTING_STARTER} - ) + ) + if (${TEST_SECURITY}) + # Security tests + add_test(NAME ${TEST_SECURITY_NAME} + COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_SECURITY_SERVICE_START_SCRIPT} + ) + endif() else() # Routing tests add_test(NAME ${TEST_LOCAL_ROUTING_NAME} diff --git a/test/configuration_tests/configuration-test-deprecated.json b/test/configuration_tests/configuration-test-deprecated.json index ad4c0d6..5ffacce 100644 --- a/test/configuration_tests/configuration-test-deprecated.json +++ b/test/configuration_tests/configuration-test-deprecated.json @@ -236,6 +236,24 @@ "clients" : [ { + "reliable_remote_ports" : { "first" : "30500", "last" : "30599" }, + "unreliable_remote_ports" : { "first" : "30500", "last" : "30599" }, + "reliable_client_ports" : { "first" : "30491", "last" : "30499" }, + "unreliable_client_ports" : { "first" : "30491", "last" : "30499" } + }, + { + "reliable_remote_ports" : { "first" : "31500", "last" : "31599" }, + "unreliable_remote_ports" : { "first" : "31500", "last" : "31599" }, + "reliable_client_ports" : { "first" : "31491", "last" : "31499" }, + "unreliable_client_ports" : { "first" : "31491", "last" : "31499" } + }, + { + "reliable_remote_ports" : { "first" : "32500", "last" : "32599" }, + "unreliable_remote_ports" : { "first" : "32500", "last" : "32599" }, + "reliable_client_ports" : { "first" : "32491", "last" : "32499" }, + "unreliable_client_ports" : { "first" : "32491", "last" : "32499" } + }, + { "service" : "0x8888", "instance" : "0x1", "unreliable" : [ "0x11", "0x10" ], diff --git a/test/configuration_tests/configuration-test.cpp b/test/configuration_tests/configuration-test.cpp index 8693d4d..a7e8def 100644 --- a/test/configuration_tests/configuration-test.cpp +++ b/test/configuration_tests/configuration-test.cpp @@ -424,18 +424,42 @@ void check_file(const std::string &_config_file, used_ports[true].insert(0x11); used_ports[false].insert(0x10); std::uint16_t port_to_use(0x0); - EXPECT_TRUE(its_configuration->get_client_port(0x8888, 0x1, true, used_ports, port_to_use)); + EXPECT_TRUE(its_configuration->get_client_port(0x8888, 0x1, vsomeip::ILLEGAL_PORT, true, used_ports, port_to_use)); EXPECT_EQ(0x10, port_to_use); - EXPECT_TRUE(its_configuration->get_client_port(0x8888, 0x1, false, used_ports, port_to_use)); + EXPECT_TRUE(its_configuration->get_client_port(0x8888, 0x1, vsomeip::ILLEGAL_PORT, false, used_ports, port_to_use)); EXPECT_EQ(0x11, port_to_use); used_ports[true].insert(0x10); used_ports[false].insert(0x11); - EXPECT_FALSE(its_configuration->get_client_port(0x8888, 0x1, true, used_ports, port_to_use)); + EXPECT_FALSE(its_configuration->get_client_port(0x8888, 0x1, vsomeip::ILLEGAL_PORT, true, used_ports, port_to_use)); EXPECT_EQ(vsomeip::ILLEGAL_PORT, port_to_use); - EXPECT_FALSE(its_configuration->get_client_port(0x8888, 0x1, false, used_ports, port_to_use)); + EXPECT_FALSE(its_configuration->get_client_port(0x8888, 0x1, vsomeip::ILLEGAL_PORT, false, used_ports, port_to_use)); EXPECT_EQ(vsomeip::ILLEGAL_PORT, port_to_use); + + //check for correct client port assignment if service / instance was not configured but a remote port range + used_ports.clear(); + EXPECT_TRUE(its_configuration->get_client_port(0x8888, 0x12, 0x7725, true, used_ports, port_to_use)); + EXPECT_EQ(0x771B, port_to_use); + used_ports[true].insert(0x771B); + EXPECT_TRUE(its_configuration->get_client_port(0x8888, 0x12, 0x7725, true, used_ports, port_to_use)); + EXPECT_EQ(0x771C, port_to_use); + used_ports[true].insert(0x771C); + EXPECT_TRUE(its_configuration->get_client_port(0x8888, 0x12, 0x7B0D, true, used_ports, port_to_use)); + EXPECT_EQ(0x7B03, port_to_use); + used_ports[true].insert(0x7B03); + EXPECT_TRUE(its_configuration->get_client_port(0x8888, 0x12, 0x7B0D, true, used_ports, port_to_use)); + EXPECT_EQ(0x7B04, port_to_use); + used_ports[true].insert(0x7B04); + EXPECT_TRUE(its_configuration->get_client_port(0x8888, 0x12, 0x7EF4, true, used_ports, port_to_use)); + EXPECT_EQ(0x7EEB, port_to_use); + used_ports[true].insert(0x7EEB); + EXPECT_TRUE(its_configuration->get_client_port(0x8888, 0x12, 0x7EF4, true, used_ports, port_to_use)); + EXPECT_EQ(0x7EEC, port_to_use); + used_ports[true].insert(0x7EEC); + used_ports.clear(); + + // payload sizes // use 17000 instead of 1500 as configured max-local-payload size will be // increased to bigger max-reliable-payload-size diff --git a/test/configuration_tests/configuration-test.json b/test/configuration_tests/configuration-test.json index 1509ad4..32d11d7 100644 --- a/test/configuration_tests/configuration-test.json +++ b/test/configuration_tests/configuration-test.json @@ -199,6 +199,24 @@ "clients" : [ { + "reliable_remote_ports" : { "first" : "30500", "last" : "30599" }, + "unreliable_remote_ports" : { "first" : "30500", "last" : "30599" }, + "reliable_client_ports" : { "first" : "30491", "last" : "30499" }, + "unreliable_client_ports" : { "first" : "30491", "last" : "30499" } + }, + { + "reliable_remote_ports" : { "first" : "31500", "last" : "31599" }, + "unreliable_remote_ports" : { "first" : "31500", "last" : "31599" }, + "reliable_client_ports" : { "first" : "31491", "last" : "31499" }, + "unreliable_client_ports" : { "first" : "31491", "last" : "31499" } + }, + { + "reliable_remote_ports" : { "first" : "32500", "last" : "32599" }, + "unreliable_remote_ports" : { "first" : "32500", "last" : "32599" }, + "reliable_client_ports" : { "first" : "32491", "last" : "32499" }, + "unreliable_client_ports" : { "first" : "32491", "last" : "32499" } + }, + { "service" : "0x8888", "instance" : "0x1", "unreliable" : [ "0x11", "0x10" ], diff --git a/test/security_tests/conf/security_test_config.json.in b/test/security_tests/conf/security_test_config.json.in new file mode 100644 index 0000000..693bf58 --- /dev/null +++ b/test/security_tests/conf/security_test_config.json.in @@ -0,0 +1,71 @@ +{ + "unicast" : "localhost", + "logging" : + { + "level" : "info", + "console" : "true", + "file" : { "enable" : "false", "path" : "/tmp/vsomeip.log" }, + "dlt" : "false" + }, + "applications" : + [ + { + "name" : "service-sample", + "id" : "0x1277" + }, + { + "name" : "client-sample", + "id" : "0x1255" + } + ], + "security" : + { + "check_credentials" : "true", + "policies" : + [ + { + "client" : "0x1277", + "credentials" : { "uid" : "@TEST_UID@", "gid" : "@TEST_GID@" }, + "allow" : + { + "offers": + [ + { + "service" : "0x1234", + "instance" : "0x5678" + } + ] + } + }, + { + "client" : "0x1255", + "credentials" : { "uid" : "@TEST_UID@", "gid" : "@TEST_GID@" }, + "allow" : + { + "requests": + [ + { + "service" : "0x1234", + "instance" : "0x5678" + } + ] + } + } + ] + }, + "routing" : "service-sample", + "service-discovery" : + { + "enable" : "true", + "multicast" : "224.244.224.245", + "port" : "30490", + "protocol" : "udp", + "initial_delay_min" : "10", + "initial_delay_max" : "100", + "repetitions_base_delay" : "200", + "repetitions_max" : "3", + "ttl" : "3", + "cyclic_offer_delay" : "2000", + "request_response_delay" : "1500" + } +} diff --git a/test/security_tests/security_test_client.cpp b/test/security_tests/security_test_client.cpp new file mode 100644 index 0000000..924f13c --- /dev/null +++ b/test/security_tests/security_test_client.cpp @@ -0,0 +1,158 @@ +// Copyright (C) 2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include "security_test_client.hpp" + +security_test_client::security_test_client() + : app_(vsomeip::runtime::get()->create_application()), + is_available_(false), + sender_(std::bind(&security_test_client::run, this)), + received_responses_(0) { + +} + +bool security_test_client::init() { + if (!app_->init()) { + ADD_FAILURE() << "Couldn't initialize application"; + return false; + } + + app_->register_state_handler( + std::bind(&security_test_client::on_state, this, + std::placeholders::_1)); + + app_->register_message_handler(vsomeip::ANY_SERVICE, + vsomeip_test::TEST_SERVICE_INSTANCE_ID, vsomeip::ANY_METHOD, + std::bind(&security_test_client::on_message, this, + std::placeholders::_1)); + + app_->register_availability_handler(vsomeip_test::TEST_SERVICE_SERVICE_ID, + vsomeip_test::TEST_SERVICE_INSTANCE_ID, + std::bind(&security_test_client::on_availability, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)); + return true; +} + +void security_test_client::start() { + VSOMEIP_INFO << "Starting..."; + + app_->start(); +} + +void security_test_client::stop() { + VSOMEIP_INFO << "Stopping..."; + + shutdown_service(); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + app_->clear_all_handler(); + app_->stop(); +} + +void security_test_client::on_state(vsomeip::state_type_e _state) { + if(_state == vsomeip::state_type_e::ST_REGISTERED) { + app_->request_service(vsomeip_test::TEST_SERVICE_SERVICE_ID, + vsomeip_test::TEST_SERVICE_INSTANCE_ID, false); + } +} + +void security_test_client::on_availability(vsomeip::service_t _service, + vsomeip::instance_t _instance, bool _is_available) { + + VSOMEIP_INFO << std::hex << "Client 0x" << app_->get_client() + << " : Service [" << std::setw(4) << std::setfill('0') << std::hex + << _service << "." << _instance << "] is " + << (_is_available ? "available." : "NOT available."); + + if(vsomeip_test::TEST_SERVICE_SERVICE_ID == _service + && vsomeip_test::TEST_SERVICE_INSTANCE_ID == _instance) { + std::unique_lock<std::mutex> its_lock(mutex_); + if(is_available_ && !_is_available) { + is_available_ = false; + } + else if(_is_available && !is_available_) { + is_available_ = true; + condition_.notify_one(); + } + } +} + +void security_test_client::on_message(const std::shared_ptr<vsomeip::message> &_response) { + VSOMEIP_INFO << "Received a response from Service [" + << std::setw(4) << std::setfill('0') << std::hex << _response->get_service() + << "." + << std::setw(4) << std::setfill('0') << std::hex << _response->get_instance() + << "] to Client/Session [" + << std::setw(4) << std::setfill('0') << std::hex << _response->get_client() + << "/" + << std::setw(4) << std::setfill('0') << std::hex << _response->get_session() + << "]"; + + if (_response->get_service() == vsomeip_test::TEST_SERVICE_SERVICE_ID && + _response->get_instance() == vsomeip_test::TEST_SERVICE_INSTANCE_ID) { + received_responses_++; + if (received_responses_ == vsomeip_test::NUMBER_OF_MESSAGES_TO_SEND_SECURITY_TESTS) { + VSOMEIP_WARNING << std::hex << app_->get_client() + << ": Received all messages ~> going down!"; + } + } +} + +void security_test_client::run() { + for (uint32_t i = 0; i < vsomeip_test::NUMBER_OF_MESSAGES_TO_SEND_SECURITY_TESTS; ++i) { + { + std::unique_lock<std::mutex> its_lock(mutex_); + while (!is_available_) + { + condition_.wait(its_lock); + } + } + + auto request = vsomeip::runtime::get()->create_request(false); + request->set_service(vsomeip_test::TEST_SERVICE_SERVICE_ID); + request->set_instance(vsomeip_test::TEST_SERVICE_INSTANCE_ID); + request->set_method(vsomeip_test::TEST_SERVICE_METHOD_ID); + app_->send(request, true); + + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + EXPECT_EQ(vsomeip_test::NUMBER_OF_MESSAGES_TO_SEND_SECURITY_TESTS, + received_responses_); + + stop(); +} + +void security_test_client::join_sender_thread() +{ + if (sender_.joinable()) { + sender_.join(); + } +} + +void security_test_client::shutdown_service() { + auto request = vsomeip::runtime::get()->create_request(false); + request->set_service(vsomeip_test::TEST_SERVICE_SERVICE_ID); + request->set_instance(vsomeip_test::TEST_SERVICE_INSTANCE_ID); + request->set_method(vsomeip_test::TEST_SERVICE_METHOD_ID_SHUTDOWN); + app_->send(request,true); +} + +TEST(someip_security_test, basic_request_response) +{ + security_test_client test_client; + if (test_client.init()) { + test_client.start(); + test_client.join_sender_thread(); + } +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}
\ No newline at end of file diff --git a/test/security_tests/security_test_client.hpp b/test/security_tests/security_test_client.hpp new file mode 100644 index 0000000..ed010fa --- /dev/null +++ b/test/security_tests/security_test_client.hpp @@ -0,0 +1,50 @@ + +// Copyright (C) 2015-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#ifndef SECURITY_TEST_CLIENT_HPP +#define SECURITY_TEST_CLIENT_HPP + +#include <gtest/gtest.h> + +#include <vsomeip/vsomeip.hpp> + +#include "../someip_test_globals.hpp" + +#include <thread> +#include <mutex> +#include <condition_variable> +#include <atomic> + +class security_test_client { +public: + security_test_client(); + bool init(); + void start(); + void stop(); + + void on_state(vsomeip::state_type_e _state); + void on_availability(vsomeip::service_t _service, + vsomeip::instance_t _instance, bool _is_available); + void on_message(const std::shared_ptr<vsomeip::message> &_response); + + void run(); + void join_sender_thread(); + +private: + void shutdown_service(); + + std::shared_ptr<vsomeip::application> app_; + + std::mutex mutex_; + std::condition_variable condition_; + bool is_available_; + + std::thread sender_; + + std::atomic<std::uint32_t> received_responses_; +}; + +#endif // SECURITY_TEST_CLIENT_HPP diff --git a/test/security_tests/security_test_client_start.sh b/test/security_tests/security_test_client_start.sh new file mode 100755 index 0000000..e87ca31 --- /dev/null +++ b/test/security_tests/security_test_client_start.sh @@ -0,0 +1,9 @@ +#!/bin/bash +# Copyright (C) 2015-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +export VSOMEIP_APPLICATION_NAME=client-sample +export VSOMEIP_CONFIGURATION=vsomeip-security.json +./security_test_client diff --git a/test/security_tests/security_test_service.cpp b/test/security_tests/security_test_service.cpp new file mode 100644 index 0000000..99fb14e --- /dev/null +++ b/test/security_tests/security_test_service.cpp @@ -0,0 +1,133 @@ +// Copyright (C) 2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include "security_test_service.hpp" + +security_test_service::security_test_service() : + app_(vsomeip::runtime::get()->create_application()), + is_registered_(false), + blocked_(false), + number_of_received_messages_(0), + offer_thread_(std::bind(&security_test_service::run, this)) { +} + +bool security_test_service::init() { + std::lock_guard<std::mutex> its_lock(mutex_); + + if (!app_->init()) { + ADD_FAILURE() << "Couldn't initialize application"; + return false; + } + app_->register_message_handler(vsomeip_test::TEST_SERVICE_SERVICE_ID, + vsomeip_test::TEST_SERVICE_INSTANCE_ID, vsomeip_test::TEST_SERVICE_METHOD_ID, + std::bind(&security_test_service::on_message, this, + std::placeholders::_1)); + + app_->register_message_handler(vsomeip_test::TEST_SERVICE_SERVICE_ID, + vsomeip_test::TEST_SERVICE_INSTANCE_ID, + vsomeip_test::TEST_SERVICE_METHOD_ID_SHUTDOWN, + std::bind(&security_test_service::on_message_shutdown, this, + std::placeholders::_1)); + + app_->register_state_handler( + std::bind(&security_test_service::on_state, this, + std::placeholders::_1)); + return true; +} + +void security_test_service::start() { + VSOMEIP_INFO << "Starting..."; + app_->start(); +} + +void security_test_service::stop() { + VSOMEIP_INFO << "Stopping..."; + app_->clear_all_handler(); + app_->stop(); +} + +void security_test_service::join_offer_thread() { + if (offer_thread_.joinable()) { + offer_thread_.join(); + } +} + +void security_test_service::offer() { + app_->offer_service(vsomeip_test::TEST_SERVICE_SERVICE_ID, vsomeip_test::TEST_SERVICE_INSTANCE_ID); +} + +void security_test_service::stop_offer() { + app_->stop_offer_service(vsomeip_test::TEST_SERVICE_SERVICE_ID, vsomeip_test::TEST_SERVICE_INSTANCE_ID); +} + +void security_test_service::on_state(vsomeip::state_type_e _state) { + VSOMEIP_INFO << "Application " << app_->get_name() << " is " + << (_state == vsomeip::state_type_e::ST_REGISTERED ? "registered." : + "deregistered."); + + if(_state == vsomeip::state_type_e::ST_REGISTERED) { + if(!is_registered_) { + is_registered_ = true; + std::lock_guard<std::mutex> its_lock(mutex_); + blocked_ = true; + // "start" the run method thread + condition_.notify_one(); + } + } + else { + is_registered_ = false; + } +} + +void security_test_service::on_message(const std::shared_ptr<vsomeip::message>& _request) { + ASSERT_EQ(vsomeip_test::TEST_SERVICE_SERVICE_ID, _request->get_service()); + ASSERT_EQ(vsomeip_test::TEST_SERVICE_METHOD_ID, _request->get_method()); + + VSOMEIP_INFO << "Received a message with Client/Session [" << std::setw(4) + << std::setfill('0') << std::hex << _request->get_client() << "/" + << std::setw(4) << std::setfill('0') << std::hex + << _request->get_session() << "]"; + + // send response + std::shared_ptr<vsomeip::message> its_response = + vsomeip::runtime::get()->create_response(_request); + + app_->send(its_response, true); + + number_of_received_messages_++; + if(number_of_received_messages_ == vsomeip_test::NUMBER_OF_MESSAGES_TO_SEND_SECURITY_TESTS) { + VSOMEIP_INFO << "Received all messages!"; + } +} + +void security_test_service::on_message_shutdown( + const std::shared_ptr<vsomeip::message>& _request) { + (void)_request; + VSOMEIP_INFO << "Shutdown method was called, going down now."; + stop(); +} + +void security_test_service::run() { + std::unique_lock<std::mutex> its_lock(mutex_); + while (!blocked_) + condition_.wait(its_lock); + + offer(); +} + +TEST(someip_security_test, basic_request_response) { + security_test_service test_service; + if (test_service.init()) { + test_service.start(); + test_service.join_offer_thread(); + } +} + +#ifndef _WIN32 +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} +#endif diff --git a/test/security_tests/security_test_service.hpp b/test/security_tests/security_test_service.hpp new file mode 100644 index 0000000..21f1f7c --- /dev/null +++ b/test/security_tests/security_test_service.hpp @@ -0,0 +1,44 @@ +// Copyright (C) 2015-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#ifndef SECURITY_TEST_SERVICE_HPP +#define SECURITY_TEST_SERVICE_HPP + +#include <gtest/gtest.h> + +#include <vsomeip/vsomeip.hpp> + +#include "../someip_test_globals.hpp" + +#include <thread> +#include <mutex> +#include <condition_variable> + +class security_test_service { +public: + security_test_service(); + bool init(); + void start(); + void stop(); + void offer(); + void stop_offer(); + void join_offer_thread(); + void on_state(vsomeip::state_type_e _state); + void on_message(const std::shared_ptr<vsomeip::message> &_request); + void on_message_shutdown(const std::shared_ptr<vsomeip::message> &_request); + void run(); + +private: + std::shared_ptr<vsomeip::application> app_; + bool is_registered_; + + std::mutex mutex_; + std::condition_variable condition_; + bool blocked_; + std::uint32_t number_of_received_messages_; + std::thread offer_thread_; +}; + +#endif // SECURITY_TEST_SERVICE_HPP diff --git a/test/security_tests/security_test_start.sh b/test/security_tests/security_test_start.sh new file mode 100755 index 0000000..ac4f51f --- /dev/null +++ b/test/security_tests/security_test_start.sh @@ -0,0 +1,15 @@ +#!/bin/bash +# Copyright (C) 2015-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +export VSOMEIP_CONFIGURATION=security_test_config.json + +export VSOMEIP_APPLICATION_NAME=service-sample +./security_test_service & + +sleep 1 + +export VSOMEIP_APPLICATION_NAME=client-sample +./security_test_client diff --git a/test/someip_test_globals.hpp b/test/someip_test_globals.hpp index 5a5f916..8e7af4b 100644 --- a/test/someip_test_globals.hpp +++ b/test/someip_test_globals.hpp @@ -37,6 +37,8 @@ constexpr std::uint32_t MAX_PAYLOADSIZE = 1024*128; constexpr std::uint32_t MAX_PAYLOADSIZE_UDP = 1400; constexpr std::uint32_t NUMBER_OF_MESSAGES_TO_SEND_ROUTING_RESTART_TESTS = 32; + +constexpr std::uint32_t NUMBER_OF_MESSAGES_TO_SEND_SECURITY_TESTS = 32; } #endif /* SOMEIP_TEST_GLOBALS_HPP_ */ diff --git a/test/subscribe_notify_tests/subscribe_notify_test_one_event_two_eventgroups_client.cpp b/test/subscribe_notify_tests/subscribe_notify_test_one_event_two_eventgroups_client.cpp index eaec6be..6461d87 100644 --- a/test/subscribe_notify_tests/subscribe_notify_test_one_event_two_eventgroups_client.cpp +++ b/test/subscribe_notify_tests/subscribe_notify_test_one_event_two_eventgroups_client.cpp @@ -32,7 +32,6 @@ public: wait_availability_(true), wait_set_value_(true), wait_shutdown_response_(true), - wait_events_(true), run_thread_(std::bind(&subscribe_notify_test_one_event_two_eventgroups_client::run, this)) { } ~subscribe_notify_test_one_event_two_eventgroups_client() { @@ -185,9 +184,12 @@ public: app_->send(its_request); } - void wait_on_condition(std::unique_lock<std::mutex>&& _lock, bool *_predicate, std::condition_variable&& _condition) { + void wait_on_condition(std::unique_lock<std::mutex>&& _lock, bool *_predicate, std::condition_variable&& _condition, std::uint32_t _timeout) { while (*_predicate) { - _condition.wait(_lock); + if (std::cv_status::timeout == _condition.wait_for(_lock, std::chrono::seconds(_timeout))) { + ADD_FAILURE() << "Condition variable wasn't notified within time (" + << _timeout << "sec)"; + } } *_predicate = true; } @@ -206,9 +208,15 @@ public: void wait_for_events(std::unique_lock<std::mutex>&& _lock, std::uint32_t _expected_number_received_events, std::condition_variable&& _condition) { - while (wait_events_ && - received_events_.size() != _expected_number_received_events) { - _condition.wait(_lock); + std::cv_status its_status(std::cv_status::no_timeout); + while (received_events_.size() != _expected_number_received_events + && its_status != std::cv_status::timeout) { + its_status = _condition.wait_for(_lock, std::chrono::seconds(15)); + if (std::cv_status::timeout == its_status) { + ADD_FAILURE() << "Didn't receive expected number of events: " + << _expected_number_received_events + << " within time. Instead received: " << received_events_.size(); + } } ASSERT_EQ(size_t(_expected_number_received_events), received_events_.size()); } @@ -232,14 +240,14 @@ public: void run() { std::unique_lock<std::mutex> its_availability_lock(availability_mutex_); - wait_on_condition(std::move(its_availability_lock), &wait_availability_, std::move(availability_condition_)); + wait_on_condition(std::move(its_availability_lock), &wait_availability_, std::move(availability_condition_), 300); // service is available now for (int i = 0; i < 3; i++) { // set value set_field_at_service(0x1); std::unique_lock<std::mutex> its_set_value_lock(set_value_mutex_); - wait_on_condition(std::move(its_set_value_lock), &wait_set_value_, std::move(set_value_condition_)); + wait_on_condition(std::move(its_set_value_lock), &wait_set_value_, std::move(set_value_condition_), 30); // subscribe std::unique_lock<std::mutex> its_events_lock(events_mutex_); @@ -258,7 +266,7 @@ public: // set value again set_field_at_service(0x2); - wait_on_condition(std::move(its_set_value_lock), &wait_set_value_, std::move(set_value_condition_)); + wait_on_condition(std::move(its_set_value_lock), &wait_set_value_, std::move(set_value_condition_), 30); wait_for_events(std::move(its_events_lock), 3, std::move(events_condition_)); check_received_events_payload(0x2); @@ -272,7 +280,7 @@ public: // set value again set_field_at_service(0x3); - wait_on_condition(std::move(its_set_value_lock), &wait_set_value_, std::move(set_value_condition_)); + wait_on_condition(std::move(its_set_value_lock), &wait_set_value_, std::move(set_value_condition_), 30); wait_for_events(std::move(its_events_lock), 3, std::move(events_condition_)); check_received_events_payload(0x3); its_expected.insert({info_.event_id, 1}); @@ -288,7 +296,7 @@ public: } std::unique_lock<std::mutex> its_shutdown_lock(shutdown_response_mutex_); call_method_at_service(subscribe_notify_test::shutdown_method_id); - wait_on_condition(std::move(its_shutdown_lock), &wait_shutdown_response_, std::move(shutdown_response_condition_)); + wait_on_condition(std::move(its_shutdown_lock), &wait_shutdown_response_, std::move(shutdown_response_condition_), 30); stop(); } @@ -309,7 +317,6 @@ private: std::mutex shutdown_response_mutex_; std::condition_variable shutdown_response_condition_; - bool wait_events_; std::mutex events_mutex_; std::condition_variable events_condition_; |