From 8bb2ed134d75803e8e6e3c4f4baa253e4d74edf4 Mon Sep 17 00:00:00 2001 From: Juergen Gehring Date: Thu, 25 Jan 2018 00:40:07 -0800 Subject: vsomeip 2.10.5 --- CHANGES | 28 ++ CMakeLists.txt | 2 +- documentation/vsomeipUserGuide | 159 ++++++++- examples/request-sample.cpp | 3 +- .../configuration/include/configuration.hpp | 18 + .../configuration/include/configuration_impl.hpp | 40 ++- implementation/configuration/include/debounce.hpp | 39 +++ .../configuration/include/internal.hpp.in | 9 +- .../configuration/src/configuration_impl.cpp | 364 ++++++++++++++++++++- .../endpoints/include/client_endpoint_impl.hpp | 8 +- implementation/endpoints/include/endpoint_impl.hpp | 6 +- .../include/local_client_endpoint_impl.hpp | 3 +- .../include/local_server_endpoint_impl.hpp | 6 +- .../endpoints/include/server_endpoint_impl.hpp | 5 +- .../endpoints/include/tcp_client_endpoint_impl.hpp | 3 +- .../endpoints/include/tcp_server_endpoint_impl.hpp | 5 +- .../endpoints/include/udp_client_endpoint_impl.hpp | 3 +- .../endpoints/include/udp_server_endpoint_impl.hpp | 3 +- .../endpoints/src/client_endpoint_impl.cpp | 75 ++++- implementation/endpoints/src/endpoint_impl.cpp | 6 +- .../endpoints/src/local_client_endpoint_impl.cpp | 16 +- .../endpoints/src/local_server_endpoint_impl.cpp | 23 +- .../endpoints/src/server_endpoint_impl.cpp | 64 +++- .../endpoints/src/tcp_client_endpoint_impl.cpp | 24 +- .../endpoints/src/tcp_server_endpoint_impl.cpp | 31 +- .../endpoints/src/udp_client_endpoint_impl.cpp | 13 +- .../endpoints/src/udp_server_endpoint_impl.cpp | 14 +- implementation/routing/include/event.hpp | 2 +- .../routing/include/routing_manager_impl.hpp | 2 +- .../routing/include/routing_manager_stub_host.hpp | 2 +- implementation/routing/src/event.cpp | 7 +- .../routing/src/routing_manager_base.cpp | 97 +++++- .../routing/src/routing_manager_impl.cpp | 88 +++-- .../routing/src/routing_manager_proxy.cpp | 3 +- .../routing/src/routing_manager_stub.cpp | 6 +- implementation/runtime/src/application_impl.cpp | 22 +- .../include/configuration_option_impl.hpp | 2 +- .../include/eventgroupentry_impl.hpp | 24 +- .../service_discovery/include/ip_option_impl.hpp | 2 +- .../service_discovery/include/ipv4_option_impl.hpp | 1 + .../service_discovery/include/ipv6_option_impl.hpp | 1 + .../include/load_balancing_option_impl.hpp | 2 +- .../include/protection_option_impl.hpp | 2 +- .../include/service_discovery_impl.hpp | 11 +- .../src/configuration_option_impl.cpp | 12 +- .../service_discovery/src/eventgroupentry_impl.cpp | 118 ++++++- .../service_discovery/src/ip_option_impl.cpp | 13 +- .../service_discovery/src/ipv4_option_impl.cpp | 5 + .../service_discovery/src/ipv6_option_impl.cpp | 5 + .../src/load_balancing_option_impl.cpp | 13 +- .../service_discovery/src/option_impl.cpp | 3 +- .../src/protection_option_impl.cpp | 13 +- .../src/service_discovery_impl.cpp | 61 +++- test/CMakeLists.txt | 76 ++++- test/big_payload_tests/big_payload_test_client.cpp | 34 +- .../big_payload_test_external_starter.sh | 6 +- .../big_payload_tests/big_payload_test_globals.hpp | 4 + .../big_payload_test_local_queue_limited.json | 42 +++ .../big_payload_test_local_starter.sh | 6 +- .../big_payload_tests/big_payload_test_service.cpp | 20 +- .../big_payload_test_service_external_start.sh | 8 +- ...d_test_tcp_client_queue_limited_general.json.in | 31 ++ ..._test_tcp_client_queue_limited_specific.json.in | 43 +++ ..._test_tcp_service_queue_limited_general.json.in | 87 +++++ ...test_tcp_service_queue_limited_specific.json.in | 99 ++++++ 65 files changed, 1695 insertions(+), 248 deletions(-) create mode 100644 implementation/configuration/include/debounce.hpp create mode 100644 test/big_payload_tests/big_payload_test_local_queue_limited.json create mode 100644 test/big_payload_tests/conf/big_payload_test_tcp_client_queue_limited_general.json.in create mode 100644 test/big_payload_tests/conf/big_payload_test_tcp_client_queue_limited_specific.json.in create mode 100644 test/big_payload_tests/conf/big_payload_test_tcp_service_queue_limited_general.json.in create mode 100644 test/big_payload_tests/conf/big_payload_test_tcp_service_queue_limited_specific.json.in diff --git a/CHANGES b/CHANGES index 1fa9a5f..6f3f91d 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,34 @@ Changes ======= +v2.10.5 +- Fix possible deadlock on application shutdown +- Try to reestablish TCP connection on resubscription if the remote + closed the connection +- Introduce new configuration file parameters to control + interpretation of TTL field of incoming remote offers and + subscriptions: + - service-discovery > ttl_factor_offers (optional array of + service/instance/TTL factor tuples) + - service-discovery > ttl_factor_subscriptions (optional array of + service/instance/TTL factor tuples) +- Added possibility to debounce external events/fields + based on time or change of data in the payload (maskable) via new + configuration file parameter: + - debounce (optional array) + For more information see the vsomeipUserGuide. +- Added possibility to limit amount of memory used to cache outgoing + messages on IP port basis or globally via configuration file + parameter: + - endpoint-queue-limits (array): to limit on IP:Port (endpoint) + level + - endpoint-queue-limit-external: to generally limit all external + endpoints. + - endpoint-queue-limit-local: to limit queue sizes for local + communication + For more information see the vsomeipUserGuide. + + v2.10.4 - Extended diagnosis plugin to handle requests for "disableRxAndEnableTx" and "disableRxAndTx". diff --git a/CMakeLists.txt b/CMakeLists.txt index d1df178..e062da1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ project (vsomeip) set (VSOMEIP_MAJOR_VERSION 2) set (VSOMEIP_MINOR_VERSION 10) -set (VSOMEIP_PATCH_VERSION 4) +set (VSOMEIP_PATCH_VERSION 5) set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION}) set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in set (CMAKE_VERBOSE_MAKEFILE off) diff --git a/documentation/vsomeipUserGuide b/documentation/vsomeipUserGuide index 0c222b9..3fb07c3 100644 --- a/documentation/vsomeipUserGuide +++ b/documentation/vsomeipUserGuide @@ -650,7 +650,7 @@ remote service hosted on beforehand specified IP and port. On service side: the payload size limit in bytes of messages received and sent by the service offered on previously specified IP and port. + -If multiple services are hosted on the same port all they share the limit +If multiple services are hosted on the same port they all share the limit specified. * `max-payload-size-local` @@ -665,6 +665,62 @@ The maximum allowed payload size for TCP communication in bytes. By default the payload size for TCP communication is unlimited. It can be limited via this setting. +* `endpoint-queue-limits` (array) ++ +Array to limit the maximum allowed size in bytes of cached outgoing messages per +IP and port (message queue size per endpoint). If not specified otherwise the +allowed queue size is unlimited. The settings in this array only affect external +communication. To limit the local queue size `endpoint-queue-limit-local` can +be used. + +** `unicast` ++ +On client side: the IP of the remote service for which the queue size of sent +requests should be limited. ++ +On service side: the IP of the offered service for which the queue size for +sent responses should be limited. This IP address is therefore +identical to the IP address specified via `unicast` setting on top level of the +json file. + +** `ports` (array) ++ +Array which holds pairs of port and queue size statements. + +*** `port` ++ +On client side: the port of the remote service for which the queue size of sent +requests should be limited. ++ +On service side: the port of the offered service for which the queue size for +send responses should be limited. + +*** `queue-size-limit` ++ +On client side: the queue size limit in bytes of messages sent to the +remote service hosted on beforehand specified IP and port. ++ +On service side: the queue size limit in bytes for responses sent by the service +offered on previously specified IP and port. ++ +If multiple services are hosted on the same port they all share the limit +specified. + +* `endpoint-queue-limit-external` ++ +Setting to limit the maximum allowed size in bytes of cached outgoing messages +for external communication (message queue size per endpoint). By default the +queue size for external communication is unlimited. It can be limited via this +setting. Settings done in the `endpoint-queue-limits` array override this +setting. + +* `endpoint-queue-limit-local` ++ +Setting to limit the maximum allowed size in bytes of cached outgoing messages +for local communication (message queue size per endpoint). By default the queue +size for node internal communication is unlimited. It can be limited via this +setting. + * `buffer-shrink-threshold` + The number of processed messages which are half the size or smaller than the @@ -714,6 +770,60 @@ The highest Service-ID in hex of a internal service range. The highest Instance-ID in hex of a internal service-instance range. If not specified the highest Instance-ID is 0xFFFF. +* `debounce` (optional array) ++ +Events/fields sent by external devices will be forwarded to the +applications only if a configurable function evaluates to true. The +function checks whether the event/field payload has changed and whether +a specified interval has been elapsed since the last forwarding. + +** `service` ++ +Service ID which hosts the events to be debounced. + +** `instance` ++ +Instance ID which hosts the events to be debounced. + +** `events` ++ +Array of events which shall be debounced based on the following +configuration options. + +*** `event` ++ +Event ID. + +*** `on_change` ++ +Specifies whether the event is only forwared on +paylaod change or not. (valid values: _true_, _false_). + +*** `ignore` ++ +Array of payload indexes with given bit mask (optional) +to be ignored in payload change evaluation. +Instead of specifying an index / bitmask pair, one can only define the paylaod index +which shall be ignored in the evaluation. + +**** `index` ++ +Payload index to be checked with given bitmask. + +**** `mask` ++ +1Byte bitmask applied to byte at given payload index. +Example mask: 0x0f ignores payload changes in low nibble of the byte at given index. + +*** `interval` ++ +Specifies if the event shall be debounced based on elapsed time interval. +(valid values: _time in ms_, _never_). + +*** `on_change_resets_interval_` (optional) +Specifies if interval timer is reset when payload change was detected. +(valid values: _false_, _true_). + * `routing` + The name of the application that is responsible for the routing. @@ -762,10 +872,55 @@ repetition phase. + Lifetime of entries for provided services as well as consumed services and eventgroups. +** `ttl_factor_offers` (optional array) ++ +Array which holds correction factors for incoming remote offers. If a value +greater than one is specified for a service instance, the TTL field of the +corresponding service entry will be multiplied with the specified factor. + +Example: An offer of a service is received with a TTL of 3 sec and the TTL +factor is set to 5. The remote node stops offering the service w/o sending a +StopOffer message. The service will then expire (marked as unavailable) 15 seconds +after the last offer has been received. + +*** `service` ++ +The id of the service. + +*** `instance` ++ +The id of the service instance. + +*** `ttl_factor` ++ +TTL correction factor + +** `ttl_factor_subscriptions` (optional array) ++ +Array which holds correction factors for incoming remote subscriptions. If a +value greater than one is specified for a service instance, the TTL field of the +corresponding eventgroup entry will be multiplied with the specified factor. + +Example: A remote subscription to an offered service is received with a TTL of 3 +sec and the TTL factor is set to 5. The remote node stops resubscribing to the +service w/o sending a StopSubscribeEventgroup message. The subscription will +then expire 15 seconds after the last resubscription has been received. + +*** `service` ++ +The id of the service. + +*** `instance` ++ +The id of the service instance. + +*** `ttl_factor` ++ +TTL correction factor + ** `cyclic_offer_delay` + Cycle of the OfferService messages in the main phase. + ** `request_response_delay` + Minimum delay of a unicast message to a multicast message for @@ -926,7 +1081,7 @@ To activate the 'Audit Mode' the 'security' object has to be included in the json file but the 'check_credentials' switch has to be set to false. For example: -[source, json] +[source, bash] ---- [...] "services" : diff --git a/examples/request-sample.cpp b/examples/request-sample.cpp index 39dfaaa..7732cb0 100644 --- a/examples/request-sample.cpp +++ b/examples/request-sample.cpp @@ -150,7 +150,6 @@ public: { std::unique_lock its_lock(mutex_); while (!blocked_) condition_.wait(its_lock); - std::this_thread::sleep_for(std::chrono::milliseconds(cycle_)); if (is_available_) { app_->send(request_, true); std::cout << "Client/Session [" @@ -165,8 +164,8 @@ public: << std::endl; blocked_ = false; } - } + std::this_thread::sleep_for(std::chrono::milliseconds(cycle_)); } } diff --git a/implementation/configuration/include/configuration.hpp b/implementation/configuration/include/configuration.hpp index 9908e48..16ebd33 100644 --- a/implementation/configuration/include/configuration.hpp +++ b/implementation/configuration/include/configuration.hpp @@ -25,6 +25,8 @@ #include "../../e2e_protection/include/e2exf/config.hpp" #include "e2e.hpp" +#include "debounce.hpp" + #define VSOMEIP_CONFIG_PLUGIN_VERSION 1 namespace vsomeip { @@ -150,6 +152,22 @@ public: virtual bool log_status() const = 0; virtual uint32_t get_log_status_interval() const = 0; + + // TTL factor + typedef std::uint32_t ttl_factor_t; + typedef std::map> ttl_map_t; + virtual ttl_map_t get_ttl_factor_offers() const = 0; + virtual ttl_map_t get_ttl_factor_subscribes() const = 0; + + // Debouncing + virtual std::shared_ptr get_debounce( + service_t _service, instance_t _instance, event_t _event) const = 0; + + // Queue size limit endpoints + typedef std::uint32_t endpoint_queue_limit_t; + virtual endpoint_queue_limit_t get_endpoint_queue_limit( + const std::string& _address, std::uint16_t _port) const = 0; + virtual endpoint_queue_limit_t get_endpoint_queue_limit_local() const = 0; }; } // namespace vsomeip diff --git a/implementation/configuration/include/configuration_impl.hpp b/implementation/configuration/include/configuration_impl.hpp index 265d5d9..93f7ebf 100644 --- a/implementation/configuration/include/configuration_impl.hpp +++ b/implementation/configuration/include/configuration_impl.hpp @@ -22,6 +22,7 @@ #include "policy.hpp" #include "../../e2e_protection/include/e2exf/config.hpp" #include "e2e.hpp" +#include "debounce.hpp" namespace vsomeip { namespace cfg { @@ -161,6 +162,15 @@ public: VSOMEIP_EXPORT bool log_status() const; VSOMEIP_EXPORT uint32_t get_log_status_interval() const; + VSOMEIP_EXPORT ttl_map_t get_ttl_factor_offers() const; + VSOMEIP_EXPORT ttl_map_t get_ttl_factor_subscribes() const; + + VSOMEIP_EXPORT std::shared_ptr get_debounce( + service_t _service, instance_t _instance, event_t _event) const; + + VSOMEIP_EXPORT endpoint_queue_limit_t get_endpoint_queue_limit( + const std::string& _address, std::uint16_t _port) const; + VSOMEIP_EXPORT endpoint_queue_limit_t get_endpoint_queue_limit_local() const; private: void read_data(const std::set &_input, std::vector &_elements, @@ -221,6 +231,15 @@ private: void load_policies(const element &_element); void load_policy(const boost::property_tree::ptree &_tree); + void load_debounce(const element &_element); + void load_service_debounce(const boost::property_tree::ptree &_tree); + void load_events_debounce(const boost::property_tree::ptree &_tree, + std::map> &_debounces); + void load_event_debounce(const boost::property_tree::ptree &_tree, + std::map> &_debounces); + void load_event_debounce_ignore(const boost::property_tree::ptree &_tree, + std::map &_ignore); + servicegroup *find_servicegroup(const std::string &_name) const; std::shared_ptr find_client(service_t _service, instance_t _instance) const; @@ -243,6 +262,11 @@ private: void load_e2e(const element &_element); void load_e2e_protected(const boost::property_tree::ptree &_tree); + void load_ttl_factors(const boost::property_tree::ptree &_tree, + ttl_map_t* _target); + + void load_endpoint_queue_sizes(const element &_element); + private: std::mutex mutex_; @@ -333,7 +357,12 @@ protected: ET_TRACING_ENABLE, ET_TRACING_SD_ENABLE, ET_SERVICE_DISCOVERY_OFFER_DEBOUNCE_TIME, - ET_MAX = 25 + ET_SERVICE_DISCOVERY_TTL_FACTOR_OFFERS, + ET_SERVICE_DISCOVERY_TTL_FACTOR_SUBSCRIPTIONS, + ET_ENDPOINT_QUEUE_LIMITS, + ET_ENDPOINT_QUEUE_LIMIT_EXTERNAL, + ET_ENDPOINT_QUEUE_LIMIT_LOCAL, + ET_MAX = 30 }; bool is_configured_[ET_MAX]; @@ -355,6 +384,15 @@ protected: bool log_status_; uint32_t log_status_interval_; + + ttl_map_t ttl_factors_offers_; + ttl_map_t ttl_factors_subscriptions_; + + std::map>>> debounces_; + + std::map> endpoint_queue_limits_; + endpoint_queue_limit_t endpoint_queue_limit_external_; + endpoint_queue_limit_t endpoint_queue_limit_local_; }; } // namespace cfg diff --git a/implementation/configuration/include/debounce.hpp b/implementation/configuration/include/debounce.hpp new file mode 100644 index 0000000..b4ce6d3 --- /dev/null +++ b/implementation/configuration/include/debounce.hpp @@ -0,0 +1,39 @@ +// Copyright (C) 2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#ifndef VSOMEIP_CFG_DEBOUNCE_HPP +#define VSOMEIP_CFG_DEBOUNCE_HPP + +#include + +namespace vsomeip { +namespace cfg { + +// Messages are forwarded either because their value differs from the +// last received message (on_change=true) or because the specified time +// (interval_) between two messages has elapsed. A message that is forwarded +// because of a changed value may reset the time until the next unchanged +// message is forwarded or not (on_change_resets_interval). By specifiying +// indexes and bit masks, the comparison that is carried out to decide whether +// or not two message values differ is configurable (ignore_). +struct debounce { + debounce() : on_change_(false), + on_change_resets_interval_(false), + interval_(0), + last_forwarded_((std::chrono::steady_clock::time_point::max)()) { + } + + bool on_change_; + bool on_change_resets_interval_; + std::map ignore_; + + long interval_; + std::chrono::steady_clock::time_point last_forwarded_; +}; + +} // namespace cfg +} // namespace vsomeip + +#endif // VSOMEIP_CFG_DEBOUNCE_HPP diff --git a/implementation/configuration/include/internal.hpp.in b/implementation/configuration/include/internal.hpp.in index 3a987f9..6012092 100644 --- a/implementation/configuration/include/internal.hpp.in +++ b/implementation/configuration/include/internal.hpp.in @@ -176,13 +176,10 @@ struct configuration_data_t { unsigned short routing_manager_host_; }; -#ifndef _WIN32 -constexpr std::uint32_t MESSAGE_SIZE_UNLIMITED = std::numeric_limits::max(); -#elif _MSC_VER >= 1900 -constexpr std::uint32_t MESSAGE_SIZE_UNLIMITED = (std::numeric_limits::max)(); -#else const std::uint32_t MESSAGE_SIZE_UNLIMITED = (std::numeric_limits::max)(); -#endif + +const std::uint32_t QUEUE_SIZE_UNLIMITED = (std::numeric_limits::max)(); + } // namespace vsomeip diff --git a/implementation/configuration/src/configuration_impl.cpp b/implementation/configuration/src/configuration_impl.cpp index 43e3c67..0920b66 100644 --- a/implementation/configuration/src/configuration_impl.cpp +++ b/implementation/configuration/src/configuration_impl.cpp @@ -77,7 +77,9 @@ configuration_impl::configuration_impl() log_memory_(false), log_memory_interval_(0), log_status_(false), - log_status_interval_(0) { + log_status_interval_(0), + endpoint_queue_limit_external_(QUEUE_SIZE_UNLIMITED), + endpoint_queue_limit_local_(QUEUE_SIZE_UNLIMITED) { unicast_ = unicast_.from_string(VSOMEIP_UNICAST_ADDRESS); for (auto i = 0; i < ET_MAX; i++) is_configured_[i] = false; @@ -94,7 +96,9 @@ configuration_impl::configuration_impl(const configuration_impl &_other) max_reliable_message_size_(_other.max_reliable_message_size_), buffer_shrink_threshold_(_other.buffer_shrink_threshold_), permissions_shm_(VSOMEIP_DEFAULT_SHM_PERMISSION), - umask_(VSOMEIP_DEFAULT_UMASK_LOCAL_ENDPOINTS) { + umask_(VSOMEIP_DEFAULT_UMASK_LOCAL_ENDPOINTS), + endpoint_queue_limit_external_(_other.endpoint_queue_limit_external_), + endpoint_queue_limit_local_(_other.endpoint_queue_limit_local_) { applications_.insert(_other.applications_.begin(), _other.applications_.end()); services_.insert(_other.services_.begin(), _other.services_.end()); @@ -144,6 +148,8 @@ configuration_impl::configuration_impl(const configuration_impl &_other) log_memory_interval_ = _other.log_memory_interval_; log_status_ = _other.log_status_; log_status_interval_ = _other.log_status_interval_; + + debounces_ = _other.debounces_; } configuration_impl::~configuration_impl() { @@ -332,6 +338,7 @@ bool configuration_impl::load_data(const std::vector &_elements, load_network(e); load_diagnosis_address(e); load_payload_sizes(e); + load_endpoint_queue_sizes(e); load_permissions(e); load_policies(e); load_tracing(e); @@ -348,6 +355,7 @@ bool configuration_impl::load_data(const std::vector &_elements, load_watchdog(e); load_selective_broadcasts_support(e); load_e2e(e); + load_debounce(e); } } @@ -892,6 +900,22 @@ void configuration_impl::load_service_discovery( its_converter >> sd_offer_debounce_time_; is_configured_[ET_SERVICE_DISCOVERY_OFFER_DEBOUNCE_TIME] = true; } + } else if (its_key == "ttl_factor_offers") { + if (is_configured_[ET_SERVICE_DISCOVERY_TTL_FACTOR_OFFERS]) { + VSOMEIP_WARNING << "Multiple definitions for service_discovery.ttl_factor_offers." + " Ignoring definition from " << _element.name_; + } else { + load_ttl_factors(i->second, &ttl_factors_offers_); + is_configured_[ET_SERVICE_DISCOVERY_TTL_FACTOR_OFFERS] = true; + } + } else if (its_key == "ttl_factor_subscriptions") { + if (is_configured_[ET_SERVICE_DISCOVERY_TTL_FACTOR_SUBSCRIPTIONS]) { + VSOMEIP_WARNING << "Multiple definitions for service_discovery.ttl_factor_subscriptions." + " Ignoring definition from " << _element.name_; + } else { + load_ttl_factors(i->second, &ttl_factors_subscriptions_); + is_configured_[ET_SERVICE_DISCOVERY_TTL_FACTOR_SUBSCRIPTIONS] = true; + } } } } catch (...) { @@ -2636,5 +2660,341 @@ uint32_t configuration_impl::get_log_status_interval() const { return log_status_interval_; } +void configuration_impl::load_ttl_factors( + const boost::property_tree::ptree &_tree, ttl_map_t* _target) { + const service_t ILLEGAL_VALUE(0xffff); + for (const auto& i : _tree) { + service_t its_service(ILLEGAL_VALUE); + instance_t its_instance(ILLEGAL_VALUE); + configuration::ttl_factor_t its_ttl_factor(0); + + for (const auto& j : i.second) { + std::string its_key(j.first); + std::string its_value(j.second.data()); + std::stringstream its_converter; + + if (its_key == "ttl_factor") { + its_converter << its_value; + its_converter >> its_ttl_factor; + } else { + // Trim "its_value" + if (its_value.size() > 1 && its_value[0] == '0' && its_value[1] == 'x') { + its_converter << std::hex << its_value; + } else { + its_converter << std::dec << its_value; + } + + if (its_key == "service") { + its_converter >> its_service; + } else if (its_key == "instance") { + its_converter >> its_instance; + } + } + } + if (its_service != ILLEGAL_VALUE + && its_instance != ILLEGAL_VALUE + && its_ttl_factor > 0) { + (*_target)[its_service][its_instance] = its_ttl_factor; + } else { + VSOMEIP_ERROR << "Invalid ttl factor configuration"; + } + } +} + +configuration::ttl_map_t configuration_impl::get_ttl_factor_offers() const { + return ttl_factors_offers_; +} + +configuration::ttl_map_t configuration_impl::get_ttl_factor_subscribes() const { + return ttl_factors_subscriptions_; +} + +configuration::endpoint_queue_limit_t +configuration_impl::get_endpoint_queue_limit( + const std::string& _address, std::uint16_t _port) const { + auto found_address = endpoint_queue_limits_.find(_address); + if (found_address != endpoint_queue_limits_.end()) { + auto found_port = found_address->second.find(_port); + if (found_port != found_address->second.end()) { + return found_port->second; + } + } + return endpoint_queue_limit_external_; +} + +configuration::endpoint_queue_limit_t +configuration_impl::get_endpoint_queue_limit_local() const { + return endpoint_queue_limit_local_; +} + +void configuration_impl::load_endpoint_queue_sizes(const element &_element) { + const std::string endpoint_queue_limits("endpoint-queue-limits"); + const std::string endpoint_queue_limit_external("endpoint-queue-limit-external"); + const std::string endpoint_queue_limit_local("endpoint-queue-limit-local"); + + try { + if (_element.tree_.get_child_optional(endpoint_queue_limit_external)) { + if (is_configured_[ET_ENDPOINT_QUEUE_LIMIT_EXTERNAL]) { + VSOMEIP_WARNING << "Multiple definitions for " + << endpoint_queue_limit_external + << " Ignoring definition from " << _element.name_; + } else { + is_configured_[ET_ENDPOINT_QUEUE_LIMIT_EXTERNAL] = true; + auto mpsl = _element.tree_.get_child( + endpoint_queue_limit_external); + std::string s(mpsl.data()); + try { + endpoint_queue_limit_external_ = + static_cast(std::stoul( + s.c_str(), NULL, 10)); + } catch (const std::exception &e) { + VSOMEIP_ERROR<<__func__ << ": " << endpoint_queue_limit_external + << " " << e.what(); + } + } + } + if (_element.tree_.get_child_optional(endpoint_queue_limit_local)) { + if (is_configured_[ET_ENDPOINT_QUEUE_LIMIT_LOCAL]) { + VSOMEIP_WARNING << "Multiple definitions for " + << endpoint_queue_limit_local + << " Ignoring definition from " << _element.name_; + } else { + is_configured_[ET_ENDPOINT_QUEUE_LIMIT_LOCAL] = true; + auto mpsl = _element.tree_.get_child(endpoint_queue_limit_local); + std::string s(mpsl.data()); + try { + endpoint_queue_limit_local_= + static_cast( + std::stoul(s.c_str(), NULL, 10)); + } catch (const std::exception &e) { + VSOMEIP_ERROR<< __func__ << ": "<< endpoint_queue_limit_local + << " " << e.what(); + } + } + } + + if (_element.tree_.get_child_optional(endpoint_queue_limits)) { + if (is_configured_[ET_ENDPOINT_QUEUE_LIMITS]) { + VSOMEIP_WARNING << "Multiple definitions for " + << endpoint_queue_limits + << " Ignoring definition from " << _element.name_; + } else { + is_configured_[ET_ENDPOINT_QUEUE_LIMITS] = true; + const std::string unicast("unicast"); + const std::string ports("ports"); + const std::string port("port"); + const std::string queue_size_limit("queue-size-limit"); + + for (const auto i : _element.tree_.get_child(endpoint_queue_limits)) { + if (!i.second.get_child_optional(unicast) + || !i.second.get_child_optional(ports)) { + continue; + } + std::string its_unicast(i.second.get_child(unicast).data()); + for (const auto j : i.second.get_child(ports)) { + + if (!j.second.get_child_optional(port) + || !j.second.get_child_optional(queue_size_limit)) { + continue; + } + + std::uint16_t its_port = ILLEGAL_PORT; + std::uint32_t its_queue_size_limit = 0; + + try { + std::string p(j.second.get_child(port).data()); + its_port = static_cast(std::stoul(p.c_str(), + NULL, 10)); + std::string s(j.second.get_child(queue_size_limit).data()); + its_queue_size_limit = static_cast(std::stoul( + s.c_str(), NULL, 10)); + } catch (const std::exception &e) { + VSOMEIP_ERROR << __func__ << ":" << e.what(); + } + + if (its_port == ILLEGAL_PORT || its_queue_size_limit == 0) { + continue; + } + + endpoint_queue_limits_[its_unicast][its_port] = its_queue_size_limit; + } + } + } + } + } catch (...) { + } +} + +void configuration_impl::load_debounce(const element &_element) { + try { + auto its_debounce = _element.tree_.get_child("debounce"); + for (auto i = its_debounce.begin(); i != its_debounce.end(); ++i) { + load_service_debounce(i->second); + } + } catch (...) { + } +} + +void configuration_impl::load_service_debounce( + const boost::property_tree::ptree &_tree) { + service_t its_service(0); + instance_t its_instance(0); + std::map> its_debounces; + + for (auto i = _tree.begin(); i != _tree.end(); ++i) { + std::string its_key(i->first); + std::string its_value(i->second.data()); + std::stringstream its_converter; + + if (its_key == "service") { + if (its_value.size() > 1 && its_value[0] == '0' && its_value[1] == 'x') { + its_converter << std::hex << its_value; + } else { + its_converter << std::dec << its_value; + } + its_converter >> its_service; + } else if (its_key == "instance") { + if (its_value.size() > 1 && its_value[0] == '0' && its_value[1] == 'x') { + its_converter << std::hex << its_value; + } else { + its_converter << std::dec << its_value; + } + its_converter >> its_instance; + } else if (its_key == "events") { + load_events_debounce(i->second, its_debounces); + } + } + + // TODO: Improve error handling! + if (its_service > 0 && its_instance > 0 && !its_debounces.empty()) { + auto find_service = debounces_.find(its_service); + if (find_service != debounces_.end()) { + auto find_instance = find_service->second.find(its_instance); + if (find_instance != find_service->second.end()) { + VSOMEIP_ERROR << "Multiple debounce configurations for service " + << std::hex << std::setw(4) << std::setfill('0') << its_service + << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance; + return; + } + } + debounces_[its_service][its_instance] = its_debounces; + } +} + +void configuration_impl::load_events_debounce( + const boost::property_tree::ptree &_tree, + std::map> &_debounces) { + for (auto i = _tree.begin(); i != _tree.end(); ++i) { + load_event_debounce(i->second, _debounces); + } +} + +void configuration_impl::load_event_debounce( + const boost::property_tree::ptree &_tree, + std::map> &_debounces) { + event_t its_event(0); + std::shared_ptr its_debounce = std::make_shared(); + + for (auto i = _tree.begin(); i != _tree.end(); ++i) { + std::string its_key(i->first); + std::string its_value(i->second.data()); + std::stringstream its_converter; + + if (its_key == "event") { + if (its_value.size() > 1 && its_value[0] == '0' && its_value[1] == 'x') { + its_converter << std::hex << its_value; + } else { + its_converter << std::dec << its_value; + } + its_converter >> its_event; + } else if (its_key == "on_change") { + its_debounce->on_change_ = (its_value == "true"); + } else if (its_key == "on_change_resets_interval") { + its_debounce->on_change_resets_interval_ = (its_value == "true"); + } else if (its_key == "ignore") { + load_event_debounce_ignore(i->second, its_debounce->ignore_); + } else if (its_key == "interval") { + if (its_value == "never") { + its_debounce->interval_ = -1; + } else { + its_converter << std::dec << its_value; + its_converter >> its_debounce->interval_; + } + } + } + + // TODO: Improve error handling + if (its_event > 0) { + auto find_event = _debounces.find(its_event); + if (find_event == _debounces.end()) { + _debounces[its_event] = its_debounce; + } + } +} + +void configuration_impl::load_event_debounce_ignore( + const boost::property_tree::ptree &_tree, + std::map &_ignore) { + std::size_t its_ignored; + byte_t its_mask; + std::stringstream its_converter; + + for (auto i = _tree.begin(); i != _tree.end(); ++i) { + std::string its_value = i->second.data(); + + its_mask = 0xff; + + if (!its_value.empty() + && std::find_if(its_value.begin(), its_value.end(), + [](char _c) { return !std::isdigit(_c); }) + == its_value.end()) { + its_converter.str(""); + its_converter.clear(); + its_converter << std::dec << its_value; + its_converter >> its_ignored; + + } else { + for (auto j = i->second.begin(); j != i->second.end(); ++j) { + std::string its_ignore_key(j->first); + std::string its_ignore_value(j->second.data()); + + if (its_ignore_key == "index") { + its_converter.str(""); + its_converter.clear(); + its_converter << std::dec << its_ignore_value; + its_converter >> its_ignored; + } else if (its_ignore_key == "mask") { + its_converter.str(""); + its_converter.clear(); + + int its_tmp_mask; + its_converter << std::hex << its_ignore_value; + its_converter >> its_tmp_mask; + + its_mask = static_cast(its_tmp_mask); + } + } + } + + _ignore[its_ignored] = its_mask; + } +} + +std::shared_ptr configuration_impl::get_debounce( + service_t _service, instance_t _instance, event_t _event) const { + auto found_service = debounces_.find(_service); + if (found_service != debounces_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + auto found_event = found_instance->second.find(_event); + if (found_event != found_instance->second.end()) { + return found_event->second; + } + } + } + return nullptr; +} + } // namespace config } // namespace vsomeip diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp index b1bccfa..1981bc9 100644 --- a/implementation/endpoints/include/client_endpoint_impl.hpp +++ b/implementation/endpoints/include/client_endpoint_impl.hpp @@ -37,7 +37,8 @@ public: client_endpoint_impl(std::shared_ptr _host, endpoint_type _local, endpoint_type _remote, boost::asio::io_service &_io, - std::uint32_t _max_message_size); + std::uint32_t _max_message_size, + configuration::endpoint_queue_limit_t _queue_limit); virtual ~client_endpoint_impl(); bool send(const uint8_t *_data, uint32_t _size, bool _flush); @@ -70,8 +71,8 @@ public: protected: virtual void send_queued() = 0; - void shutdown_and_close_socket(); - void shutdown_and_close_socket_unlocked(); + void shutdown_and_close_socket(bool _recreate_socket); + void shutdown_and_close_socket_unlocked(bool _recreate_socket); void start_connect_timer(); mutable std::mutex socket_mutex_; @@ -88,6 +89,7 @@ protected: // send data message_buffer_ptr_t packetizer_; std::deque queue_; + std::size_t queue_size_; std::mutex mutex_; diff --git a/implementation/endpoints/include/endpoint_impl.hpp b/implementation/endpoints/include/endpoint_impl.hpp index cf98ed1..49b9f7f 100644 --- a/implementation/endpoints/include/endpoint_impl.hpp +++ b/implementation/endpoints/include/endpoint_impl.hpp @@ -16,6 +16,7 @@ #include "buffer.hpp" #include "endpoint.hpp" +#include "../../configuration/include/configuration.hpp" namespace vsomeip { @@ -29,7 +30,8 @@ public: endpoint_impl(std::shared_ptr _adapter, endpoint_type _local, boost::asio::io_service &_io, - std::uint32_t _max_message_size); + std::uint32_t _max_message_size, + configuration::endpoint_queue_limit_t _queue_limit); virtual ~endpoint_impl(); void enable_magic_cookies(); @@ -87,6 +89,8 @@ protected: error_handler_t error_handler_; std::mutex error_handler_mutex_; + + const configuration::endpoint_queue_limit_t queue_limit_; }; } // namespace vsomeip diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp index dc56f23..d6ee995 100644 --- a/implementation/endpoints/include/local_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp @@ -34,7 +34,8 @@ public: local_client_endpoint_impl(std::shared_ptr _host, endpoint_type _remote, boost::asio::io_service &_io, - std::uint32_t _max_message_size); + std::uint32_t _max_message_size, + configuration::endpoint_queue_limit_t _queue_limit); virtual ~local_client_endpoint_impl(); diff --git a/implementation/endpoints/include/local_server_endpoint_impl.hpp b/implementation/endpoints/include/local_server_endpoint_impl.hpp index 458d59f..d54cf69 100644 --- a/implementation/endpoints/include/local_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/local_server_endpoint_impl.hpp @@ -42,14 +42,16 @@ public: endpoint_type _local, boost::asio::io_service &_io, std::uint32_t _max_message_size, - std::uint32_t _buffer_shrink_threshold); + std::uint32_t _buffer_shrink_threshold, + configuration::endpoint_queue_limit_t _queue_limit); local_server_endpoint_impl(std::shared_ptr _host, endpoint_type _local, boost::asio::io_service &_io, std::uint32_t _max_message_size, int native_socket, - std::uint32_t _buffer_shrink_threshold); + std::uint32_t _buffer_shrink_threshold, + configuration::endpoint_queue_limit_t _queue_limit); virtual ~local_server_endpoint_impl(); diff --git a/implementation/endpoints/include/server_endpoint_impl.hpp b/implementation/endpoints/include/server_endpoint_impl.hpp index 715c9b8..ae89508 100644 --- a/implementation/endpoints/include/server_endpoint_impl.hpp +++ b/implementation/endpoints/include/server_endpoint_impl.hpp @@ -27,12 +27,13 @@ class server_endpoint_impl: public endpoint_impl, public: typedef typename Protocol::socket socket_type; typedef typename Protocol::endpoint endpoint_type; - typedef typename std::map > queue_type; + typedef typename std::map>> queue_type; typedef typename queue_type::iterator queue_iterator_type; server_endpoint_impl(std::shared_ptr _host, endpoint_type _local, boost::asio::io_service &_io, - std::uint32_t _max_message_size); + std::uint32_t _max_message_size, + configuration::endpoint_queue_limit_t _queue_limit); virtual ~server_endpoint_impl(); bool is_client() const; diff --git a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp index 801c98c..1c77664 100644 --- a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp @@ -26,7 +26,8 @@ public: boost::asio::io_service &_io, std::uint32_t _max_message_size, std::uint32_t buffer_shrink_threshold, - std::chrono::milliseconds _send_timeout); + std::chrono::milliseconds _send_timeout, + configuration::endpoint_queue_limit_t _queue_limit); virtual ~tcp_client_endpoint_impl(); void start(); diff --git a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp index ab6fdb9..4318c1b 100644 --- a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp @@ -31,7 +31,8 @@ public: boost::asio::io_service &_io, std::uint32_t _max_message_size, std::uint32_t _buffer_shrink_threshold, - std::chrono::milliseconds _send_timeout); + std::chrono::milliseconds _send_timeout, + configuration::endpoint_queue_limit_t _queue_limit); virtual ~tcp_server_endpoint_impl(); void start(); @@ -88,7 +89,7 @@ private: bool _magic_cookies_enabled, boost::asio::io_service & _io_service, std::chrono::milliseconds _send_timeout); - void send_magic_cookie(message_buffer_ptr_t &_buffer); + bool send_magic_cookie(message_buffer_ptr_t &_buffer); bool is_magic_cookie(size_t _offset) const; void receive_cbk(boost::system::error_code const &_error, std::size_t _bytes); diff --git a/implementation/endpoints/include/udp_client_endpoint_impl.hpp b/implementation/endpoints/include/udp_client_endpoint_impl.hpp index b83e2f1..a1d8761 100644 --- a/implementation/endpoints/include/udp_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/udp_client_endpoint_impl.hpp @@ -29,7 +29,8 @@ public: udp_client_endpoint_impl(std::shared_ptr _host, endpoint_type _local, endpoint_type _remote, - boost::asio::io_service &_io); + boost::asio::io_service &_io, + configuration::endpoint_queue_limit_t _queue_limit); virtual ~udp_client_endpoint_impl(); void start(); diff --git a/implementation/endpoints/include/udp_server_endpoint_impl.hpp b/implementation/endpoints/include/udp_server_endpoint_impl.hpp index bed7c0c..6dcb537 100644 --- a/implementation/endpoints/include/udp_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/udp_server_endpoint_impl.hpp @@ -25,7 +25,8 @@ class udp_server_endpoint_impl: public udp_server_endpoint_base_impl { public: udp_server_endpoint_impl(std::shared_ptr _host, endpoint_type _local, - boost::asio::io_service &_io); + boost::asio::io_service &_io, + configuration::endpoint_queue_limit_t _queue_limit); virtual ~udp_server_endpoint_impl(); void start(); diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index 3783add..2fb9fa9 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -21,6 +21,7 @@ #include "../../configuration/include/internal.hpp" #include "../../logging/include/logger.hpp" #include "../../utility/include/utility.hpp" +#include "../../utility/include/byteorder.hpp" namespace vsomeip { @@ -30,13 +31,15 @@ client_endpoint_impl::client_endpoint_impl( endpoint_type _local, endpoint_type _remote, boost::asio::io_service &_io, - std::uint32_t _max_message_size) - : endpoint_impl(_host, _local, _io, _max_message_size), + std::uint32_t _max_message_size, + configuration::endpoint_queue_limit_t _queue_limit) + : endpoint_impl(_host, _local, _io, _max_message_size, _queue_limit), socket_(new socket_type(_io)), remote_(_remote), flush_timer_(_io), connect_timer_(_io), connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable is_connected_(false), packetizer_(std::make_shared()), + queue_size_(0), was_not_connected_(false), local_port_(0) { } @@ -62,6 +65,7 @@ void client_endpoint_impl::stop() { endpoint_impl::sending_blocked_ = true; // delete unsent messages queue_.clear(); + queue_size_ = 0; } { std::lock_guard its_lock(connect_timer_mutex_); @@ -69,7 +73,7 @@ void client_endpoint_impl::stop() { connect_timer_.cancel(ec); } connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT; - shutdown_and_close_socket(); + shutdown_and_close_socket(false); } template @@ -118,14 +122,50 @@ bool client_endpoint_impl::send(const uint8_t *_data, if (packetizer_->size() + _size > endpoint_impl::max_message_size_ && !packetizer_->empty()) { queue_.push_back(packetizer_); + queue_size_ += packetizer_->size(); packetizer_ = std::make_shared(); } + if (endpoint_impl::queue_limit_ != QUEUE_SIZE_UNLIMITED + && queue_size_ + _size > endpoint_impl::queue_limit_) { + service_t its_service(0); + method_t its_method(0); + client_t its_client(0); + session_t its_session(0); + if (_size >= VSOMEIP_SESSION_POS_MAX) { + // this will yield wrong IDs for local communication as the commands + // are prepended to the actual payload + // it will print: + // (lowbyte service ID + highbyte methoid) + // [(Command + lowerbyte sender's client ID). + // highbyte sender's client ID + lowbyte command size. + // lowbyte methodid + highbyte vsomeipd length] + its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN], + _data[VSOMEIP_SERVICE_POS_MAX]); + its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], + _data[VSOMEIP_METHOD_POS_MAX]); + its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN], + _data[VSOMEIP_CLIENT_POS_MAX]); + its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN], + _data[VSOMEIP_SESSION_POS_MAX]); + } + VSOMEIP_ERROR << "cei::send: queue size limit (" << std::dec + << endpoint_impl::queue_limit_ + << ") reached. Dropping message (" + << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << "." + << std::hex << std::setw(4) << std::setfill('0') << its_session << "] " + << "queue_size: " << std::dec << queue_size_ + << " data size: " << std::dec << _size; + return false; + } packetizer_->insert(packetizer_->end(), _data, _data + _size); if (_flush) { flush_timer_.cancel(); queue_.push_back(packetizer_); + queue_size_ += packetizer_->size(); packetizer_ = std::make_shared(); } else { flush_timer_.expires_from_now( @@ -152,6 +192,7 @@ bool client_endpoint_impl::flush() { std::lock_guard its_lock(mutex_); if (!packetizer_->empty()) { queue_.push_back(packetizer_); + queue_size_ += packetizer_->size(); packetizer_ = std::make_shared(); if (queue_.size() == 1) { // no writing in progress send_queued(); @@ -168,13 +209,13 @@ void client_endpoint_impl::connect_cbk( boost::system::error_code const &_error) { if (_error == boost::asio::error::operation_aborted) { // endpoint was stopped - shutdown_and_close_socket(); + shutdown_and_close_socket(false); return; } std::shared_ptr its_host = this->host_.lock(); if (its_host) { if (_error && _error != boost::asio::error::already_connected) { - shutdown_and_close_socket(); + shutdown_and_close_socket(true); start_connect_timer(); // Double the timeout as long as the maximum allowed is larger if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT) @@ -224,33 +265,42 @@ void client_endpoint_impl::send_cbk( if (!_error) { std::lock_guard its_lock(mutex_); if (queue_.size() > 0) { + queue_size_ -= queue_.front()->size(); queue_.pop_front(); send_queued(); } } else if (_error == boost::asio::error::broken_pipe) { is_connected_ = false; + bool stopping(false); { std::lock_guard its_lock(mutex_); - if (endpoint_impl::sending_blocked_) { + stopping = endpoint_impl::sending_blocked_; + if (stopping) { queue_.clear(); + queue_size_ = 0; } else { VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message() << " (" << std::dec << _error.value() << ") " << std::dec << queue_.size(); } } - shutdown_and_close_socket(); + if (!stopping) { + print_status(); + } + shutdown_and_close_socket(true); connect(); } else if (_error == boost::asio::error::not_connected || _error == boost::asio::error::bad_descriptor) { was_not_connected_ = true; + shutdown_and_close_socket(true); connect(); } else if (_error == boost::asio::error::operation_aborted) { // endpoint was stopped - shutdown_and_close_socket(); + shutdown_and_close_socket(false); } else { VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message() << " (" << std::dec << _error.value() << ")" ; + print_status(); } } @@ -263,19 +313,22 @@ void client_endpoint_impl::flush_cbk( } template -void client_endpoint_impl::shutdown_and_close_socket() { +void client_endpoint_impl::shutdown_and_close_socket(bool _recreate_socket) { std::lock_guard its_lock(socket_mutex_); - shutdown_and_close_socket_unlocked(); + shutdown_and_close_socket_unlocked(_recreate_socket); } template -void client_endpoint_impl::shutdown_and_close_socket_unlocked() { +void client_endpoint_impl::shutdown_and_close_socket_unlocked(bool _recreate_socket) { local_port_ = 0; if (socket_->is_open()) { boost::system::error_code its_error; socket_->shutdown(Protocol::socket::shutdown_both, its_error); socket_->close(its_error); } + if (_recreate_socket) { + socket_.reset(new socket_type(endpoint_impl::service_)); + } } template diff --git a/implementation/endpoints/src/endpoint_impl.cpp b/implementation/endpoints/src/endpoint_impl.cpp index cf35a39..9cde3e9 100644 --- a/implementation/endpoints/src/endpoint_impl.cpp +++ b/implementation/endpoints/src/endpoint_impl.cpp @@ -22,7 +22,8 @@ endpoint_impl::endpoint_impl( std::shared_ptr _host, endpoint_type _local, boost::asio::io_service &_io, - std::uint32_t _max_message_size) + std::uint32_t _max_message_size, + configuration::endpoint_queue_limit_t _queue_limit) : service_(_io), host_(_host), is_supporting_magic_cookies_(false), @@ -30,7 +31,8 @@ endpoint_impl::endpoint_impl( max_message_size_(_max_message_size), use_count_(0), sending_blocked_(false), - local_(_local) { + local_(_local), + queue_limit_(_queue_limit) { } template diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp index 64b6314..cc444a9 100644 --- a/implementation/endpoints/src/local_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp @@ -27,8 +27,10 @@ local_client_endpoint_impl::local_client_endpoint_impl( std::shared_ptr< endpoint_host > _host, endpoint_type _remote, boost::asio::io_service &_io, - std::uint32_t _max_message_size) - : local_client_endpoint_base_impl(_host, _remote, _remote, _io, _max_message_size), + std::uint32_t _max_message_size, + configuration::endpoint_queue_limit_t _queue_limit) + : local_client_endpoint_base_impl(_host, _remote, _remote, _io, + _max_message_size, _queue_limit), // Using _remote for the local(!) endpoint is ok, // because we have no bind for local endpoints! recv_buffer_(1,0) { @@ -49,11 +51,11 @@ void local_client_endpoint_impl::restart() { std::lock_guard its_lock(mutex_); sending_blocked_ = false; queue_.clear(); + queue_size_ = 0; } { std::lock_guard its_lock(socket_mutex_); - shutdown_and_close_socket_unlocked(); - socket_.reset(new socket_type(service_)); + shutdown_and_close_socket_unlocked(true); } start_connect_timer(); } @@ -95,7 +97,7 @@ void local_client_endpoint_impl::stop() { } } } - shutdown_and_close_socket(); + shutdown_and_close_socket(false); } void local_client_endpoint_impl::connect() { @@ -266,9 +268,7 @@ void local_client_endpoint_impl::print_status() { { std::lock_guard its_lock(mutex_); its_queue_size = queue_.size(); - for (const auto &m : queue_) { - its_data_size += m->size(); - } + its_data_size = queue_size_; } VSOMEIP_INFO << "status lce: " << its_path << " queue: " diff --git a/implementation/endpoints/src/local_server_endpoint_impl.cpp b/implementation/endpoints/src/local_server_endpoint_impl.cpp index cf6d8eb..c558661 100644 --- a/implementation/endpoints/src/local_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_server_endpoint_impl.cpp @@ -29,8 +29,10 @@ local_server_endpoint_impl::local_server_endpoint_impl( std::shared_ptr< endpoint_host > _host, endpoint_type _local, boost::asio::io_service &_io, std::uint32_t _max_message_size, - std::uint32_t _buffer_shrink_threshold) - : local_server_endpoint_base_impl(_host, _local, _io, _max_message_size), + std::uint32_t _buffer_shrink_threshold, + configuration::endpoint_queue_limit_t _queue_limit) + : local_server_endpoint_base_impl(_host, _local, _io, + _max_message_size, _queue_limit), acceptor_(_io), buffer_shrink_threshold_(_buffer_shrink_threshold) { is_supporting_magic_cookies_ = false; @@ -57,8 +59,10 @@ local_server_endpoint_impl::local_server_endpoint_impl( endpoint_type _local, boost::asio::io_service &_io, std::uint32_t _max_message_size, int native_socket, - std::uint32_t _buffer_shrink_threshold) - : local_server_endpoint_base_impl(_host, _local, _io, _max_message_size), + std::uint32_t _buffer_shrink_threshold, + configuration::endpoint_queue_limit_t _queue_limit) + : local_server_endpoint_base_impl(_host, _local, _io, + _max_message_size, _queue_limit), acceptor_(_io), buffer_shrink_threshold_(_buffer_shrink_threshold) { is_supporting_magic_cookies_ = false; @@ -143,6 +147,7 @@ void local_server_endpoint_impl::send_queued( auto connection_iterator = connections_.find(_queue_iterator->first); if (connection_iterator != connections_.end()) { connection_iterator->second->send_queued(_queue_iterator); + its_connection = connection_iterator->second; } else { VSOMEIP_INFO << "Didn't find connection: " #ifdef _WIN32 @@ -152,7 +157,7 @@ void local_server_endpoint_impl::send_queued( << _queue_iterator->first.path() #endif << " dropping outstanding messages (" << std::dec - << _queue_iterator->second.size() << ")."; + << _queue_iterator->second.second.size() << ")."; queues_.erase(_queue_iterator->first); } } @@ -348,7 +353,7 @@ void local_server_endpoint_impl::connection::send_queued( return; } - message_buffer_ptr_t its_buffer = _queue_iterator->second.front(); + message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front(); #if 0 std::stringstream msg; msg << "lse::sq: "; @@ -664,10 +669,8 @@ void local_server_endpoint_impl::print_status() { } auto found_queue = queues_.find(c.first); if (found_queue != queues_.end()) { - its_queue_size = found_queue->second.size(); - for (const auto &m : found_queue->second) { - its_data_size += m->size(); - } + its_queue_size = found_queue->second.second.size(); + its_data_size = found_queue->second.first; } VSOMEIP_INFO << "status lse: client: " << its_remote_path << " queue: " << std::dec << its_queue_size diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp index 8ffbfdb..17e26df 100644 --- a/implementation/endpoints/src/server_endpoint_impl.cpp +++ b/implementation/endpoints/src/server_endpoint_impl.cpp @@ -25,8 +25,10 @@ namespace vsomeip { template server_endpoint_impl::server_endpoint_impl( std::shared_ptr _host, endpoint_type _local, - boost::asio::io_service &_io, std::uint32_t _max_message_size) - : endpoint_impl(_host, _local, _io, _max_message_size), + boost::asio::io_service &_io, std::uint32_t _max_message_size, + configuration::endpoint_queue_limit_t _queue_limit) + : endpoint_impl(_host, _local, _io, _max_message_size, + _queue_limit), flush_timer_(_io) { } @@ -140,25 +142,65 @@ bool server_endpoint_impl::send_intern( target_queue_iterator = queues_.insert(queues_.begin(), std::make_pair( _target, - std::deque() + std::make_pair(std::size_t(0), + std::deque()) )); } // TODO compare against value from configuration here - const bool queue_size_zero_on_entry(target_queue_iterator->second.empty()); + const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty()); if (target_packetizer->size() + _size > endpoint_impl::max_message_size_ && !target_packetizer->empty()) { - target_queue_iterator->second.push_back(target_packetizer); + target_queue_iterator->second.second.push_back(target_packetizer); + target_queue_iterator->second.first += target_packetizer->size(); target_packetizer = std::make_shared(); packetizer_[_target] = target_packetizer; } + if (endpoint_impl::queue_limit_ != QUEUE_SIZE_UNLIMITED + && target_queue_iterator->second.first + _size > + endpoint_impl::queue_limit_) { + service_t its_service(0); + method_t its_method(0); + client_t its_client(0); + session_t its_session(0); + if (_size >= VSOMEIP_SESSION_POS_MAX) { + // this will yield wrong IDs for local communication as the commands + // are prepended to the actual payload + // it will print: + // (lowbyte service ID + highbyte methoid) + // [(Command + lowerbyte sender's client ID). + // highbyte sender's client ID + lowbyte command size. + // lowbyte methodid + highbyte vsomeipd length] + its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN], + _data[VSOMEIP_SERVICE_POS_MAX]); + its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], + _data[VSOMEIP_METHOD_POS_MAX]); + its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN], + _data[VSOMEIP_CLIENT_POS_MAX]); + its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN], + _data[VSOMEIP_SESSION_POS_MAX]); + } + VSOMEIP_ERROR << "sei::send_intern: queue size limit (" << std::dec + << endpoint_impl::queue_limit_ + << ") reached. Dropping message (" + << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << "." + << std::hex << std::setw(4) << std::setfill('0') << its_session << "]" + << " queue_size: " << std::dec << target_queue_iterator->second.first + << " data size: " << std::dec << _size; + return false; + } + + target_packetizer->insert(target_packetizer->end(), _data, _data + _size); if (_flush) { flush_timer_.cancel(); - target_queue_iterator->second.push_back(target_packetizer); + target_queue_iterator->second.second.push_back(target_packetizer); + target_queue_iterator->second.first += target_packetizer->size(); packetizer_[_target] = std::make_shared(); } else { std::chrono::milliseconds flush_timeout(VSOMEIP_DEFAULT_FLUSH_TIMEOUT); @@ -170,7 +212,7 @@ bool server_endpoint_impl::send_intern( std::placeholders::_1)); } - if (queue_size_zero_on_entry && !target_queue_iterator->second.empty()) { // no writing in progress + if (queue_size_zero_on_entry && !target_queue_iterator->second.second.empty()) { // no writing in progress send_queued(target_queue_iterator); } @@ -183,7 +225,7 @@ bool server_endpoint_impl::flush( bool is_flushed = false; std::lock_guard its_lock(mutex_); auto queue_iterator = queues_.find(_target); - if (queue_iterator != queues_.end() && !queue_iterator->second.empty()) { + if (queue_iterator != queues_.end() && !queue_iterator->second.second.empty()) { send_queued(queue_iterator); is_flushed = true; } @@ -205,8 +247,10 @@ void server_endpoint_impl::send_cbk( std::lock_guard its_lock(mutex_); if (!_error) { - _queue_iterator->second.pop_front(); - if (_queue_iterator->second.size() > 0) { + _queue_iterator->second.first -= + _queue_iterator->second.second.front()->size(); + _queue_iterator->second.second.pop_front(); + if (_queue_iterator->second.second.size() > 0) { send_queued(_queue_iterator); } } else { diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp index 2646f01..f1d2efc 100644 --- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp @@ -29,8 +29,10 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl( boost::asio::io_service &_io, std::uint32_t _max_message_size, std::uint32_t _buffer_shrink_threshold, - std::chrono::milliseconds _send_timeout) - : tcp_client_endpoint_base_impl(_host, _local, _remote, _io, _max_message_size), + std::chrono::milliseconds _send_timeout, + configuration::endpoint_queue_limit_t _queue_limit) + : tcp_client_endpoint_base_impl(_host, _local, _remote, _io, + _max_message_size, _queue_limit), recv_buffer_size_initial_(VSOMEIP_SOMEIP_HEADER_SIZE), recv_buffer_(recv_buffer_size_initial_, 0), recv_buffer_size_(0), @@ -64,11 +66,10 @@ void tcp_client_endpoint_impl::restart() { is_connected_ = false; { std::lock_guard its_lock(socket_mutex_); - shutdown_and_close_socket_unlocked(); + shutdown_and_close_socket_unlocked(true); recv_buffer_size_ = 0; recv_buffer_.resize(recv_buffer_size_initial_, 0x0); recv_buffer_.shrink_to_fit(); - socket_.reset(new socket_type(service_)); } { std::lock_guard its_lock(mutex_); @@ -94,6 +95,7 @@ void tcp_client_endpoint_impl::restart() { << " size: " << std::dec << m->size(); } queue_.clear(); + queue_size_ = 0; } start_connect_timer(); } @@ -348,6 +350,7 @@ void tcp_client_endpoint_impl::send_magic_cookie(message_buffer_ptr_t &_buffer) CLIENT_COOKIE, CLIENT_COOKIE + sizeof(CLIENT_COOKIE) ); + queue_size_ += sizeof(CLIENT_COOKIE); } else { VSOMEIP_WARNING << "Packet full. Cannot insert magic cookie!"; } @@ -496,10 +499,11 @@ void tcp_client_endpoint_impl::receive_cbk( if (_error == boost::asio::error::connection_reset || _error == boost::asio::error::eof || _error == boost::asio::error::timed_out) { - if(_error == boost::asio::error::timed_out) { - VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: " << _error.message(); - } - shutdown_and_close_socket_unlocked(); + VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: " << _error.message() + << " local: " << get_address_port_local() + << " remote: " << get_address_port_remote(); + is_connected_ = false; + shutdown_and_close_socket_unlocked(false); } else { its_lock.unlock(); receive(); @@ -594,9 +598,7 @@ void tcp_client_endpoint_impl::print_status() { { std::lock_guard its_lock(mutex_); its_queue_size = queue_.size(); - for (const auto &m : queue_) { - its_data_size += m->size(); - } + its_data_size = queue_size_; } std::string local; { diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp index 37e3554..5a061e1 100644 --- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp @@ -25,8 +25,10 @@ tcp_server_endpoint_impl::tcp_server_endpoint_impl( std::shared_ptr _host, endpoint_type _local, boost::asio::io_service &_io, std::uint32_t _max_message_size, std::uint32_t _buffer_shrink_threshold, - std::chrono::milliseconds _send_timeout) - : tcp_server_endpoint_base_impl(_host, _local, _io, _max_message_size), + std::chrono::milliseconds _send_timeout, + configuration::endpoint_queue_limit_t _queue_limit) + : tcp_server_endpoint_base_impl(_host, _local, _io, + _max_message_size, _queue_limit), acceptor_(_io), buffer_shrink_threshold_(_buffer_shrink_threshold), local_port_(_local.port()), @@ -110,7 +112,7 @@ void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iter << _queue_iterator->first.address().to_string() << ":" << std::dec << static_cast(_queue_iterator->first.port()) << " dropping outstanding messages (" << std::dec - << _queue_iterator->second.size() << ")."; + << _queue_iterator->second.second.size() << ")."; queues_.erase(_queue_iterator->first); } } @@ -310,7 +312,7 @@ void tcp_server_endpoint_impl::connection::send_queued( " couldn't lock server_"; return; } - message_buffer_ptr_t its_buffer = _queue_iterator->second.front(); + message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front(); const service_t its_service = VSOMEIP_BYTES_TO_WORD( (*its_buffer)[VSOMEIP_SERVICE_POS_MIN], (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]); @@ -328,8 +330,10 @@ void tcp_server_endpoint_impl::connection::send_queued( std::chrono::steady_clock::now(); if (std::chrono::duration_cast( now - last_cookie_sent_) > std::chrono::milliseconds(10000)) { - send_magic_cookie(its_buffer); - last_cookie_sent_ = now; + if (send_magic_cookie(its_buffer)) { + last_cookie_sent_ = now; + _queue_iterator->second.first += sizeof(SERVICE_COOKIE); + } } } @@ -351,14 +355,16 @@ void tcp_server_endpoint_impl::connection::send_queued( } } -void tcp_server_endpoint_impl::connection::send_magic_cookie( +bool tcp_server_endpoint_impl::connection::send_magic_cookie( message_buffer_ptr_t &_buffer) { if (max_message_size_ == MESSAGE_SIZE_UNLIMITED || max_message_size_ - _buffer->size() >= VSOMEIP_SOMEIP_HEADER_SIZE + VSOMEIP_SOMEIP_MAGIC_COOKIE_SIZE) { _buffer->insert(_buffer->begin(), SERVICE_COOKIE, SERVICE_COOKIE + sizeof(SERVICE_COOKIE)); + return true; } + return false; } bool tcp_server_endpoint_impl::connection::is_magic_cookie(size_t _offset) const { @@ -553,7 +559,10 @@ void tcp_server_endpoint_impl::connection::receive_cbk( || _error == boost::asio::error::connection_reset || _error == boost::asio::error::timed_out) { if(_error == boost::asio::error::timed_out) { - VSOMEIP_WARNING << "tcp_server_endpoint receive_cbk: " << _error.message(); + std::lock_guard its_lock(socket_mutex_); + VSOMEIP_WARNING << "tcp_server_endpoint receive_cbk: " << _error.message() + << " local: " << get_address_port_local() + << " remote: " << get_address_port_remote(); } { std::lock_guard its_lock(its_server->connections_mutex_); @@ -746,10 +755,8 @@ void tcp_server_endpoint_impl::print_status() { } auto found_queue = queues_.find(c.first); if (found_queue != queues_.end()) { - its_queue_size = found_queue->second.size(); - for (const auto &m : found_queue->second) { - its_data_size += m->size(); - } + its_queue_size = found_queue->second.second.size(); + its_data_size = found_queue->second.first; } VSOMEIP_INFO << "status tse: client: " << c.second->get_address_port_remote() diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp index 28a4888..c8ca0e0 100644 --- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp @@ -19,9 +19,10 @@ udp_client_endpoint_impl::udp_client_endpoint_impl( std::shared_ptr< endpoint_host > _host, endpoint_type _local, endpoint_type _remote, - boost::asio::io_service &_io) + boost::asio::io_service &_io, + configuration::endpoint_queue_limit_t _queue_limit) : udp_client_endpoint_base_impl(_host, _local, _remote, _io, - VSOMEIP_MAX_UDP_MESSAGE_SIZE), + VSOMEIP_MAX_UDP_MESSAGE_SIZE, _queue_limit), recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0), remote_address_(_remote.address()), remote_port_(_remote.port()) { @@ -81,7 +82,7 @@ void udp_client_endpoint_impl::restart() { std::lock_guard its_lock(mutex_); queue_.clear(); } - shutdown_and_close_socket(); + shutdown_and_close_socket(false); start_connect_timer(); } @@ -215,7 +216,7 @@ void udp_client_endpoint_impl::receive_cbk( receive(); } else { if (_error == boost::asio::error::connection_refused) { - shutdown_and_close_socket(); + shutdown_and_close_socket(false); } else { receive(); } @@ -256,9 +257,7 @@ void udp_client_endpoint_impl::print_status() { { std::lock_guard its_lock(mutex_); its_queue_size = queue_.size(); - for (const auto &m : queue_) { - its_data_size += m->size(); - } + its_data_size = queue_size_; } std::string local; { diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp index b712266..92990f1 100644 --- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp @@ -24,9 +24,10 @@ namespace vsomeip { udp_server_endpoint_impl::udp_server_endpoint_impl( std::shared_ptr< endpoint_host > _host, endpoint_type _local, - boost::asio::io_service &_io) + boost::asio::io_service &_io, + configuration::endpoint_queue_limit_t _queue_limit) : server_endpoint_impl( - _host, _local, _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE), + _host, _local, _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE, _queue_limit), socket_(_io, _local.protocol()), joined_group_(false), recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0), @@ -121,7 +122,7 @@ bool udp_server_endpoint_impl::send_to( void udp_server_endpoint_impl::send_queued( const queue_iterator_type _queue_iterator) { - message_buffer_ptr_t its_buffer = _queue_iterator->second.front(); + message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front(); #if 0 std::stringstream msg; msg << "usei::sq(" << _queue_iterator->first.address().to_string() << ":" @@ -405,10 +406,9 @@ void udp_server_endpoint_impl::print_status() { for (const auto &c : queues_) { std::size_t its_data_size(0); std::size_t its_queue_size(0); - its_queue_size = c.second.size(); - for (const auto &m : c.second) { - its_data_size += m->size(); - } + its_queue_size = c.second.second.size(); + its_data_size = c.second.first; + boost::system::error_code ec; VSOMEIP_INFO << "status use: client: " << c.first.address().to_string(ec) << ":" diff --git a/implementation/routing/include/event.hpp b/implementation/routing/include/event.hpp index 164f922..ac483ab 100644 --- a/implementation/routing/include/event.hpp +++ b/implementation/routing/include/event.hpp @@ -53,7 +53,7 @@ public: const std::shared_ptr _target, bool _force, bool _flush); - void set_payload_dont_notify(const std::shared_ptr &_payload); + bool set_payload_dont_notify(const std::shared_ptr &_payload); void set_payload(const std::shared_ptr &_payload, bool _force, bool _flush); diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp index dbdf2b9..ad6810b 100644 --- a/implementation/routing/include/routing_manager_impl.hpp +++ b/implementation/routing/include/routing_manager_impl.hpp @@ -160,7 +160,7 @@ public: client_t _bound_client, const boost::asio::ip::address &_remote_address, std::uint16_t _remote_port); - void on_message(service_t _service, instance_t _instance, + bool on_message(service_t _service, instance_t _instance, const byte_t *_data, length_t _size, bool _reliable, bool _is_valid_crc = true); void on_notification(client_t _client, service_t _service, instance_t _instance, const byte_t *_data, length_t _size, diff --git a/implementation/routing/include/routing_manager_stub_host.hpp b/implementation/routing/include/routing_manager_stub_host.hpp index 065be80..5497d6e 100644 --- a/implementation/routing/include/routing_manager_stub_host.hpp +++ b/implementation/routing/include/routing_manager_stub_host.hpp @@ -56,7 +56,7 @@ public: virtual void unsubscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0; - virtual void on_message(service_t _service, instance_t _instance, + virtual bool on_message(service_t _service, instance_t _instance, const byte_t *_data, length_t _size, bool _reliable, bool _is_valid_crc = true) = 0; virtual void on_notification(client_t _client, diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp index e3dde2c..ad228de 100644 --- a/implementation/routing/src/event.cpp +++ b/implementation/routing/src/event.cpp @@ -92,16 +92,19 @@ const std::shared_ptr event::get_payload() const { return (message_->get_payload()); } -void event::set_payload_dont_notify(const std::shared_ptr &_payload) { +bool event::set_payload_dont_notify(const std::shared_ptr &_payload) { std::lock_guard its_lock(mutex_); - if(is_cache_placeholder_) { + if (is_cache_placeholder_) { reset_payload(_payload); is_set_ = true; } else { if (set_payload_helper(_payload, false)) { reset_payload(_payload); + } else { + return false; } } + return true; } void event::set_payload(const std::shared_ptr &_payload, diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp index 1d0c795..0e8ce5d 100644 --- a/implementation/routing/src/routing_manager_base.cpp +++ b/implementation/routing/src/routing_manager_base.cpp @@ -238,6 +238,100 @@ void routing_manager_base::register_event(client_t _client, service_t _service, its_event->set_eventgroups(_eventgroups); } + if (_is_shadow && !_epsilon_change_func) { + std::shared_ptr its_debounce + = configuration_->get_debounce(_service, _instance, _event); + if (its_debounce) { + VSOMEIP_WARNING << "Using debounce configuration for " + << " SOME/IP event " + << std::hex << std::setw(4) << std::setfill('0') + << _service << "." + << std::hex << std::setw(4) << std::setfill('0') + << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') + << _event << "."; + std::stringstream its_debounce_parameters; + its_debounce_parameters << "(on_change=" + << (its_debounce->on_change_ ? "true" : "false") + << ", ignore=[ "; + for (auto i : its_debounce->ignore_) + its_debounce_parameters << "(" << std::dec << i.first + << ", " << std::hex << (int)i.second << ") "; + its_debounce_parameters << "], interval=" + << std::dec << its_debounce->interval_ << ")"; + VSOMEIP_WARNING << "Debounce parameters: " + << its_debounce_parameters.str(); + _epsilon_change_func = [its_debounce]( + const std::shared_ptr &_old, + const std::shared_ptr &_new) { + bool is_changed(false), is_elapsed(false); + + // Check whether we should forward because of changed data + if (its_debounce->on_change_) { + length_t its_min_length, its_max_length; + + if (_old->get_length() < _new->get_length()) { + its_min_length = _old->get_length(); + its_max_length = _new->get_length(); + } else { + its_min_length = _new->get_length(); + its_max_length = _old->get_length(); + } + + // Check whether all additional bytes (if any) are excluded + for (length_t i = its_min_length; i < its_max_length; i++) { + auto j = its_debounce->ignore_.find(i); + if (j == its_debounce->ignore_.end() && j->second == 0xFF) { + is_changed = true; + break; + } + } + + if (!is_changed) { + const byte_t *its_old = _old->get_data(); + const byte_t *its_new = _new->get_data(); + for (length_t i = 0; i < its_min_length; i++) { + auto j = its_debounce->ignore_.find(i); + if (j == its_debounce->ignore_.end()) { + if (its_old[i] != its_new[i]) { + is_changed = true; + break; + } + } else if (j->second != 0xFF) { + if ((its_old[i] & ~(j->second)) != (its_new[i] & ~(j->second))) { + is_changed = true; + break; + } + } + } + } + } + + if (its_debounce->interval_ > -1) { + // Check whether we should forward because of the elapsed time since + // we did last time + std::chrono::steady_clock::time_point its_current + = std::chrono::steady_clock::now(); + + long elapsed = std::chrono::duration_cast( + its_current - its_debounce->last_forwarded_).count(); + is_elapsed = (its_debounce->last_forwarded_ == (std::chrono::steady_clock::time_point::max)() + || elapsed >= its_debounce->interval_); + if (is_elapsed || (is_changed && its_debounce->on_change_resets_interval_)) + its_debounce->last_forwarded_ = its_current; + } + return (is_changed || is_elapsed); + }; + } else { + _epsilon_change_func = [](const std::shared_ptr &_old, + const std::shared_ptr &_new) { + (void)_old; + (void)_new; + return true; + }; + } + } + its_event->set_epsilon_change_function(_epsilon_change_func); its_event->set_change_resets_cycle(_change_resets_cycle); its_event->set_update_cycle(_cycle); @@ -670,7 +764,8 @@ std::shared_ptr routing_manager_base::create_local_unlocked(client_t _ #else boost::asio::local::stream_protocol::endpoint(its_path.str()) #endif - , io_, configuration_->get_max_message_size_local()); + , io_, configuration_->get_max_message_size_local(), + configuration_->get_endpoint_queue_limit_local()); // Messages sent to the VSOMEIP_ROUTING_CLIENT are meant to be routed to // external devices. Therefore, its local endpoint must not be found by diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 88aaca3..6af17a4 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -1029,7 +1029,9 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, method_t its_method; bool its_is_crc_valid(true); instance_t its_instance(0x0); - +#ifdef USE_DLT + bool is_forwarded(true); +#endif if (_size >= VSOMEIP_SOMEIP_HEADER_SIZE) { its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); @@ -1121,30 +1123,36 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, its_data[VSOMEIP_CLIENT_POS_MAX] = 0x0; } // Common way of message handling +#ifdef USE_DLT + is_forwarded = +#endif on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(), its_is_crc_valid); + } } } #ifdef USE_DLT - const uint16_t its_data_size - = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); - - tc::trace_header its_header; - const boost::asio::ip::address_v4 its_remote_address = - _remote_address.is_v4() ? _remote_address.to_v4() : - boost::asio::ip::address_v4::from_string("6.6.6.6"); - tc::protocol_e its_protocol = - _receiver->is_local() ? tc::protocol_e::local : - _receiver->is_reliable() ? tc::protocol_e::tcp : + if (is_forwarded) { + const uint16_t its_data_size + = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); + + tc::trace_header its_header; + const boost::asio::ip::address_v4 its_remote_address = + _remote_address.is_v4() ? _remote_address.to_v4() : + boost::asio::ip::address_v4::from_string("6.6.6.6"); + tc::protocol_e its_protocol = + _receiver->is_local() ? tc::protocol_e::local : + _receiver->is_reliable() ? tc::protocol_e::tcp : tc::protocol_e::udp; - its_header.prepare(its_remote_address, _remote_port, its_protocol, false, - its_instance); - tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, - its_data_size); + its_header.prepare(its_remote_address, _remote_port, its_protocol, false, + its_instance); + tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data, + its_data_size); + } #endif } -void routing_manager_impl::on_message( +bool routing_manager_impl::on_message( service_t _service, instance_t _instance, const byte_t *_data, length_t _size, bool _reliable, bool _is_valid_crc) { @@ -1158,6 +1166,7 @@ void routing_manager_impl::on_message( VSOMEIP_INFO << msg.str(); #endif client_t its_client; + bool is_forwarded(true); if (utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])) { its_client = find_local_client(_service, _instance); @@ -1169,12 +1178,13 @@ void routing_manager_impl::on_message( if (its_client == VSOMEIP_ROUTING_CLIENT && utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) { - deliver_notification(_service, _instance, _data, _size, _reliable, _is_valid_crc); + is_forwarded = deliver_notification(_service, _instance, _data, _size, _reliable, _is_valid_crc); } else if (its_client == host_->get_client()) { deliver_message(_data, _size, _instance, _reliable, _is_valid_crc); } else { send(its_client, _data, _size, _instance, true, _reliable, _is_valid_crc); //send to proxy } + return is_forwarded; } void routing_manager_impl::on_notification(client_t _client, @@ -1515,21 +1525,20 @@ bool routing_manager_impl::deliver_notification( service_t _service, instance_t _instance, const byte_t *_data, length_t _length, bool _reliable, bool _is_valid_crc) { - bool is_delivered(false); - method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); std::shared_ptr its_event = find_event(_service, _instance, its_method); if (its_event) { - if(its_event->is_field() && !its_event->is_provided()) { - // store the current value of the remote field + if (!its_event->is_provided()) { const uint32_t its_length(utility::get_payload_size(_data, _length)); - std::shared_ptr its_payload = - runtime::get()->create_payload( - &_data[VSOMEIP_PAYLOAD_POS], - its_length); - its_event->set_payload_dont_notify(its_payload); + std::shared_ptr its_payload + = runtime::get()->create_payload(&_data[VSOMEIP_PAYLOAD_POS], + its_length); + if (!its_event->set_payload_dont_notify(its_payload)) { + // do not forward the notification as it was filtered + return false; + } } for (const auto its_local_client : its_event->get_subscribers()) { @@ -1545,7 +1554,7 @@ bool routing_manager_impl::deliver_notification( } } - return is_delivered; + return true; } std::shared_ptr routing_manager_impl::find_eventgroup( @@ -1727,7 +1736,9 @@ std::shared_ptr routing_manager_impl::create_client_endpoint( configuration_->get_max_message_size_reliable( _address.to_string(), _remote_port), configuration_->get_buffer_shrink_threshold(), - std::chrono::milliseconds(configuration_->get_sd_ttl() * 666)); + std::chrono::milliseconds(configuration_->get_sd_ttl() * 666), + configuration_->get_endpoint_queue_limit( + _address.to_string(), _remote_port)); if (configuration_->has_enabled_magic_cookies(_address.to_string(), _remote_port)) { @@ -1742,7 +1753,8 @@ std::shared_ptr routing_manager_impl::create_client_endpoint( boost::asio::ip::udp::v6()), _local_port), boost::asio::ip::udp::endpoint(_address, _remote_port), - io_); + io_, configuration_->get_endpoint_queue_limit( + _address.to_string(), _remote_port)); } } catch (...) { host_->on_error(error_code_e::CLIENT_ENDPOINT_CREATION_FAILED); @@ -1765,7 +1777,9 @@ std::shared_ptr routing_manager_impl::create_server_endpoint( configuration_->get_max_message_size_reliable( its_unicast.to_string(), _port), configuration_->get_buffer_shrink_threshold(), - std::chrono::milliseconds(configuration_->get_sd_ttl() * 666)); + std::chrono::milliseconds(configuration_->get_sd_ttl() * 666), + configuration_->get_endpoint_queue_limit( + its_unicast.to_string(), _port)); if (configuration_->has_enabled_magic_cookies( its_unicast.to_string(), _port) || configuration_->has_enabled_magic_cookies( @@ -1773,6 +1787,9 @@ std::shared_ptr routing_manager_impl::create_server_endpoint( its_endpoint->enable_magic_cookies(); } } else { + configuration::endpoint_queue_limit_t its_limit = + configuration_->get_endpoint_queue_limit( + its_unicast.to_string(), _port); #ifndef _WIN32 if (its_unicast.is_v4()) { its_unicast = boost::asio::ip::address_v4::any(); @@ -1782,8 +1799,7 @@ std::shared_ptr routing_manager_impl::create_server_endpoint( #endif boost::asio::ip::udp::endpoint ep(its_unicast, _port); its_endpoint = std::make_shared( - shared_from_this(), - ep, io_); + shared_from_this(), ep, io_, its_limit); } } else { @@ -3287,10 +3303,10 @@ void routing_manager_impl::clear_remote_subscriber( if (its_instance != its_service->second.end()) { auto its_client = its_instance->second.find(_client); if (its_client != its_instance->second.end()) { - if (its_client->second.size() <= 1) { - its_instance->second.erase(_client); - } else { - its_client->second.erase(_target); + if (its_client->second.erase(_target)) { + if (!its_client->second.size()) { + its_instance->second.erase(_client); + } } } } diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index a62ee96..a4af448 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -1783,7 +1783,8 @@ void routing_manager_proxy::init_receiver() { boost::asio::local::stream_protocol::endpoint(its_client.str()), #endif io_, configuration_->get_max_message_size_local(), - configuration_->get_buffer_shrink_threshold()); + configuration_->get_buffer_shrink_threshold(), + configuration_->get_endpoint_queue_limit_local()); #ifdef _WIN32 VSOMEIP_INFO << "Listening at " << port; #else diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp index 504458e..010c8d0 100644 --- a/implementation/routing/src/routing_manager_stub.cpp +++ b/implementation/routing/src/routing_manager_stub.cpp @@ -848,7 +848,8 @@ void routing_manager_stub::init_routing_endpoint() { boost::asio::local::stream_protocol::endpoint(endpoint_path_), #endif io_, configuration_->get_max_message_size_local(), - configuration_->get_buffer_shrink_threshold()); + configuration_->get_buffer_shrink_threshold(), + configuration_->get_endpoint_queue_limit_local()); } catch (const std::exception &e) { VSOMEIP_ERROR << ERROR_INFO[static_cast(error_code_e::SERVER_ENDPOINT_CREATION_FAILED)] << " (" << static_cast(error_code_e::SERVER_ENDPOINT_CREATION_FAILED) << ")"; @@ -1459,7 +1460,8 @@ void routing_manager_stub::create_local_receiver() { boost::asio::local::stream_protocol::endpoint(local_receiver_path_), #endif io_, configuration_->get_max_message_size_local(), - configuration_->get_buffer_shrink_threshold()); + configuration_->get_buffer_shrink_threshold(), + configuration_->get_endpoint_queue_limit_local()); } catch (const std::exception &e) { VSOMEIP_ERROR << ERROR_INFO[static_cast(error_code_e::SERVER_ENDPOINT_CREATION_FAILED)] << " (" << static_cast(error_code_e::SERVER_ENDPOINT_CREATION_FAILED) << ")"; diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp index 1d49df1..8d22c7c 100644 --- a/implementation/runtime/src/application_impl.cpp +++ b/implementation/runtime/src/application_impl.cpp @@ -1669,16 +1669,18 @@ void application_impl::invoke_handler(std::shared_ptr &_handler) { << "type=" << static_cast(its_sync_handler->handler_type_) << " thread=" << std::hex << its_id; } - { - std::lock_guard its_lock(dispatcher_mutex_); - running_dispatchers_.insert(its_id); - } - try { - _handler->handler_(); - } catch (const std::exception &e) { - VSOMEIP_ERROR << "application_impl::invoke_handler caught exception: " - << e.what(); - print_blocking_call(its_sync_handler); + if (is_dispatching_) { + { + std::lock_guard its_lock(dispatcher_mutex_); + running_dispatchers_.insert(its_id); + } + try { + _handler->handler_(); + } catch (const std::exception &e) { + VSOMEIP_ERROR << "application_impl::invoke_handler caught exception: " + << e.what(); + print_blocking_call(its_sync_handler); + } } boost::system::error_code ec; its_dispatcher_timer.cancel(ec); diff --git a/implementation/service_discovery/include/configuration_option_impl.hpp b/implementation/service_discovery/include/configuration_option_impl.hpp index e600523..1d231e8 100644 --- a/implementation/service_discovery/include/configuration_option_impl.hpp +++ b/implementation/service_discovery/include/configuration_option_impl.hpp @@ -24,7 +24,7 @@ class configuration_option_impl: public option_impl { public: configuration_option_impl(); virtual ~configuration_option_impl(); - bool operator==(const option_impl &_other) const; + bool operator==(const configuration_option_impl &_other) const; void add_item(const std::string &_key, const std::string &_value); void remove_item(const std::string &_key); diff --git a/implementation/service_discovery/include/eventgroupentry_impl.hpp b/implementation/service_discovery/include/eventgroupentry_impl.hpp index 877cdbf..3a5d897 100755 --- a/implementation/service_discovery/include/eventgroupentry_impl.hpp +++ b/implementation/service_discovery/include/eventgroupentry_impl.hpp @@ -8,6 +8,7 @@ #include "entry_impl.hpp" #include "../../endpoints/include/endpoint_definition.hpp" +#include "message_impl.hpp" namespace vsomeip { namespace sd { @@ -31,19 +32,20 @@ public: bool deserialize(vsomeip::deserializer *_from); bool operator==(const eventgroupentry_impl& _other) const { - return !(ttl_ != _other.ttl_ || - service_ != _other.service_ || - instance_ != _other.instance_ || - eventgroup_ != _other.eventgroup_ || - index1_ != _other.index1_ || - index2_ != _other.index2_ || - num_options_[0] != _other.num_options_[0] || - num_options_[1] != _other.num_options_[1] || - major_version_ != _other.major_version_ || - counter_ != _other.counter_); + return (ttl_ == _other.ttl_ && + service_ == _other.service_ && + instance_ == _other.instance_ && + eventgroup_ == _other.eventgroup_ && + index1_ == _other.index1_ && + index2_ == _other.index2_ && + num_options_[0] == _other.num_options_[0] && + num_options_[1] == _other.num_options_[1] && + major_version_ == _other.major_version_ && + counter_ == _other.counter_); } - bool is_matching_subscribe(const eventgroupentry_impl& _other) const; + bool is_matching_subscribe(const eventgroupentry_impl& _other, + const message_impl::options_t& _options) const; void add_target(const std::shared_ptr &_target); std::shared_ptr get_target(bool _reliable) const; diff --git a/implementation/service_discovery/include/ip_option_impl.hpp b/implementation/service_discovery/include/ip_option_impl.hpp index 67f356b..1345835 100644 --- a/implementation/service_discovery/include/ip_option_impl.hpp +++ b/implementation/service_discovery/include/ip_option_impl.hpp @@ -17,7 +17,7 @@ class ip_option_impl: public option_impl { public: ip_option_impl(); virtual ~ip_option_impl(); - bool operator ==(const option_impl &_option) const; + virtual bool operator ==(const ip_option_impl &_option) const; uint16_t get_port() const; void set_port(uint16_t _port); diff --git a/implementation/service_discovery/include/ipv4_option_impl.hpp b/implementation/service_discovery/include/ipv4_option_impl.hpp index 5f22534..8ede63d 100644 --- a/implementation/service_discovery/include/ipv4_option_impl.hpp +++ b/implementation/service_discovery/include/ipv4_option_impl.hpp @@ -17,6 +17,7 @@ class ipv4_option_impl: public ip_option_impl { public: ipv4_option_impl(bool _is_multicast); virtual ~ipv4_option_impl(); + bool operator ==(const ipv4_option_impl &_other) const; const ipv4_address_t & get_address() const; void set_address(const ipv4_address_t &_address); diff --git a/implementation/service_discovery/include/ipv6_option_impl.hpp b/implementation/service_discovery/include/ipv6_option_impl.hpp index b082808..92701c0 100644 --- a/implementation/service_discovery/include/ipv6_option_impl.hpp +++ b/implementation/service_discovery/include/ipv6_option_impl.hpp @@ -17,6 +17,7 @@ class ipv6_option_impl: public ip_option_impl { public: ipv6_option_impl(bool _is_multicast); virtual ~ipv6_option_impl(); + bool operator ==(const ipv6_option_impl &_other) const; const ipv6_address_t & get_address() const; void set_address(const ipv6_address_t &_address); diff --git a/implementation/service_discovery/include/load_balancing_option_impl.hpp b/implementation/service_discovery/include/load_balancing_option_impl.hpp index e65989e..767c345 100755 --- a/implementation/service_discovery/include/load_balancing_option_impl.hpp +++ b/implementation/service_discovery/include/load_balancing_option_impl.hpp @@ -16,7 +16,7 @@ class load_balancing_option_impl: public option_impl { public: load_balancing_option_impl(); virtual ~load_balancing_option_impl(); - bool operator ==(const option_impl &_other) const; + bool operator ==(const load_balancing_option_impl &_other) const; priority_t get_priority() const; void set_priority(priority_t _priority); diff --git a/implementation/service_discovery/include/protection_option_impl.hpp b/implementation/service_discovery/include/protection_option_impl.hpp index 589bb7a..33d5849 100755 --- a/implementation/service_discovery/include/protection_option_impl.hpp +++ b/implementation/service_discovery/include/protection_option_impl.hpp @@ -16,7 +16,7 @@ class protection_option_impl: public option_impl { public: protection_option_impl(); virtual ~protection_option_impl(); - virtual bool operator ==(const option_impl &_other) const; + bool operator ==(const protection_option_impl &_other) const; alive_counter_t get_alive_counter() const; void set_alive_counter(alive_counter_t _counter); diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp index 214450d..d3c5f07 100644 --- a/implementation/service_discovery/include/service_discovery_impl.hpp +++ b/implementation/service_discovery/include/service_discovery_impl.hpp @@ -23,6 +23,7 @@ #include "ipv4_option_impl.hpp" #include "ipv6_option_impl.hpp" #include "deserializer.hpp" +#include "../../configuration/include/configuration.hpp" namespace vsomeip { @@ -312,7 +313,12 @@ private: const std::shared_ptr &_subscriber); bool check_stop_subscribe_subscribe(message_impl::entries_t::const_iterator _iter, - message_impl::entries_t::const_iterator _end) const; + message_impl::entries_t::const_iterator _end, + const message_impl::options_t& _options) const; + + configuration::ttl_factor_t get_ttl_factor( + service_t _service, instance_t _instance, + const configuration::ttl_map_t& _ttl_map) const; private: boost::asio::io_service &io_; @@ -402,6 +408,9 @@ private: std::map>>>>> pending_remote_subscriptions_; std::mutex response_mutex_; + + configuration::ttl_map_t ttl_factor_offers_; + configuration::ttl_map_t ttl_factor_subscriptions_; }; } // namespace sd diff --git a/implementation/service_discovery/src/configuration_option_impl.cpp b/implementation/service_discovery/src/configuration_option_impl.cpp index cb1937e..e9cf058 100755 --- a/implementation/service_discovery/src/configuration_option_impl.cpp +++ b/implementation/service_discovery/src/configuration_option_impl.cpp @@ -20,14 +20,10 @@ configuration_option_impl::configuration_option_impl() { configuration_option_impl::~configuration_option_impl() { } -bool configuration_option_impl::operator ==(const option_impl &_other) const { - if (_other.get_type() != option_type_e::CONFIGURATION) - return false; - - const configuration_option_impl& other = - dynamic_cast(_other); - - return (configuration_ == other.configuration_); +bool configuration_option_impl::operator ==( + const configuration_option_impl &_other) const { + return (option_impl::operator ==(_other) + && configuration_ == _other.configuration_); } void configuration_option_impl::add_item(const std::string &_key, diff --git a/implementation/service_discovery/src/eventgroupentry_impl.cpp b/implementation/service_discovery/src/eventgroupentry_impl.cpp index 1f9dd4a..f5394be 100755 --- a/implementation/service_discovery/src/eventgroupentry_impl.cpp +++ b/implementation/service_discovery/src/eventgroupentry_impl.cpp @@ -7,6 +7,8 @@ #include "../include/eventgroupentry_impl.hpp" #include "../../message/include/deserializer.hpp" #include "../../message/include/serializer.hpp" +#include "../include/ipv4_option_impl.hpp" +#include "../include/ipv6_option_impl.hpp" namespace vsomeip { namespace sd { @@ -116,18 +118,110 @@ bool eventgroupentry_impl::deserialize(vsomeip::deserializer *_from) { } bool eventgroupentry_impl::is_matching_subscribe( - const eventgroupentry_impl& _other) const { - return !(ttl_ != 0 - || _other.ttl_ == 0 - || service_ != _other.service_ - || instance_ != _other.instance_ - || eventgroup_ != _other.eventgroup_ - || index1_ != _other.index1_ - || index2_ != _other.index2_ - || num_options_[0] != _other.num_options_[0] - || num_options_[1] != _other.num_options_[1] - || major_version_ != _other.major_version_ - || counter_ != _other.counter_); + const eventgroupentry_impl& _other, + const message_impl::options_t& _options) const { + if (ttl_ == 0 + && _other.ttl_ > 0 + && service_ == _other.service_ + && instance_ == _other.instance_ + && eventgroup_ == _other.eventgroup_ + && index1_ == _other.index1_ + && index2_ == _other.index2_ + && num_options_[0] == _other.num_options_[0] + && num_options_[1] == _other.num_options_[1] + && major_version_ == _other.major_version_ + && counter_ == _other.counter_) { + return true; + } else if (ttl_ == 0 + && _other.ttl_ > 0 + && service_ == _other.service_ + && instance_ == _other.instance_ + && eventgroup_ == _other.eventgroup_ + && major_version_ == _other.major_version_ + && counter_ == _other.counter_) { + // check if entries reference options at different indexes but the + // options itself are identical + // check if number of options referenced is the same + if (num_options_[0] + num_options_[1] + != _other.num_options_[0] + _other.num_options_[1] || + num_options_[0] + num_options_[1] == 0) { + return false; + } + // read out ip options of current and _other + std::vector> its_options_current; + std::vector> its_options_other; + for (const auto option_run : {0,1}) { + for (const auto option_index : options_[option_run]) { + switch (_options[option_index]->get_type()) { + case option_type_e::IP4_ENDPOINT: + its_options_current.push_back( + std::static_pointer_cast( + _options[option_index])); + break; + case option_type_e::IP6_ENDPOINT: + its_options_current.push_back( + std::static_pointer_cast( + _options[option_index])); + break; + default: + break; + } + } + for (const auto option_index : _other.options_[option_run]) { + switch (_options[option_index]->get_type()) { + case option_type_e::IP4_ENDPOINT: + its_options_other.push_back( + std::static_pointer_cast( + _options[option_index])); + break; + case option_type_e::IP6_ENDPOINT: + its_options_other.push_back( + std::static_pointer_cast( + _options[option_index])); + break; + default: + break; + } + } + } + + if (!its_options_current.size() || !its_options_other.size()) { + return false; + } + + // search every option of current in other + for (const auto& c : its_options_current) { + bool found(false); + for (const auto& o : its_options_other) { + if (*c == *o) { + switch (c->get_type()) { + case option_type_e::IP4_ENDPOINT: + if (static_cast(c.get())->get_address() + == static_cast(o.get())->get_address()) { + found = true; + } + break; + case option_type_e::IP6_ENDPOINT: + if (static_cast(c.get())->get_address() + == static_cast(o.get())->get_address()) { + found = true; + } + break; + default: + break; + } + } + if (found) { + break; + } + } + if (!found) { + return false; + } + } + return true; + } + return false; } void eventgroupentry_impl::add_target( diff --git a/implementation/service_discovery/src/ip_option_impl.cpp b/implementation/service_discovery/src/ip_option_impl.cpp index 9be1c5f..0684dc8 100644 --- a/implementation/service_discovery/src/ip_option_impl.cpp +++ b/implementation/service_discovery/src/ip_option_impl.cpp @@ -21,15 +21,10 @@ ip_option_impl::ip_option_impl() : ip_option_impl::~ip_option_impl() { } -bool ip_option_impl::operator ==(const option_impl &_other) const { - if (type_ != _other.get_type()) - return false; - -#ifdef VSOMEIP_TODO - const ip_option_impl & other = - dynamic_cast(_other); -#endif - return true; +bool ip_option_impl::operator ==(const ip_option_impl &_other) const { + return (option_impl::operator ==(_other) + && protocol_ == _other.protocol_ + && port_ == _other.port_); } unsigned short ip_option_impl::get_port() const { diff --git a/implementation/service_discovery/src/ipv4_option_impl.cpp b/implementation/service_discovery/src/ipv4_option_impl.cpp index 00bf7b5..22ee7d8 100644 --- a/implementation/service_discovery/src/ipv4_option_impl.cpp +++ b/implementation/service_discovery/src/ipv4_option_impl.cpp @@ -25,6 +25,11 @@ ipv4_option_impl::ipv4_option_impl(bool _is_multicast) : ipv4_option_impl::~ipv4_option_impl() { } +bool ipv4_option_impl::operator ==(const ipv4_option_impl &_other) const { + return (ip_option_impl::operator ==(_other) + && address_ == _other.address_); +} + const ipv4_address_t & ipv4_option_impl::get_address() const { return address_; } diff --git a/implementation/service_discovery/src/ipv6_option_impl.cpp b/implementation/service_discovery/src/ipv6_option_impl.cpp index 94c8fe4..e4de3d6 100755 --- a/implementation/service_discovery/src/ipv6_option_impl.cpp +++ b/implementation/service_discovery/src/ipv6_option_impl.cpp @@ -25,6 +25,11 @@ ipv6_option_impl::ipv6_option_impl(bool _is_multicast) : ipv6_option_impl::~ipv6_option_impl() { } +bool ipv6_option_impl::operator ==(const ipv6_option_impl &_other) const { + return (ip_option_impl::operator ==(_other) + && address_ == _other.address_); +} + const ipv6_address_t & ipv6_option_impl::get_address() const { return address_; } diff --git a/implementation/service_discovery/src/load_balancing_option_impl.cpp b/implementation/service_discovery/src/load_balancing_option_impl.cpp index 8fe8217..937dd4d 100755 --- a/implementation/service_discovery/src/load_balancing_option_impl.cpp +++ b/implementation/service_discovery/src/load_balancing_option_impl.cpp @@ -20,14 +20,11 @@ load_balancing_option_impl::load_balancing_option_impl() { load_balancing_option_impl::~load_balancing_option_impl() { } -bool load_balancing_option_impl::operator ==(const option_impl &_other) const { - if (_other.get_type() != option_type_e::LOAD_BALANCING) - return false; - - const load_balancing_option_impl& other = - dynamic_cast(_other); - - return (priority_ == other.priority_ && weight_ == other.weight_); +bool load_balancing_option_impl::operator ==( + const load_balancing_option_impl &_other) const { + return (option_impl::operator ==(_other) + && priority_ == _other.priority_ + && priority_ == _other.weight_); } priority_t load_balancing_option_impl::get_priority() const { diff --git a/implementation/service_discovery/src/option_impl.cpp b/implementation/service_discovery/src/option_impl.cpp index 6854507..8191912 100755 --- a/implementation/service_discovery/src/option_impl.cpp +++ b/implementation/service_discovery/src/option_impl.cpp @@ -20,8 +20,7 @@ option_impl::~option_impl() { } bool option_impl::operator ==(const option_impl &_other) const { - (void)_other; - return false; + return (type_ == _other.type_ && length_ == _other.length_); } uint16_t option_impl::get_length() const { diff --git a/implementation/service_discovery/src/protection_option_impl.cpp b/implementation/service_discovery/src/protection_option_impl.cpp index b7db090..f847650 100755 --- a/implementation/service_discovery/src/protection_option_impl.cpp +++ b/implementation/service_discovery/src/protection_option_impl.cpp @@ -20,14 +20,11 @@ protection_option_impl::protection_option_impl() { protection_option_impl::~protection_option_impl() { } -bool protection_option_impl::operator ==(const option_impl &_other) const { - if (_other.get_type() != option_type_e::PROTECTION) - return false; - - const protection_option_impl& other = - dynamic_cast(_other); - - return (counter_ == other.counter_ && crc_ == other.crc_); +bool protection_option_impl::operator ==( + const protection_option_impl &_other) const { + return (option_impl::operator ==(_other) + && counter_ == _other.counter_ + && crc_ == _other.crc_); } alive_counter_t protection_option_impl::get_alive_counter() const { diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index f2171fb..fb13a8b 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -131,6 +131,9 @@ void service_discovery_impl::init() { offer_debounce_time_ = std::chrono::milliseconds( its_configuration->get_sd_offer_debounce_time()); ttl_timer_runtime_ = cyclic_offer_delay_ / 2; + + ttl_factor_offers_ = its_configuration->get_ttl_factor_offers(); + ttl_factor_subscriptions_ = its_configuration->get_ttl_factor_subscribes(); } else { VSOMEIP_ERROR << "SD: no configuration found!"; } @@ -1194,7 +1197,8 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, if (is_stop_subscribe_subscribe) { force_initial_events = true; } - is_stop_subscribe_subscribe = check_stop_subscribe_subscribe(iter, its_end); + is_stop_subscribe_subscribe = check_stop_subscribe_subscribe( + iter, its_end, its_message->get_options()); process_eventgroupentry(its_eventgroup_entry, its_options, its_message_response, _destination, its_message_id, is_stop_subscribe_subscribe, force_initial_events); @@ -1357,7 +1361,8 @@ void service_discovery_impl::process_offerservice_serviceentry( } host_->add_routing_info(_service, _instance, - _major, _minor, _ttl * 5, + _major, _minor, + _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_), _reliable_address, _reliable_port, _unreliable_address, _unreliable_port); @@ -1410,13 +1415,24 @@ void service_discovery_impl::process_offerservice_serviceentry( its_eventgroup.first, its_subscription, false, true); its_client.second->set_tcp_connection_established(false); - } + // restart TCP endpoint if not connected + if (its_subscription->get_endpoint(true) + && !its_subscription->get_endpoint(true)->is_connected()) { + its_subscription->get_endpoint(true)->restart(); + } + } its_subscription->set_acknowledged(false); } else { insert_nack_subscription_on_resubscribe(its_message, _service, _instance, its_eventgroup.first, its_subscription); + + // restart TCP endpoint if not connected + if (its_subscription->get_endpoint(true) + && !its_subscription->get_endpoint(true)->is_connected()) { + its_subscription->get_endpoint(true)->restart(); + } } } } @@ -2171,8 +2187,8 @@ void service_discovery_impl::handle_eventgroup_subscription(service_t _service, client_t its_subscribing_remote_client = VSOMEIP_ROUTING_CLIENT; switch (host_->on_remote_subscription(_service, _instance, _eventgroup, target.subscriber_, target.target_, - _ttl * 5, &its_subscribing_remote_client, - _message_id)) { + _ttl * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_), + &its_subscribing_remote_client, _message_id)) { case remote_subscription_state_e::SUBSCRIPTION_ACKED: insert_subscription_ack(its_message, _service, _instance, _eventgroup, its_info, _ttl, @@ -2460,7 +2476,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ found_client->second, _reliable, !_reliable); found_client->second->set_udp_connection_established(true); } else { - // don't insert reliable endpoint option if the + // don't insert unreliable endpoint option if the // UDP client endpoint is not yet connected found_client->second->set_udp_connection_established(false); } @@ -3181,7 +3197,12 @@ void service_discovery_impl::update_subscription_expiration_timer( if (entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK && entry->get_ttl()) { const std::chrono::steady_clock::time_point its_expiration = now - + std::chrono::seconds(entry->get_ttl() * 5); + + std::chrono::seconds( + entry->get_ttl() + * get_ttl_factor( + entry->get_service(), + entry->get_instance(), + ttl_factor_subscriptions_)); if (its_expiration < next_subscription_expiration_) { next_subscription_expiration_ = its_expiration; } @@ -3364,17 +3385,35 @@ void service_discovery_impl::remote_subscription_remove( bool service_discovery_impl::check_stop_subscribe_subscribe( message_impl::entries_t::const_iterator _iter, - message_impl::entries_t::const_iterator _end) const { + message_impl::entries_t::const_iterator _end, + const message_impl::options_t& _options) const { const message_impl::entries_t::const_iterator its_next = std::next(_iter); - if ((*_iter)->get_type() != entry_type_e::SUBSCRIBE_EVENTGROUP + if ((*_iter)->get_ttl() > 0 + || (*_iter)->get_type() != entry_type_e::STOP_SUBSCRIBE_EVENTGROUP || its_next == _end - || (*its_next)->get_type() != entry_type_e::STOP_SUBSCRIBE_EVENTGROUP) { + || (*its_next)->get_type() != entry_type_e::SUBSCRIBE_EVENTGROUP) { return false; } return (*static_cast(_iter->get())).is_matching_subscribe( - *(static_cast(its_next->get()))); + *(static_cast(its_next->get())), _options); } +configuration::ttl_factor_t service_discovery_impl::get_ttl_factor( + service_t _service, instance_t _instance, + const configuration::ttl_map_t& _ttl_map) const { + configuration::ttl_factor_t its_ttl_factor(1); + auto found_service = _ttl_map.find(_service); + if (found_service != _ttl_map.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + its_ttl_factor = found_instance->second; + } + } + return its_ttl_factor; +} + + + } // namespace sd } // namespace vsomeip diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index b3a29c8..25ac281 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -851,6 +851,14 @@ if(NOT ${TESTS_BAT}) ${TEST_BIG_PAYLOAD_SERVICE} ) + # Copy config file for client and service into $BUILDDIR/test + set(TEST_QUEUE_LIMITED_LOCAL_BIG_PAYLOAD_CONFIG_FILE ${TEST_BIG_PAYLOAD_NAME}_local_queue_limited.json) + copy_to_builddir( + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_QUEUE_LIMITED_LOCAL_BIG_PAYLOAD_CONFIG_FILE} + ${PROJECT_BINARY_DIR}/test/${TEST_QUEUE_LIMITED_LOCAL_BIG_PAYLOAD_CONFIG_FILE} + ${TEST_BIG_PAYLOAD_SERVICE} + ) + # Copy config file for client and service into $BUILDDIR/test set(TEST_LOCAL_BIG_PAYLOAD_CONFIG_FILE_RANDOM ${TEST_BIG_PAYLOAD_NAME}_local_random.json) copy_to_builddir( @@ -939,6 +947,54 @@ if(NOT ${TESTS_BAT}) ${TEST_BIG_PAYLOAD_SERVICE} ) + # Copy config file for client and service into $BUILDDIR/test + set(TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_GENERAL ${TEST_BIG_PAYLOAD_NAME}_tcp_client_queue_limited_general.json) + configure_file( + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/conf/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_GENERAL}.in + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_GENERAL} + @ONLY) + copy_to_builddir( + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_GENERAL} + ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_GENERAL} + ${TEST_BIG_PAYLOAD_CLIENT} + ) + + # Copy config file for client and service into $BUILDDIR/test + set(TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_GENERAL ${TEST_BIG_PAYLOAD_NAME}_tcp_service_queue_limited_general.json) + configure_file( + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/conf/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_GENERAL}.in + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_GENERAL} + @ONLY) + copy_to_builddir( + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_GENERAL} + ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_GENERAL} + ${TEST_BIG_PAYLOAD_SERVICE} + ) + + # Copy config file for client and service into $BUILDDIR/test + set(TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC ${TEST_BIG_PAYLOAD_NAME}_tcp_client_queue_limited_specific.json) + configure_file( + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/conf/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC}.in + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC} + @ONLY) + copy_to_builddir( + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC} + ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC} + ${TEST_BIG_PAYLOAD_CLIENT} + ) + + # Copy config file for client and service into $BUILDDIR/test + set(TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC ${TEST_BIG_PAYLOAD_NAME}_tcp_service_queue_limited_specific.json) + configure_file( + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/conf/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC}.in + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC} + @ONLY) + copy_to_builddir( + ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC} + ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC} + ${TEST_BIG_PAYLOAD_SERVICE} + ) + # Copy bashscript to start client local to $BUILDDIR/test set(TEST_LOCAL_BIG_PAYLOAD_CLIENT_START_SCRIPT ${TEST_BIG_PAYLOAD_NAME}_client_local_start.sh) copy_to_builddir(${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_LOCAL_BIG_PAYLOAD_CLIENT_START_SCRIPT} @@ -957,6 +1013,7 @@ if(NOT ${TESTS_BAT}) set(TEST_LOCAL_BIG_PAYLOAD_NAME big_payload_test_local) set(TEST_LOCAL_BIG_PAYLOAD_NAME_RANDOM big_payload_test_local_random) set(TEST_LOCAL_BIG_PAYLOAD_NAME_LIMITED big_payload_test_local_limited) + set(TEST_LOCAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED big_payload_test_local_queue_limited) set(TEST_LOCAL_BIG_PAYLOAD_STARTER ${TEST_LOCAL_BIG_PAYLOAD_NAME}_starter.sh) copy_to_builddir(${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_LOCAL_BIG_PAYLOAD_STARTER} ${PROJECT_BINARY_DIR}/test/${TEST_LOCAL_BIG_PAYLOAD_STARTER} @@ -967,7 +1024,9 @@ if(NOT ${TESTS_BAT}) set(TEST_EXTERNAL_BIG_PAYLOAD_NAME big_payload_test_external) set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_RANDOM big_payload_test_external_random) set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_LIMITED big_payload_test_external_limited) - set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_LIMITED_GENERAL big_payload_test_external_limited_general) + set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_LIMITED_GENERAL big_payload_test_external_limited_general) + set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_GENERAL big_payload_test_external_queue_limited_general) + set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_SPECIFIC big_payload_test_external_queue_limited_specific) set(TEST_EXTERNAL_BIG_PAYLOAD_STARTER ${TEST_EXTERNAL_BIG_PAYLOAD_NAME}_starter.sh) copy_to_builddir(${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_STARTER} ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_STARTER} @@ -2218,6 +2277,21 @@ if(NOT ${TESTS_BAT}) ) set_tests_properties(${TEST_EXTERNAL_BIG_PAYLOAD_NAME_LIMITED_GENERAL} PROPERTIES TIMEOUT 120) + add_test(NAME ${TEST_LOCAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED} + COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_LOCAL_BIG_PAYLOAD_STARTER} QUEUELIMITEDGENERAL + ) + set_tests_properties(${TEST_LOCAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED} PROPERTIES TIMEOUT 120) + + add_test(NAME ${TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_GENERAL} + COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_STARTER} QUEUELIMITEDGENERAL + ) + set_tests_properties(${TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_GENERAL} PROPERTIES TIMEOUT 120) + + add_test(NAME ${TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_SPECIFIC} + COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_STARTER} QUEUELIMITEDSPECIFIC + ) + set_tests_properties(${TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_SPECIFIC} PROPERTIES TIMEOUT 120) + # client id tests add_test(NAME ${TEST_CLIENT_ID_NAME}_diff_client_ids_diff_ports COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_CLIENT_ID_MASTER_STARTER} ${TEST_CLIENT_ID_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE}) diff --git a/test/big_payload_tests/big_payload_test_client.cpp b/test/big_payload_tests/big_payload_test_client.cpp index 6264836..9b50e79 100644 --- a/test/big_payload_tests/big_payload_test_client.cpp +++ b/test/big_payload_tests/big_payload_test_client.cpp @@ -31,6 +31,12 @@ big_payload_test_client::big_payload_test_client( case big_payload_test::test_mode::LIMITED_GENERAL: service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_LIMITED_GENERAL; break; + case big_payload_test::test_mode::QUEUE_LIMITED_GENERAL: + service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_GENERAL; + break; + case big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC: + service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_SPECIFIC; + break; default: service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID; break; @@ -71,7 +77,9 @@ void big_payload_test_client::stop() { VSOMEIP_INFO << "Stopping..."; if (test_mode_ == big_payload_test::test_mode::LIMITED - || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) { + || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL + || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL + || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) { std::this_thread::sleep_for(std::chrono::milliseconds(3000)); ASSERT_EQ(number_of_acknowledged_messages_, number_of_messages_to_send_ / 4); } @@ -82,7 +90,9 @@ void big_payload_test_client::stop() void big_payload_test_client::join_sender_thread(){ sender_.join(); if (test_mode_ == big_payload_test::test_mode::LIMITED - || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) { + || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL + || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL + || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) { ASSERT_EQ(number_of_acknowledged_messages_, number_of_messages_to_send_ / 4); } else { ASSERT_EQ(number_of_sent_messages_, number_of_acknowledged_messages_); @@ -150,7 +160,9 @@ void big_payload_test_client::on_message(const std::shared_ptr } number_of_acknowledged_messages_++; if (test_mode_ == big_payload_test::test_mode::LIMITED - || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) { + || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL + || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL + || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) { if (number_of_acknowledged_messages_ == number_of_messages_to_send_ / 4) { send(); } @@ -191,7 +203,9 @@ void big_payload_test_client::run() unsigned int datasize(std::rand() % big_payload_test::BIG_PAYLOAD_SIZE_RANDOM); its_payload_data.assign(datasize, big_payload_test::DATA_CLIENT_TO_SERVICE); } else if (test_mode_ == big_payload_test::test_mode::LIMITED - || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) { + || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL + || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL + || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) { if (i % 2) { // try to sent a too big payload for half of the messages its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE + 3, @@ -207,6 +221,10 @@ void big_payload_test_client::run() its_payload->set_data(its_payload_data); request_->set_payload(its_payload); app_->send(request_, true); + if (test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL + || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } VSOMEIP_INFO << "Client/Session [" << std::setw(4) << std::setfill('0') << std::hex << request_->get_client() << "/" << std::setw(4) << std::setfill('0') << std::hex << request_->get_session() @@ -223,7 +241,9 @@ void big_payload_test_client::run() GTEST_FATAL_FAILURE_("Didn't receive all replies within time"); } else { if (test_mode_ == big_payload_test::LIMITED - || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) { + || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL + || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL + || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) { ASSERT_EQ(number_of_messages_to_send_ / 4, number_of_acknowledged_messages_); } else { @@ -258,6 +278,10 @@ int main(int argc, char** argv) test_mode = big_payload_test::test_mode::LIMITED; } else if (std::string("LIMITEDGENERAL") == std::string(argv[1])) { test_mode = big_payload_test::test_mode::LIMITED_GENERAL; + } else if (std::string("QUEUELIMITEDGENERAL") == std::string(argv[1])) { + test_mode = big_payload_test::test_mode::QUEUE_LIMITED_GENERAL; + } else if (std::string("QUEUELIMITEDSPECIFIC") == std::string(argv[1])) { + test_mode = big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC; } } return RUN_ALL_TESTS(); diff --git a/test/big_payload_tests/big_payload_test_external_starter.sh b/test/big_payload_tests/big_payload_test_external_starter.sh index f76da1c..6c04a3d 100755 --- a/test/big_payload_tests/big_payload_test_external_starter.sh +++ b/test/big_payload_tests/big_payload_test_external_starter.sh @@ -10,7 +10,7 @@ # the testcase simply executes this script. This script then runs client # and service and checks that both exit successfully. -if [[ $# -gt 0 && $1 != "RANDOM" && $1 != "LIMITED" && $1 != "LIMITEDGENERAL" ]] +if [[ $# -gt 0 && $1 != "RANDOM" && $1 != "LIMITED" && $1 != "LIMITEDGENERAL" && $1 != "QUEUELIMITEDGENERAL" && $1 != "QUEUELIMITEDSPECIFIC" ]] then echo "The only allowed parameter to this script is RANDOM or LIMITED or LIMITEDGENERAL." echo "Like $0 RANDOM" @@ -24,6 +24,10 @@ if [[ $# -gt 0 && $1 == "RANDOM" ]]; then export VSOMEIP_CONFIGURATION=big_payload_test_tcp_client_random.json elif [[ $# -gt 0 && $1 == "LIMITEDGENERAL" ]]; then export VSOMEIP_CONFIGURATION=big_payload_test_tcp_client_limited_general.json +elif [[ $# -gt 0 && $1 == "QUEUELIMITEDGENERAL" ]]; then + export VSOMEIP_CONFIGURATION=big_payload_test_tcp_client_queue_limited_general.json +elif [[ $# -gt 0 && $1 == "QUEUELIMITEDSPECIFIC" ]]; then + export VSOMEIP_CONFIGURATION=big_payload_test_tcp_client_queue_limited_specific.json else export VSOMEIP_CONFIGURATION=big_payload_test_tcp_client.json fi diff --git a/test/big_payload_tests/big_payload_test_globals.hpp b/test/big_payload_tests/big_payload_test_globals.hpp index 152217d..5488259 100644 --- a/test/big_payload_tests/big_payload_test_globals.hpp +++ b/test/big_payload_tests/big_payload_test_globals.hpp @@ -21,6 +21,8 @@ namespace big_payload_test { constexpr vsomeip::service_t TEST_SERVICE_SERVICE_ID_LIMITED = 0x1235; constexpr vsomeip::service_t TEST_SERVICE_SERVICE_ID_RANDOM = 0x1236; constexpr vsomeip::service_t TEST_SERVICE_SERVICE_ID_LIMITED_GENERAL = 0x1237; + constexpr vsomeip::service_t TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_GENERAL = 0x1238; + constexpr vsomeip::service_t TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_SPECIFIC = 0x1239; constexpr vsomeip::service_t TEST_SERVICE_INSTANCE_ID = 0x1; constexpr vsomeip::method_t TEST_SERVICE_METHOD_ID = 0x8421; @@ -29,6 +31,8 @@ namespace big_payload_test { RANDOM, LIMITED, LIMITED_GENERAL, + QUEUE_LIMITED_GENERAL, + QUEUE_LIMITED_SPECIFIC, UNKNOWN }; } diff --git a/test/big_payload_tests/big_payload_test_local_queue_limited.json b/test/big_payload_tests/big_payload_test_local_queue_limited.json new file mode 100644 index 0000000..7252680 --- /dev/null +++ b/test/big_payload_tests/big_payload_test_local_queue_limited.json @@ -0,0 +1,42 @@ +{ + "unicast":"127.0.0.1", + "logging": + { + "level":"error", + "console":"true", + "file": + { + "enable":"false", + "path":"/tmp/vsomeip.log" + }, + "dlt":"false" + }, + "applications": + [ + { + "name":"big_payload_test_service", + "id":"0x1277" + }, + { + "name":"big_payload_test_client", + "id":"0x1344" + } + ], + "services": + [ + { + "service":"0x1234", + "instance":"0x5678" + } + ], + "endpoint-queue-limit-local" : "614428", + "routing":"big_payload_test_service", + "service-discovery": + { + "enable":"true", + "multicast":"224.244.224.245", + "port":"30490", + "protocol":"udp" + } +} + diff --git a/test/big_payload_tests/big_payload_test_local_starter.sh b/test/big_payload_tests/big_payload_test_local_starter.sh index 3d42828..6439cac 100755 --- a/test/big_payload_tests/big_payload_test_local_starter.sh +++ b/test/big_payload_tests/big_payload_test_local_starter.sh @@ -10,9 +10,9 @@ # the testcase simply executes this script. This script then runs client # and service and checks that both exit successfully. -if [[ $# -gt 0 && $1 != "RANDOM" && $1 != "LIMITED" ]] +if [[ $# -gt 0 && $1 != "RANDOM" && $1 != "LIMITED" && $1 != "QUEUELIMITEDGENERAL" ]] then - echo "The only allowed parameter to this script is RANDOM or LIMITED." + echo "The only allowed parameter to this script is RANDOM or LIMITED or QUEUELIMITEDGENERAL." echo "Like $0 RANDOM" exit 1 fi @@ -25,6 +25,8 @@ if [[ $# -gt 0 && $1 == "RANDOM" ]]; then export VSOMEIP_CONFIGURATION=big_payload_test_local_random.json elif [[ $# -gt 0 && $1 == "LIMITED" ]]; then export VSOMEIP_CONFIGURATION=big_payload_test_local_limited.json +elif [[ $# -gt 0 && $1 == "QUEUELIMITEDGENERAL" ]]; then + export VSOMEIP_CONFIGURATION=big_payload_test_local_queue_limited.json else export VSOMEIP_CONFIGURATION=big_payload_test_local.json fi diff --git a/test/big_payload_tests/big_payload_test_service.cpp b/test/big_payload_tests/big_payload_test_service.cpp index 34b2d85..4031550 100644 --- a/test/big_payload_tests/big_payload_test_service.cpp +++ b/test/big_payload_tests/big_payload_test_service.cpp @@ -28,6 +28,14 @@ big_payload_test_service::big_payload_test_service(big_payload_test::test_mode _ expected_messages_ = big_payload_test::BIG_PAYLOAD_TEST_NUMBER_MESSAGES / 2; service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_LIMITED_GENERAL; break; + case big_payload_test::test_mode::QUEUE_LIMITED_GENERAL: + expected_messages_ = big_payload_test::BIG_PAYLOAD_TEST_NUMBER_MESSAGES / 2; + service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_GENERAL; + break; + case big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC: + expected_messages_ = big_payload_test::BIG_PAYLOAD_TEST_NUMBER_MESSAGES / 2; + service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_SPECIFIC; + break; default: expected_messages_ = big_payload_test::BIG_PAYLOAD_TEST_NUMBER_MESSAGES; service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID; @@ -146,7 +154,9 @@ void big_payload_test_service::on_message(const std::shared_ptr