summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJuergen Gehring <juergen.gehring@bmw.de>2018-01-25 00:40:07 -0800
committerJuergen Gehring <juergen.gehring@bmw.de>2018-01-25 00:40:07 -0800
commit8bb2ed134d75803e8e6e3c4f4baa253e4d74edf4 (patch)
tree64cfea7bda038f1fe7b8fd79370104284af0fcea
parentca8af4e6da0eb1f5c268ace102c0ac6aa5545b5c (diff)
downloadvSomeIP-8bb2ed134d75803e8e6e3c4f4baa253e4d74edf4.tar.gz
vsomeip 2.10.52.10.5
-rw-r--r--CHANGES28
-rw-r--r--CMakeLists.txt2
-rw-r--r--documentation/vsomeipUserGuide159
-rw-r--r--examples/request-sample.cpp3
-rw-r--r--implementation/configuration/include/configuration.hpp18
-rw-r--r--implementation/configuration/include/configuration_impl.hpp40
-rw-r--r--implementation/configuration/include/debounce.hpp39
-rw-r--r--implementation/configuration/include/internal.hpp.in9
-rw-r--r--implementation/configuration/src/configuration_impl.cpp364
-rw-r--r--implementation/endpoints/include/client_endpoint_impl.hpp8
-rw-r--r--implementation/endpoints/include/endpoint_impl.hpp6
-rw-r--r--implementation/endpoints/include/local_client_endpoint_impl.hpp3
-rw-r--r--implementation/endpoints/include/local_server_endpoint_impl.hpp6
-rw-r--r--implementation/endpoints/include/server_endpoint_impl.hpp5
-rw-r--r--implementation/endpoints/include/tcp_client_endpoint_impl.hpp3
-rw-r--r--implementation/endpoints/include/tcp_server_endpoint_impl.hpp5
-rw-r--r--implementation/endpoints/include/udp_client_endpoint_impl.hpp3
-rw-r--r--implementation/endpoints/include/udp_server_endpoint_impl.hpp3
-rw-r--r--implementation/endpoints/src/client_endpoint_impl.cpp75
-rw-r--r--implementation/endpoints/src/endpoint_impl.cpp6
-rw-r--r--implementation/endpoints/src/local_client_endpoint_impl.cpp16
-rw-r--r--implementation/endpoints/src/local_server_endpoint_impl.cpp23
-rw-r--r--implementation/endpoints/src/server_endpoint_impl.cpp64
-rw-r--r--implementation/endpoints/src/tcp_client_endpoint_impl.cpp24
-rw-r--r--implementation/endpoints/src/tcp_server_endpoint_impl.cpp31
-rw-r--r--implementation/endpoints/src/udp_client_endpoint_impl.cpp13
-rw-r--r--implementation/endpoints/src/udp_server_endpoint_impl.cpp14
-rw-r--r--implementation/routing/include/event.hpp2
-rw-r--r--implementation/routing/include/routing_manager_impl.hpp2
-rw-r--r--implementation/routing/include/routing_manager_stub_host.hpp2
-rw-r--r--implementation/routing/src/event.cpp7
-rw-r--r--implementation/routing/src/routing_manager_base.cpp97
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp88
-rw-r--r--implementation/routing/src/routing_manager_proxy.cpp3
-rw-r--r--implementation/routing/src/routing_manager_stub.cpp6
-rw-r--r--implementation/runtime/src/application_impl.cpp22
-rw-r--r--implementation/service_discovery/include/configuration_option_impl.hpp2
-rwxr-xr-ximplementation/service_discovery/include/eventgroupentry_impl.hpp24
-rw-r--r--implementation/service_discovery/include/ip_option_impl.hpp2
-rw-r--r--implementation/service_discovery/include/ipv4_option_impl.hpp1
-rw-r--r--implementation/service_discovery/include/ipv6_option_impl.hpp1
-rwxr-xr-ximplementation/service_discovery/include/load_balancing_option_impl.hpp2
-rwxr-xr-ximplementation/service_discovery/include/protection_option_impl.hpp2
-rw-r--r--implementation/service_discovery/include/service_discovery_impl.hpp11
-rwxr-xr-ximplementation/service_discovery/src/configuration_option_impl.cpp12
-rwxr-xr-ximplementation/service_discovery/src/eventgroupentry_impl.cpp118
-rw-r--r--implementation/service_discovery/src/ip_option_impl.cpp13
-rw-r--r--implementation/service_discovery/src/ipv4_option_impl.cpp5
-rwxr-xr-ximplementation/service_discovery/src/ipv6_option_impl.cpp5
-rwxr-xr-ximplementation/service_discovery/src/load_balancing_option_impl.cpp13
-rwxr-xr-ximplementation/service_discovery/src/option_impl.cpp3
-rwxr-xr-ximplementation/service_discovery/src/protection_option_impl.cpp13
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp61
-rw-r--r--test/CMakeLists.txt76
-rw-r--r--test/big_payload_tests/big_payload_test_client.cpp34
-rwxr-xr-xtest/big_payload_tests/big_payload_test_external_starter.sh6
-rw-r--r--test/big_payload_tests/big_payload_test_globals.hpp4
-rw-r--r--test/big_payload_tests/big_payload_test_local_queue_limited.json42
-rwxr-xr-xtest/big_payload_tests/big_payload_test_local_starter.sh6
-rw-r--r--test/big_payload_tests/big_payload_test_service.cpp20
-rwxr-xr-xtest/big_payload_tests/big_payload_test_service_external_start.sh8
-rw-r--r--test/big_payload_tests/conf/big_payload_test_tcp_client_queue_limited_general.json.in31
-rw-r--r--test/big_payload_tests/conf/big_payload_test_tcp_client_queue_limited_specific.json.in43
-rw-r--r--test/big_payload_tests/conf/big_payload_test_tcp_service_queue_limited_general.json.in87
-rw-r--r--test/big_payload_tests/conf/big_payload_test_tcp_service_queue_limited_specific.json.in99
65 files changed, 1695 insertions, 248 deletions
diff --git a/CHANGES b/CHANGES
index 1fa9a5f..6f3f91d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,34 @@
Changes
=======
+v2.10.5
+- Fix possible deadlock on application shutdown
+- Try to reestablish TCP connection on resubscription if the remote
+ closed the connection
+- Introduce new configuration file parameters to control
+ interpretation of TTL field of incoming remote offers and
+ subscriptions:
+ - service-discovery > ttl_factor_offers (optional array of
+ service/instance/TTL factor tuples)
+ - service-discovery > ttl_factor_subscriptions (optional array of
+ service/instance/TTL factor tuples)
+- Added possibility to debounce external events/fields
+ based on time or change of data in the payload (maskable) via new
+ configuration file parameter:
+ - debounce (optional array)
+ For more information see the vsomeipUserGuide.
+- Added possibility to limit amount of memory used to cache outgoing
+ messages on IP port basis or globally via configuration file
+ parameter:
+ - endpoint-queue-limits (array): to limit on IP:Port (endpoint)
+ level
+ - endpoint-queue-limit-external: to generally limit all external
+ endpoints.
+ - endpoint-queue-limit-local: to limit queue sizes for local
+ communication
+ For more information see the vsomeipUserGuide.
+
+
v2.10.4
- Extended diagnosis plugin to handle requests for
"disableRxAndEnableTx" and "disableRxAndTx".
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d1df178..e062da1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -8,7 +8,7 @@ project (vsomeip)
set (VSOMEIP_MAJOR_VERSION 2)
set (VSOMEIP_MINOR_VERSION 10)
-set (VSOMEIP_PATCH_VERSION 4)
+set (VSOMEIP_PATCH_VERSION 5)
set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION})
set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in
set (CMAKE_VERBOSE_MAKEFILE off)
diff --git a/documentation/vsomeipUserGuide b/documentation/vsomeipUserGuide
index 0c222b9..3fb07c3 100644
--- a/documentation/vsomeipUserGuide
+++ b/documentation/vsomeipUserGuide
@@ -650,7 +650,7 @@ remote service hosted on beforehand specified IP and port.
On service side: the payload size limit in bytes of messages received and sent
by the service offered on previously specified IP and port.
+
-If multiple services are hosted on the same port all they share the limit
+If multiple services are hosted on the same port they all share the limit
specified.
* `max-payload-size-local`
@@ -665,6 +665,62 @@ The maximum allowed payload size for TCP communication in
bytes. By default the payload size for TCP communication is
unlimited. It can be limited via this setting.
+* `endpoint-queue-limits` (array)
++
+Array to limit the maximum allowed size in bytes of cached outgoing messages per
+IP and port (message queue size per endpoint). If not specified otherwise the
+allowed queue size is unlimited. The settings in this array only affect external
+communication. To limit the local queue size `endpoint-queue-limit-local` can
+be used.
+
+** `unicast`
++
+On client side: the IP of the remote service for which the queue size of sent
+requests should be limited.
++
+On service side: the IP of the offered service for which the queue size for
+sent responses should be limited. This IP address is therefore
+identical to the IP address specified via `unicast` setting on top level of the
+json file.
+
+** `ports` (array)
++
+Array which holds pairs of port and queue size statements.
+
+*** `port`
++
+On client side: the port of the remote service for which the queue size of sent
+requests should be limited.
++
+On service side: the port of the offered service for which the queue size for
+send responses should be limited.
+
+*** `queue-size-limit`
++
+On client side: the queue size limit in bytes of messages sent to the
+remote service hosted on beforehand specified IP and port.
++
+On service side: the queue size limit in bytes for responses sent by the service
+offered on previously specified IP and port.
++
+If multiple services are hosted on the same port they all share the limit
+specified.
+
+* `endpoint-queue-limit-external`
++
+Setting to limit the maximum allowed size in bytes of cached outgoing messages
+for external communication (message queue size per endpoint). By default the
+queue size for external communication is unlimited. It can be limited via this
+setting. Settings done in the `endpoint-queue-limits` array override this
+setting.
+
+* `endpoint-queue-limit-local`
++
+Setting to limit the maximum allowed size in bytes of cached outgoing messages
+for local communication (message queue size per endpoint). By default the queue
+size for node internal communication is unlimited. It can be limited via this
+setting.
+
* `buffer-shrink-threshold`
+
The number of processed messages which are half the size or smaller than the
@@ -714,6 +770,60 @@ The highest Service-ID in hex of a internal service range.
The highest Instance-ID in hex of a internal service-instance range.
If not specified the highest Instance-ID is 0xFFFF.
+* `debounce` (optional array)
++
+Events/fields sent by external devices will be forwarded to the
+applications only if a configurable function evaluates to true. The
+function checks whether the event/field payload has changed and whether
+a specified interval has been elapsed since the last forwarding.
+
+** `service`
++
+Service ID which hosts the events to be debounced.
+
+** `instance`
++
+Instance ID which hosts the events to be debounced.
+
+** `events`
++
+Array of events which shall be debounced based on the following
+configuration options.
+
+*** `event`
++
+Event ID.
+
+*** `on_change`
++
+Specifies whether the event is only forwared on
+paylaod change or not. (valid values: _true_, _false_).
+
+*** `ignore`
++
+Array of payload indexes with given bit mask (optional)
+to be ignored in payload change evaluation.
+Instead of specifying an index / bitmask pair, one can only define the paylaod index
+which shall be ignored in the evaluation.
+
+**** `index`
++
+Payload index to be checked with given bitmask.
+
+**** `mask`
++
+1Byte bitmask applied to byte at given payload index.
+Example mask: 0x0f ignores payload changes in low nibble of the byte at given index.
+
+*** `interval`
++
+Specifies if the event shall be debounced based on elapsed time interval.
+(valid values: _time in ms_, _never_).
+
+*** `on_change_resets_interval_` (optional)
+Specifies if interval timer is reset when payload change was detected.
+(valid values: _false_, _true_).
+
* `routing`
+
The name of the application that is responsible for the routing.
@@ -762,10 +872,55 @@ repetition phase.
+
Lifetime of entries for provided services as well as consumed services and eventgroups.
+** `ttl_factor_offers` (optional array)
++
+Array which holds correction factors for incoming remote offers. If a value
+greater than one is specified for a service instance, the TTL field of the
+corresponding service entry will be multiplied with the specified factor. +
+Example: An offer of a service is received with a TTL of 3 sec and the TTL
+factor is set to 5. The remote node stops offering the service w/o sending a
+StopOffer message. The service will then expire (marked as unavailable) 15 seconds
+after the last offer has been received.
+
+*** `service`
++
+The id of the service.
+
+*** `instance`
++
+The id of the service instance.
+
+*** `ttl_factor`
++
+TTL correction factor
+
+** `ttl_factor_subscriptions` (optional array)
++
+Array which holds correction factors for incoming remote subscriptions. If a
+value greater than one is specified for a service instance, the TTL field of the
+corresponding eventgroup entry will be multiplied with the specified factor. +
+Example: A remote subscription to an offered service is received with a TTL of 3
+sec and the TTL factor is set to 5. The remote node stops resubscribing to the
+service w/o sending a StopSubscribeEventgroup message. The subscription will
+then expire 15 seconds after the last resubscription has been received.
+
+*** `service`
++
+The id of the service.
+
+*** `instance`
++
+The id of the service instance.
+
+*** `ttl_factor`
++
+TTL correction factor
+
** `cyclic_offer_delay`
+
Cycle of the OfferService messages in the main phase.
+
** `request_response_delay`
+
Minimum delay of a unicast message to a multicast message for
@@ -926,7 +1081,7 @@ To activate the 'Audit Mode' the 'security' object has to be included in the
json file but the 'check_credentials' switch has to be set to false. For
example:
-[source, json]
+[source, bash]
----
[...]
"services" :
diff --git a/examples/request-sample.cpp b/examples/request-sample.cpp
index 39dfaaa..7732cb0 100644
--- a/examples/request-sample.cpp
+++ b/examples/request-sample.cpp
@@ -150,7 +150,6 @@ public:
{
std::unique_lock<std::mutex> its_lock(mutex_);
while (!blocked_) condition_.wait(its_lock);
- std::this_thread::sleep_for(std::chrono::milliseconds(cycle_));
if (is_available_) {
app_->send(request_, true);
std::cout << "Client/Session ["
@@ -165,8 +164,8 @@ public:
<< std::endl;
blocked_ = false;
}
-
}
+ std::this_thread::sleep_for(std::chrono::milliseconds(cycle_));
}
}
diff --git a/implementation/configuration/include/configuration.hpp b/implementation/configuration/include/configuration.hpp
index 9908e48..16ebd33 100644
--- a/implementation/configuration/include/configuration.hpp
+++ b/implementation/configuration/include/configuration.hpp
@@ -25,6 +25,8 @@
#include "../../e2e_protection/include/e2exf/config.hpp"
#include "e2e.hpp"
+#include "debounce.hpp"
+
#define VSOMEIP_CONFIG_PLUGIN_VERSION 1
namespace vsomeip {
@@ -150,6 +152,22 @@ public:
virtual bool log_status() const = 0;
virtual uint32_t get_log_status_interval() const = 0;
+
+ // TTL factor
+ typedef std::uint32_t ttl_factor_t;
+ typedef std::map<service_t, std::map<instance_t, ttl_factor_t>> ttl_map_t;
+ virtual ttl_map_t get_ttl_factor_offers() const = 0;
+ virtual ttl_map_t get_ttl_factor_subscribes() const = 0;
+
+ // Debouncing
+ virtual std::shared_ptr<cfg::debounce> get_debounce(
+ service_t _service, instance_t _instance, event_t _event) const = 0;
+
+ // Queue size limit endpoints
+ typedef std::uint32_t endpoint_queue_limit_t;
+ virtual endpoint_queue_limit_t get_endpoint_queue_limit(
+ const std::string& _address, std::uint16_t _port) const = 0;
+ virtual endpoint_queue_limit_t get_endpoint_queue_limit_local() const = 0;
};
} // namespace vsomeip
diff --git a/implementation/configuration/include/configuration_impl.hpp b/implementation/configuration/include/configuration_impl.hpp
index 265d5d9..93f7ebf 100644
--- a/implementation/configuration/include/configuration_impl.hpp
+++ b/implementation/configuration/include/configuration_impl.hpp
@@ -22,6 +22,7 @@
#include "policy.hpp"
#include "../../e2e_protection/include/e2exf/config.hpp"
#include "e2e.hpp"
+#include "debounce.hpp"
namespace vsomeip {
namespace cfg {
@@ -161,6 +162,15 @@ public:
VSOMEIP_EXPORT bool log_status() const;
VSOMEIP_EXPORT uint32_t get_log_status_interval() const;
+ VSOMEIP_EXPORT ttl_map_t get_ttl_factor_offers() const;
+ VSOMEIP_EXPORT ttl_map_t get_ttl_factor_subscribes() const;
+
+ VSOMEIP_EXPORT std::shared_ptr<debounce> get_debounce(
+ service_t _service, instance_t _instance, event_t _event) const;
+
+ VSOMEIP_EXPORT endpoint_queue_limit_t get_endpoint_queue_limit(
+ const std::string& _address, std::uint16_t _port) const;
+ VSOMEIP_EXPORT endpoint_queue_limit_t get_endpoint_queue_limit_local() const;
private:
void read_data(const std::set<std::string> &_input,
std::vector<element> &_elements,
@@ -221,6 +231,15 @@ private:
void load_policies(const element &_element);
void load_policy(const boost::property_tree::ptree &_tree);
+ void load_debounce(const element &_element);
+ void load_service_debounce(const boost::property_tree::ptree &_tree);
+ void load_events_debounce(const boost::property_tree::ptree &_tree,
+ std::map<event_t, std::shared_ptr<debounce>> &_debounces);
+ void load_event_debounce(const boost::property_tree::ptree &_tree,
+ std::map<event_t, std::shared_ptr<debounce>> &_debounces);
+ void load_event_debounce_ignore(const boost::property_tree::ptree &_tree,
+ std::map<std::size_t, byte_t> &_ignore);
+
servicegroup *find_servicegroup(const std::string &_name) const;
std::shared_ptr<client> find_client(service_t _service,
instance_t _instance) const;
@@ -243,6 +262,11 @@ private:
void load_e2e(const element &_element);
void load_e2e_protected(const boost::property_tree::ptree &_tree);
+ void load_ttl_factors(const boost::property_tree::ptree &_tree,
+ ttl_map_t* _target);
+
+ void load_endpoint_queue_sizes(const element &_element);
+
private:
std::mutex mutex_;
@@ -333,7 +357,12 @@ protected:
ET_TRACING_ENABLE,
ET_TRACING_SD_ENABLE,
ET_SERVICE_DISCOVERY_OFFER_DEBOUNCE_TIME,
- ET_MAX = 25
+ ET_SERVICE_DISCOVERY_TTL_FACTOR_OFFERS,
+ ET_SERVICE_DISCOVERY_TTL_FACTOR_SUBSCRIPTIONS,
+ ET_ENDPOINT_QUEUE_LIMITS,
+ ET_ENDPOINT_QUEUE_LIMIT_EXTERNAL,
+ ET_ENDPOINT_QUEUE_LIMIT_LOCAL,
+ ET_MAX = 30
};
bool is_configured_[ET_MAX];
@@ -355,6 +384,15 @@ protected:
bool log_status_;
uint32_t log_status_interval_;
+
+ ttl_map_t ttl_factors_offers_;
+ ttl_map_t ttl_factors_subscriptions_;
+
+ std::map<service_t, std::map<instance_t, std::map<event_t, std::shared_ptr<debounce>>>> debounces_;
+
+ std::map<std::string, std::map<std::uint16_t, endpoint_queue_limit_t>> endpoint_queue_limits_;
+ endpoint_queue_limit_t endpoint_queue_limit_external_;
+ endpoint_queue_limit_t endpoint_queue_limit_local_;
};
} // namespace cfg
diff --git a/implementation/configuration/include/debounce.hpp b/implementation/configuration/include/debounce.hpp
new file mode 100644
index 0000000..b4ce6d3
--- /dev/null
+++ b/implementation/configuration/include/debounce.hpp
@@ -0,0 +1,39 @@
+// Copyright (C) 2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef VSOMEIP_CFG_DEBOUNCE_HPP
+#define VSOMEIP_CFG_DEBOUNCE_HPP
+
+#include <map>
+
+namespace vsomeip {
+namespace cfg {
+
+// Messages are forwarded either because their value differs from the
+// last received message (on_change=true) or because the specified time
+// (interval_) between two messages has elapsed. A message that is forwarded
+// because of a changed value may reset the time until the next unchanged
+// message is forwarded or not (on_change_resets_interval). By specifiying
+// indexes and bit masks, the comparison that is carried out to decide whether
+// or not two message values differ is configurable (ignore_).
+struct debounce {
+ debounce() : on_change_(false),
+ on_change_resets_interval_(false),
+ interval_(0),
+ last_forwarded_((std::chrono::steady_clock::time_point::max)()) {
+ }
+
+ bool on_change_;
+ bool on_change_resets_interval_;
+ std::map<std::size_t, byte_t> ignore_;
+
+ long interval_;
+ std::chrono::steady_clock::time_point last_forwarded_;
+};
+
+} // namespace cfg
+} // namespace vsomeip
+
+#endif // VSOMEIP_CFG_DEBOUNCE_HPP
diff --git a/implementation/configuration/include/internal.hpp.in b/implementation/configuration/include/internal.hpp.in
index 3a987f9..6012092 100644
--- a/implementation/configuration/include/internal.hpp.in
+++ b/implementation/configuration/include/internal.hpp.in
@@ -176,13 +176,10 @@ struct configuration_data_t {
unsigned short routing_manager_host_;
};
-#ifndef _WIN32
-constexpr std::uint32_t MESSAGE_SIZE_UNLIMITED = std::numeric_limits<std::uint32_t>::max();
-#elif _MSC_VER >= 1900
-constexpr std::uint32_t MESSAGE_SIZE_UNLIMITED = (std::numeric_limits<std::uint32_t>::max)();
-#else
const std::uint32_t MESSAGE_SIZE_UNLIMITED = (std::numeric_limits<std::uint32_t>::max)();
-#endif
+
+const std::uint32_t QUEUE_SIZE_UNLIMITED = (std::numeric_limits<std::uint32_t>::max)();
+
} // namespace vsomeip
diff --git a/implementation/configuration/src/configuration_impl.cpp b/implementation/configuration/src/configuration_impl.cpp
index 43e3c67..0920b66 100644
--- a/implementation/configuration/src/configuration_impl.cpp
+++ b/implementation/configuration/src/configuration_impl.cpp
@@ -77,7 +77,9 @@ configuration_impl::configuration_impl()
log_memory_(false),
log_memory_interval_(0),
log_status_(false),
- log_status_interval_(0) {
+ log_status_interval_(0),
+ endpoint_queue_limit_external_(QUEUE_SIZE_UNLIMITED),
+ endpoint_queue_limit_local_(QUEUE_SIZE_UNLIMITED) {
unicast_ = unicast_.from_string(VSOMEIP_UNICAST_ADDRESS);
for (auto i = 0; i < ET_MAX; i++)
is_configured_[i] = false;
@@ -94,7 +96,9 @@ configuration_impl::configuration_impl(const configuration_impl &_other)
max_reliable_message_size_(_other.max_reliable_message_size_),
buffer_shrink_threshold_(_other.buffer_shrink_threshold_),
permissions_shm_(VSOMEIP_DEFAULT_SHM_PERMISSION),
- umask_(VSOMEIP_DEFAULT_UMASK_LOCAL_ENDPOINTS) {
+ umask_(VSOMEIP_DEFAULT_UMASK_LOCAL_ENDPOINTS),
+ endpoint_queue_limit_external_(_other.endpoint_queue_limit_external_),
+ endpoint_queue_limit_local_(_other.endpoint_queue_limit_local_) {
applications_.insert(_other.applications_.begin(), _other.applications_.end());
services_.insert(_other.services_.begin(), _other.services_.end());
@@ -144,6 +148,8 @@ configuration_impl::configuration_impl(const configuration_impl &_other)
log_memory_interval_ = _other.log_memory_interval_;
log_status_ = _other.log_status_;
log_status_interval_ = _other.log_status_interval_;
+
+ debounces_ = _other.debounces_;
}
configuration_impl::~configuration_impl() {
@@ -332,6 +338,7 @@ bool configuration_impl::load_data(const std::vector<element> &_elements,
load_network(e);
load_diagnosis_address(e);
load_payload_sizes(e);
+ load_endpoint_queue_sizes(e);
load_permissions(e);
load_policies(e);
load_tracing(e);
@@ -348,6 +355,7 @@ bool configuration_impl::load_data(const std::vector<element> &_elements,
load_watchdog(e);
load_selective_broadcasts_support(e);
load_e2e(e);
+ load_debounce(e);
}
}
@@ -892,6 +900,22 @@ void configuration_impl::load_service_discovery(
its_converter >> sd_offer_debounce_time_;
is_configured_[ET_SERVICE_DISCOVERY_OFFER_DEBOUNCE_TIME] = true;
}
+ } else if (its_key == "ttl_factor_offers") {
+ if (is_configured_[ET_SERVICE_DISCOVERY_TTL_FACTOR_OFFERS]) {
+ VSOMEIP_WARNING << "Multiple definitions for service_discovery.ttl_factor_offers."
+ " Ignoring definition from " << _element.name_;
+ } else {
+ load_ttl_factors(i->second, &ttl_factors_offers_);
+ is_configured_[ET_SERVICE_DISCOVERY_TTL_FACTOR_OFFERS] = true;
+ }
+ } else if (its_key == "ttl_factor_subscriptions") {
+ if (is_configured_[ET_SERVICE_DISCOVERY_TTL_FACTOR_SUBSCRIPTIONS]) {
+ VSOMEIP_WARNING << "Multiple definitions for service_discovery.ttl_factor_subscriptions."
+ " Ignoring definition from " << _element.name_;
+ } else {
+ load_ttl_factors(i->second, &ttl_factors_subscriptions_);
+ is_configured_[ET_SERVICE_DISCOVERY_TTL_FACTOR_SUBSCRIPTIONS] = true;
+ }
}
}
} catch (...) {
@@ -2636,5 +2660,341 @@ uint32_t configuration_impl::get_log_status_interval() const {
return log_status_interval_;
}
+void configuration_impl::load_ttl_factors(
+ const boost::property_tree::ptree &_tree, ttl_map_t* _target) {
+ const service_t ILLEGAL_VALUE(0xffff);
+ for (const auto& i : _tree) {
+ service_t its_service(ILLEGAL_VALUE);
+ instance_t its_instance(ILLEGAL_VALUE);
+ configuration::ttl_factor_t its_ttl_factor(0);
+
+ for (const auto& j : i.second) {
+ std::string its_key(j.first);
+ std::string its_value(j.second.data());
+ std::stringstream its_converter;
+
+ if (its_key == "ttl_factor") {
+ its_converter << its_value;
+ its_converter >> its_ttl_factor;
+ } else {
+ // Trim "its_value"
+ if (its_value.size() > 1 && its_value[0] == '0' && its_value[1] == 'x') {
+ its_converter << std::hex << its_value;
+ } else {
+ its_converter << std::dec << its_value;
+ }
+
+ if (its_key == "service") {
+ its_converter >> its_service;
+ } else if (its_key == "instance") {
+ its_converter >> its_instance;
+ }
+ }
+ }
+ if (its_service != ILLEGAL_VALUE
+ && its_instance != ILLEGAL_VALUE
+ && its_ttl_factor > 0) {
+ (*_target)[its_service][its_instance] = its_ttl_factor;
+ } else {
+ VSOMEIP_ERROR << "Invalid ttl factor configuration";
+ }
+ }
+}
+
+configuration::ttl_map_t configuration_impl::get_ttl_factor_offers() const {
+ return ttl_factors_offers_;
+}
+
+configuration::ttl_map_t configuration_impl::get_ttl_factor_subscribes() const {
+ return ttl_factors_subscriptions_;
+}
+
+configuration::endpoint_queue_limit_t
+configuration_impl::get_endpoint_queue_limit(
+ const std::string& _address, std::uint16_t _port) const {
+ auto found_address = endpoint_queue_limits_.find(_address);
+ if (found_address != endpoint_queue_limits_.end()) {
+ auto found_port = found_address->second.find(_port);
+ if (found_port != found_address->second.end()) {
+ return found_port->second;
+ }
+ }
+ return endpoint_queue_limit_external_;
+}
+
+configuration::endpoint_queue_limit_t
+configuration_impl::get_endpoint_queue_limit_local() const {
+ return endpoint_queue_limit_local_;
+}
+
+void configuration_impl::load_endpoint_queue_sizes(const element &_element) {
+ const std::string endpoint_queue_limits("endpoint-queue-limits");
+ const std::string endpoint_queue_limit_external("endpoint-queue-limit-external");
+ const std::string endpoint_queue_limit_local("endpoint-queue-limit-local");
+
+ try {
+ if (_element.tree_.get_child_optional(endpoint_queue_limit_external)) {
+ if (is_configured_[ET_ENDPOINT_QUEUE_LIMIT_EXTERNAL]) {
+ VSOMEIP_WARNING << "Multiple definitions for "
+ << endpoint_queue_limit_external
+ << " Ignoring definition from " << _element.name_;
+ } else {
+ is_configured_[ET_ENDPOINT_QUEUE_LIMIT_EXTERNAL] = true;
+ auto mpsl = _element.tree_.get_child(
+ endpoint_queue_limit_external);
+ std::string s(mpsl.data());
+ try {
+ endpoint_queue_limit_external_ =
+ static_cast<configuration::endpoint_queue_limit_t>(std::stoul(
+ s.c_str(), NULL, 10));
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR<<__func__ << ": " << endpoint_queue_limit_external
+ << " " << e.what();
+ }
+ }
+ }
+ if (_element.tree_.get_child_optional(endpoint_queue_limit_local)) {
+ if (is_configured_[ET_ENDPOINT_QUEUE_LIMIT_LOCAL]) {
+ VSOMEIP_WARNING << "Multiple definitions for "
+ << endpoint_queue_limit_local
+ << " Ignoring definition from " << _element.name_;
+ } else {
+ is_configured_[ET_ENDPOINT_QUEUE_LIMIT_LOCAL] = true;
+ auto mpsl = _element.tree_.get_child(endpoint_queue_limit_local);
+ std::string s(mpsl.data());
+ try {
+ endpoint_queue_limit_local_=
+ static_cast<configuration::endpoint_queue_limit_t>(
+ std::stoul(s.c_str(), NULL, 10));
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR<< __func__ << ": "<< endpoint_queue_limit_local
+ << " " << e.what();
+ }
+ }
+ }
+
+ if (_element.tree_.get_child_optional(endpoint_queue_limits)) {
+ if (is_configured_[ET_ENDPOINT_QUEUE_LIMITS]) {
+ VSOMEIP_WARNING << "Multiple definitions for "
+ << endpoint_queue_limits
+ << " Ignoring definition from " << _element.name_;
+ } else {
+ is_configured_[ET_ENDPOINT_QUEUE_LIMITS] = true;
+ const std::string unicast("unicast");
+ const std::string ports("ports");
+ const std::string port("port");
+ const std::string queue_size_limit("queue-size-limit");
+
+ for (const auto i : _element.tree_.get_child(endpoint_queue_limits)) {
+ if (!i.second.get_child_optional(unicast)
+ || !i.second.get_child_optional(ports)) {
+ continue;
+ }
+ std::string its_unicast(i.second.get_child(unicast).data());
+ for (const auto j : i.second.get_child(ports)) {
+
+ if (!j.second.get_child_optional(port)
+ || !j.second.get_child_optional(queue_size_limit)) {
+ continue;
+ }
+
+ std::uint16_t its_port = ILLEGAL_PORT;
+ std::uint32_t its_queue_size_limit = 0;
+
+ try {
+ std::string p(j.second.get_child(port).data());
+ its_port = static_cast<std::uint16_t>(std::stoul(p.c_str(),
+ NULL, 10));
+ std::string s(j.second.get_child(queue_size_limit).data());
+ its_queue_size_limit = static_cast<std::uint32_t>(std::stoul(
+ s.c_str(), NULL, 10));
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << __func__ << ":" << e.what();
+ }
+
+ if (its_port == ILLEGAL_PORT || its_queue_size_limit == 0) {
+ continue;
+ }
+
+ endpoint_queue_limits_[its_unicast][its_port] = its_queue_size_limit;
+ }
+ }
+ }
+ }
+ } catch (...) {
+ }
+}
+
+void configuration_impl::load_debounce(const element &_element) {
+ try {
+ auto its_debounce = _element.tree_.get_child("debounce");
+ for (auto i = its_debounce.begin(); i != its_debounce.end(); ++i) {
+ load_service_debounce(i->second);
+ }
+ } catch (...) {
+ }
+}
+
+void configuration_impl::load_service_debounce(
+ const boost::property_tree::ptree &_tree) {
+ service_t its_service(0);
+ instance_t its_instance(0);
+ std::map<event_t, std::shared_ptr<debounce>> its_debounces;
+
+ for (auto i = _tree.begin(); i != _tree.end(); ++i) {
+ std::string its_key(i->first);
+ std::string its_value(i->second.data());
+ std::stringstream its_converter;
+
+ if (its_key == "service") {
+ if (its_value.size() > 1 && its_value[0] == '0' && its_value[1] == 'x') {
+ its_converter << std::hex << its_value;
+ } else {
+ its_converter << std::dec << its_value;
+ }
+ its_converter >> its_service;
+ } else if (its_key == "instance") {
+ if (its_value.size() > 1 && its_value[0] == '0' && its_value[1] == 'x') {
+ its_converter << std::hex << its_value;
+ } else {
+ its_converter << std::dec << its_value;
+ }
+ its_converter >> its_instance;
+ } else if (its_key == "events") {
+ load_events_debounce(i->second, its_debounces);
+ }
+ }
+
+ // TODO: Improve error handling!
+ if (its_service > 0 && its_instance > 0 && !its_debounces.empty()) {
+ auto find_service = debounces_.find(its_service);
+ if (find_service != debounces_.end()) {
+ auto find_instance = find_service->second.find(its_instance);
+ if (find_instance != find_service->second.end()) {
+ VSOMEIP_ERROR << "Multiple debounce configurations for service "
+ << std::hex << std::setw(4) << std::setfill('0') << its_service
+ << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance;
+ return;
+ }
+ }
+ debounces_[its_service][its_instance] = its_debounces;
+ }
+}
+
+void configuration_impl::load_events_debounce(
+ const boost::property_tree::ptree &_tree,
+ std::map<event_t, std::shared_ptr<debounce>> &_debounces) {
+ for (auto i = _tree.begin(); i != _tree.end(); ++i) {
+ load_event_debounce(i->second, _debounces);
+ }
+}
+
+void configuration_impl::load_event_debounce(
+ const boost::property_tree::ptree &_tree,
+ std::map<event_t, std::shared_ptr<debounce>> &_debounces) {
+ event_t its_event(0);
+ std::shared_ptr<debounce> its_debounce = std::make_shared<debounce>();
+
+ for (auto i = _tree.begin(); i != _tree.end(); ++i) {
+ std::string its_key(i->first);
+ std::string its_value(i->second.data());
+ std::stringstream its_converter;
+
+ if (its_key == "event") {
+ if (its_value.size() > 1 && its_value[0] == '0' && its_value[1] == 'x') {
+ its_converter << std::hex << its_value;
+ } else {
+ its_converter << std::dec << its_value;
+ }
+ its_converter >> its_event;
+ } else if (its_key == "on_change") {
+ its_debounce->on_change_ = (its_value == "true");
+ } else if (its_key == "on_change_resets_interval") {
+ its_debounce->on_change_resets_interval_ = (its_value == "true");
+ } else if (its_key == "ignore") {
+ load_event_debounce_ignore(i->second, its_debounce->ignore_);
+ } else if (its_key == "interval") {
+ if (its_value == "never") {
+ its_debounce->interval_ = -1;
+ } else {
+ its_converter << std::dec << its_value;
+ its_converter >> its_debounce->interval_;
+ }
+ }
+ }
+
+ // TODO: Improve error handling
+ if (its_event > 0) {
+ auto find_event = _debounces.find(its_event);
+ if (find_event == _debounces.end()) {
+ _debounces[its_event] = its_debounce;
+ }
+ }
+}
+
+void configuration_impl::load_event_debounce_ignore(
+ const boost::property_tree::ptree &_tree,
+ std::map<std::size_t, byte_t> &_ignore) {
+ std::size_t its_ignored;
+ byte_t its_mask;
+ std::stringstream its_converter;
+
+ for (auto i = _tree.begin(); i != _tree.end(); ++i) {
+ std::string its_value = i->second.data();
+
+ its_mask = 0xff;
+
+ if (!its_value.empty()
+ && std::find_if(its_value.begin(), its_value.end(),
+ [](char _c) { return !std::isdigit(_c); })
+ == its_value.end()) {
+ its_converter.str("");
+ its_converter.clear();
+ its_converter << std::dec << its_value;
+ its_converter >> its_ignored;
+
+ } else {
+ for (auto j = i->second.begin(); j != i->second.end(); ++j) {
+ std::string its_ignore_key(j->first);
+ std::string its_ignore_value(j->second.data());
+
+ if (its_ignore_key == "index") {
+ its_converter.str("");
+ its_converter.clear();
+ its_converter << std::dec << its_ignore_value;
+ its_converter >> its_ignored;
+ } else if (its_ignore_key == "mask") {
+ its_converter.str("");
+ its_converter.clear();
+
+ int its_tmp_mask;
+ its_converter << std::hex << its_ignore_value;
+ its_converter >> its_tmp_mask;
+
+ its_mask = static_cast<byte_t>(its_tmp_mask);
+ }
+ }
+ }
+
+ _ignore[its_ignored] = its_mask;
+ }
+}
+
+std::shared_ptr<debounce> configuration_impl::get_debounce(
+ service_t _service, instance_t _instance, event_t _event) const {
+ auto found_service = debounces_.find(_service);
+ if (found_service != debounces_.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ auto found_event = found_instance->second.find(_event);
+ if (found_event != found_instance->second.end()) {
+ return found_event->second;
+ }
+ }
+ }
+ return nullptr;
+}
+
} // namespace config
} // namespace vsomeip
diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp
index b1bccfa..1981bc9 100644
--- a/implementation/endpoints/include/client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/client_endpoint_impl.hpp
@@ -37,7 +37,8 @@ public:
client_endpoint_impl(std::shared_ptr<endpoint_host> _host,
endpoint_type _local, endpoint_type _remote,
boost::asio::io_service &_io,
- std::uint32_t _max_message_size);
+ std::uint32_t _max_message_size,
+ configuration::endpoint_queue_limit_t _queue_limit);
virtual ~client_endpoint_impl();
bool send(const uint8_t *_data, uint32_t _size, bool _flush);
@@ -70,8 +71,8 @@ public:
protected:
virtual void send_queued() = 0;
- void shutdown_and_close_socket();
- void shutdown_and_close_socket_unlocked();
+ void shutdown_and_close_socket(bool _recreate_socket);
+ void shutdown_and_close_socket_unlocked(bool _recreate_socket);
void start_connect_timer();
mutable std::mutex socket_mutex_;
@@ -88,6 +89,7 @@ protected:
// send data
message_buffer_ptr_t packetizer_;
std::deque<message_buffer_ptr_t> queue_;
+ std::size_t queue_size_;
std::mutex mutex_;
diff --git a/implementation/endpoints/include/endpoint_impl.hpp b/implementation/endpoints/include/endpoint_impl.hpp
index cf98ed1..49b9f7f 100644
--- a/implementation/endpoints/include/endpoint_impl.hpp
+++ b/implementation/endpoints/include/endpoint_impl.hpp
@@ -16,6 +16,7 @@
#include "buffer.hpp"
#include "endpoint.hpp"
+#include "../../configuration/include/configuration.hpp"
namespace vsomeip {
@@ -29,7 +30,8 @@ public:
endpoint_impl(std::shared_ptr<endpoint_host> _adapter,
endpoint_type _local,
boost::asio::io_service &_io,
- std::uint32_t _max_message_size);
+ std::uint32_t _max_message_size,
+ configuration::endpoint_queue_limit_t _queue_limit);
virtual ~endpoint_impl();
void enable_magic_cookies();
@@ -87,6 +89,8 @@ protected:
error_handler_t error_handler_;
std::mutex error_handler_mutex_;
+
+ const configuration::endpoint_queue_limit_t queue_limit_;
};
} // namespace vsomeip
diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp
index dc56f23..d6ee995 100644
--- a/implementation/endpoints/include/local_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp
@@ -34,7 +34,8 @@ public:
local_client_endpoint_impl(std::shared_ptr<endpoint_host> _host,
endpoint_type _remote,
boost::asio::io_service &_io,
- std::uint32_t _max_message_size);
+ std::uint32_t _max_message_size,
+ configuration::endpoint_queue_limit_t _queue_limit);
virtual ~local_client_endpoint_impl();
diff --git a/implementation/endpoints/include/local_server_endpoint_impl.hpp b/implementation/endpoints/include/local_server_endpoint_impl.hpp
index 458d59f..d54cf69 100644
--- a/implementation/endpoints/include/local_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/local_server_endpoint_impl.hpp
@@ -42,14 +42,16 @@ public:
endpoint_type _local,
boost::asio::io_service &_io,
std::uint32_t _max_message_size,
- std::uint32_t _buffer_shrink_threshold);
+ std::uint32_t _buffer_shrink_threshold,
+ configuration::endpoint_queue_limit_t _queue_limit);
local_server_endpoint_impl(std::shared_ptr<endpoint_host> _host,
endpoint_type _local,
boost::asio::io_service &_io,
std::uint32_t _max_message_size,
int native_socket,
- std::uint32_t _buffer_shrink_threshold);
+ std::uint32_t _buffer_shrink_threshold,
+ configuration::endpoint_queue_limit_t _queue_limit);
virtual ~local_server_endpoint_impl();
diff --git a/implementation/endpoints/include/server_endpoint_impl.hpp b/implementation/endpoints/include/server_endpoint_impl.hpp
index 715c9b8..ae89508 100644
--- a/implementation/endpoints/include/server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/server_endpoint_impl.hpp
@@ -27,12 +27,13 @@ class server_endpoint_impl: public endpoint_impl<Protocol>,
public:
typedef typename Protocol::socket socket_type;
typedef typename Protocol::endpoint endpoint_type;
- typedef typename std::map<endpoint_type, std::deque<message_buffer_ptr_t> > queue_type;
+ typedef typename std::map<endpoint_type, std::pair<size_t, std::deque<message_buffer_ptr_t>>> queue_type;
typedef typename queue_type::iterator queue_iterator_type;
server_endpoint_impl(std::shared_ptr<endpoint_host> _host,
endpoint_type _local, boost::asio::io_service &_io,
- std::uint32_t _max_message_size);
+ std::uint32_t _max_message_size,
+ configuration::endpoint_queue_limit_t _queue_limit);
virtual ~server_endpoint_impl();
bool is_client() const;
diff --git a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
index 801c98c..1c77664 100644
--- a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
@@ -26,7 +26,8 @@ public:
boost::asio::io_service &_io,
std::uint32_t _max_message_size,
std::uint32_t buffer_shrink_threshold,
- std::chrono::milliseconds _send_timeout);
+ std::chrono::milliseconds _send_timeout,
+ configuration::endpoint_queue_limit_t _queue_limit);
virtual ~tcp_client_endpoint_impl();
void start();
diff --git a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp
index ab6fdb9..4318c1b 100644
--- a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp
@@ -31,7 +31,8 @@ public:
boost::asio::io_service &_io,
std::uint32_t _max_message_size,
std::uint32_t _buffer_shrink_threshold,
- std::chrono::milliseconds _send_timeout);
+ std::chrono::milliseconds _send_timeout,
+ configuration::endpoint_queue_limit_t _queue_limit);
virtual ~tcp_server_endpoint_impl();
void start();
@@ -88,7 +89,7 @@ private:
bool _magic_cookies_enabled,
boost::asio::io_service & _io_service,
std::chrono::milliseconds _send_timeout);
- void send_magic_cookie(message_buffer_ptr_t &_buffer);
+ bool send_magic_cookie(message_buffer_ptr_t &_buffer);
bool is_magic_cookie(size_t _offset) const;
void receive_cbk(boost::system::error_code const &_error,
std::size_t _bytes);
diff --git a/implementation/endpoints/include/udp_client_endpoint_impl.hpp b/implementation/endpoints/include/udp_client_endpoint_impl.hpp
index b83e2f1..a1d8761 100644
--- a/implementation/endpoints/include/udp_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/udp_client_endpoint_impl.hpp
@@ -29,7 +29,8 @@ public:
udp_client_endpoint_impl(std::shared_ptr<endpoint_host> _host,
endpoint_type _local,
endpoint_type _remote,
- boost::asio::io_service &_io);
+ boost::asio::io_service &_io,
+ configuration::endpoint_queue_limit_t _queue_limit);
virtual ~udp_client_endpoint_impl();
void start();
diff --git a/implementation/endpoints/include/udp_server_endpoint_impl.hpp b/implementation/endpoints/include/udp_server_endpoint_impl.hpp
index bed7c0c..6dcb537 100644
--- a/implementation/endpoints/include/udp_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/udp_server_endpoint_impl.hpp
@@ -25,7 +25,8 @@ class udp_server_endpoint_impl: public udp_server_endpoint_base_impl {
public:
udp_server_endpoint_impl(std::shared_ptr<endpoint_host> _host,
endpoint_type _local,
- boost::asio::io_service &_io);
+ boost::asio::io_service &_io,
+ configuration::endpoint_queue_limit_t _queue_limit);
virtual ~udp_server_endpoint_impl();
void start();
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp
index 3783add..2fb9fa9 100644
--- a/implementation/endpoints/src/client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/client_endpoint_impl.cpp
@@ -21,6 +21,7 @@
#include "../../configuration/include/internal.hpp"
#include "../../logging/include/logger.hpp"
#include "../../utility/include/utility.hpp"
+#include "../../utility/include/byteorder.hpp"
namespace vsomeip {
@@ -30,13 +31,15 @@ client_endpoint_impl<Protocol>::client_endpoint_impl(
endpoint_type _local,
endpoint_type _remote,
boost::asio::io_service &_io,
- std::uint32_t _max_message_size)
- : endpoint_impl<Protocol>(_host, _local, _io, _max_message_size),
+ std::uint32_t _max_message_size,
+ configuration::endpoint_queue_limit_t _queue_limit)
+ : endpoint_impl<Protocol>(_host, _local, _io, _max_message_size, _queue_limit),
socket_(new socket_type(_io)), remote_(_remote),
flush_timer_(_io), connect_timer_(_io),
connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable
is_connected_(false),
packetizer_(std::make_shared<message_buffer_t>()),
+ queue_size_(0),
was_not_connected_(false),
local_port_(0) {
}
@@ -62,6 +65,7 @@ void client_endpoint_impl<Protocol>::stop() {
endpoint_impl<Protocol>::sending_blocked_ = true;
// delete unsent messages
queue_.clear();
+ queue_size_ = 0;
}
{
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
@@ -69,7 +73,7 @@ void client_endpoint_impl<Protocol>::stop() {
connect_timer_.cancel(ec);
}
connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
- shutdown_and_close_socket();
+ shutdown_and_close_socket(false);
}
template<typename Protocol>
@@ -118,14 +122,50 @@ bool client_endpoint_impl<Protocol>::send(const uint8_t *_data,
if (packetizer_->size() + _size > endpoint_impl<Protocol>::max_message_size_
&& !packetizer_->empty()) {
queue_.push_back(packetizer_);
+ queue_size_ += packetizer_->size();
packetizer_ = std::make_shared<message_buffer_t>();
}
+ if (endpoint_impl<Protocol>::queue_limit_ != QUEUE_SIZE_UNLIMITED
+ && queue_size_ + _size > endpoint_impl<Protocol>::queue_limit_) {
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_size >= VSOMEIP_SESSION_POS_MAX) {
+ // this will yield wrong IDs for local communication as the commands
+ // are prepended to the actual payload
+ // it will print:
+ // (lowbyte service ID + highbyte methoid)
+ // [(Command + lowerbyte sender's client ID).
+ // highbyte sender's client ID + lowbyte command size.
+ // lowbyte methodid + highbyte vsomeipd length]
+ its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
+ _data[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
+ _data[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
+ _data[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
+ _data[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_ERROR << "cei::send: queue size limit (" << std::dec
+ << endpoint_impl<Protocol>::queue_limit_
+ << ") reached. Dropping message ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "] "
+ << "queue_size: " << std::dec << queue_size_
+ << " data size: " << std::dec << _size;
+ return false;
+ }
packetizer_->insert(packetizer_->end(), _data, _data + _size);
if (_flush) {
flush_timer_.cancel();
queue_.push_back(packetizer_);
+ queue_size_ += packetizer_->size();
packetizer_ = std::make_shared<message_buffer_t>();
} else {
flush_timer_.expires_from_now(
@@ -152,6 +192,7 @@ bool client_endpoint_impl<Protocol>::flush() {
std::lock_guard<std::mutex> its_lock(mutex_);
if (!packetizer_->empty()) {
queue_.push_back(packetizer_);
+ queue_size_ += packetizer_->size();
packetizer_ = std::make_shared<message_buffer_t>();
if (queue_.size() == 1) { // no writing in progress
send_queued();
@@ -168,13 +209,13 @@ void client_endpoint_impl<Protocol>::connect_cbk(
boost::system::error_code const &_error) {
if (_error == boost::asio::error::operation_aborted) {
// endpoint was stopped
- shutdown_and_close_socket();
+ shutdown_and_close_socket(false);
return;
}
std::shared_ptr<endpoint_host> its_host = this->host_.lock();
if (its_host) {
if (_error && _error != boost::asio::error::already_connected) {
- shutdown_and_close_socket();
+ shutdown_and_close_socket(true);
start_connect_timer();
// Double the timeout as long as the maximum allowed is larger
if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
@@ -224,33 +265,42 @@ void client_endpoint_impl<Protocol>::send_cbk(
if (!_error) {
std::lock_guard<std::mutex> its_lock(mutex_);
if (queue_.size() > 0) {
+ queue_size_ -= queue_.front()->size();
queue_.pop_front();
send_queued();
}
} else if (_error == boost::asio::error::broken_pipe) {
is_connected_ = false;
+ bool stopping(false);
{
std::lock_guard<std::mutex> its_lock(mutex_);
- if (endpoint_impl<Protocol>::sending_blocked_) {
+ stopping = endpoint_impl<Protocol>::sending_blocked_;
+ if (stopping) {
queue_.clear();
+ queue_size_ = 0;
} else {
VSOMEIP_WARNING << "cei::send_cbk received error: "
<< _error.message() << " (" << std::dec
<< _error.value() << ") " << std::dec << queue_.size();
}
}
- shutdown_and_close_socket();
+ if (!stopping) {
+ print_status();
+ }
+ shutdown_and_close_socket(true);
connect();
} else if (_error == boost::asio::error::not_connected
|| _error == boost::asio::error::bad_descriptor) {
was_not_connected_ = true;
+ shutdown_and_close_socket(true);
connect();
} else if (_error == boost::asio::error::operation_aborted) {
// endpoint was stopped
- shutdown_and_close_socket();
+ shutdown_and_close_socket(false);
} else {
VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
<< " (" << std::dec << _error.value() << ")" ;
+ print_status();
}
}
@@ -263,19 +313,22 @@ void client_endpoint_impl<Protocol>::flush_cbk(
}
template<typename Protocol>
-void client_endpoint_impl<Protocol>::shutdown_and_close_socket() {
+void client_endpoint_impl<Protocol>::shutdown_and_close_socket(bool _recreate_socket) {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- shutdown_and_close_socket_unlocked();
+ shutdown_and_close_socket_unlocked(_recreate_socket);
}
template<typename Protocol>
-void client_endpoint_impl<Protocol>::shutdown_and_close_socket_unlocked() {
+void client_endpoint_impl<Protocol>::shutdown_and_close_socket_unlocked(bool _recreate_socket) {
local_port_ = 0;
if (socket_->is_open()) {
boost::system::error_code its_error;
socket_->shutdown(Protocol::socket::shutdown_both, its_error);
socket_->close(its_error);
}
+ if (_recreate_socket) {
+ socket_.reset(new socket_type(endpoint_impl<Protocol>::service_));
+ }
}
template<typename Protocol>
diff --git a/implementation/endpoints/src/endpoint_impl.cpp b/implementation/endpoints/src/endpoint_impl.cpp
index cf35a39..9cde3e9 100644
--- a/implementation/endpoints/src/endpoint_impl.cpp
+++ b/implementation/endpoints/src/endpoint_impl.cpp
@@ -22,7 +22,8 @@ endpoint_impl<Protocol>::endpoint_impl(
std::shared_ptr<endpoint_host> _host,
endpoint_type _local,
boost::asio::io_service &_io,
- std::uint32_t _max_message_size)
+ std::uint32_t _max_message_size,
+ configuration::endpoint_queue_limit_t _queue_limit)
: service_(_io),
host_(_host),
is_supporting_magic_cookies_(false),
@@ -30,7 +31,8 @@ endpoint_impl<Protocol>::endpoint_impl(
max_message_size_(_max_message_size),
use_count_(0),
sending_blocked_(false),
- local_(_local) {
+ local_(_local),
+ queue_limit_(_queue_limit) {
}
template<typename Protocol>
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp
index 64b6314..cc444a9 100644
--- a/implementation/endpoints/src/local_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp
@@ -27,8 +27,10 @@ local_client_endpoint_impl::local_client_endpoint_impl(
std::shared_ptr< endpoint_host > _host,
endpoint_type _remote,
boost::asio::io_service &_io,
- std::uint32_t _max_message_size)
- : local_client_endpoint_base_impl(_host, _remote, _remote, _io, _max_message_size),
+ std::uint32_t _max_message_size,
+ configuration::endpoint_queue_limit_t _queue_limit)
+ : local_client_endpoint_base_impl(_host, _remote, _remote, _io,
+ _max_message_size, _queue_limit),
// Using _remote for the local(!) endpoint is ok,
// because we have no bind for local endpoints!
recv_buffer_(1,0) {
@@ -49,11 +51,11 @@ void local_client_endpoint_impl::restart() {
std::lock_guard<std::mutex> its_lock(mutex_);
sending_blocked_ = false;
queue_.clear();
+ queue_size_ = 0;
}
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- shutdown_and_close_socket_unlocked();
- socket_.reset(new socket_type(service_));
+ shutdown_and_close_socket_unlocked(true);
}
start_connect_timer();
}
@@ -95,7 +97,7 @@ void local_client_endpoint_impl::stop() {
}
}
}
- shutdown_and_close_socket();
+ shutdown_and_close_socket(false);
}
void local_client_endpoint_impl::connect() {
@@ -266,9 +268,7 @@ void local_client_endpoint_impl::print_status() {
{
std::lock_guard<std::mutex> its_lock(mutex_);
its_queue_size = queue_.size();
- for (const auto &m : queue_) {
- its_data_size += m->size();
- }
+ its_data_size = queue_size_;
}
VSOMEIP_INFO << "status lce: " << its_path << " queue: "
diff --git a/implementation/endpoints/src/local_server_endpoint_impl.cpp b/implementation/endpoints/src/local_server_endpoint_impl.cpp
index cf6d8eb..c558661 100644
--- a/implementation/endpoints/src/local_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_server_endpoint_impl.cpp
@@ -29,8 +29,10 @@ local_server_endpoint_impl::local_server_endpoint_impl(
std::shared_ptr< endpoint_host > _host,
endpoint_type _local, boost::asio::io_service &_io,
std::uint32_t _max_message_size,
- std::uint32_t _buffer_shrink_threshold)
- : local_server_endpoint_base_impl(_host, _local, _io, _max_message_size),
+ std::uint32_t _buffer_shrink_threshold,
+ configuration::endpoint_queue_limit_t _queue_limit)
+ : local_server_endpoint_base_impl(_host, _local, _io,
+ _max_message_size, _queue_limit),
acceptor_(_io),
buffer_shrink_threshold_(_buffer_shrink_threshold) {
is_supporting_magic_cookies_ = false;
@@ -57,8 +59,10 @@ local_server_endpoint_impl::local_server_endpoint_impl(
endpoint_type _local, boost::asio::io_service &_io,
std::uint32_t _max_message_size,
int native_socket,
- std::uint32_t _buffer_shrink_threshold)
- : local_server_endpoint_base_impl(_host, _local, _io, _max_message_size),
+ std::uint32_t _buffer_shrink_threshold,
+ configuration::endpoint_queue_limit_t _queue_limit)
+ : local_server_endpoint_base_impl(_host, _local, _io,
+ _max_message_size, _queue_limit),
acceptor_(_io),
buffer_shrink_threshold_(_buffer_shrink_threshold) {
is_supporting_magic_cookies_ = false;
@@ -143,6 +147,7 @@ void local_server_endpoint_impl::send_queued(
auto connection_iterator = connections_.find(_queue_iterator->first);
if (connection_iterator != connections_.end()) {
connection_iterator->second->send_queued(_queue_iterator);
+ its_connection = connection_iterator->second;
} else {
VSOMEIP_INFO << "Didn't find connection: "
#ifdef _WIN32
@@ -152,7 +157,7 @@ void local_server_endpoint_impl::send_queued(
<< _queue_iterator->first.path()
#endif
<< " dropping outstanding messages (" << std::dec
- << _queue_iterator->second.size() << ").";
+ << _queue_iterator->second.second.size() << ").";
queues_.erase(_queue_iterator->first);
}
}
@@ -348,7 +353,7 @@ void local_server_endpoint_impl::connection::send_queued(
return;
}
- message_buffer_ptr_t its_buffer = _queue_iterator->second.front();
+ message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front();
#if 0
std::stringstream msg;
msg << "lse::sq: ";
@@ -664,10 +669,8 @@ void local_server_endpoint_impl::print_status() {
}
auto found_queue = queues_.find(c.first);
if (found_queue != queues_.end()) {
- its_queue_size = found_queue->second.size();
- for (const auto &m : found_queue->second) {
- its_data_size += m->size();
- }
+ its_queue_size = found_queue->second.second.size();
+ its_data_size = found_queue->second.first;
}
VSOMEIP_INFO << "status lse: client: " << its_remote_path
<< " queue: " << std::dec << its_queue_size
diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp
index 8ffbfdb..17e26df 100644
--- a/implementation/endpoints/src/server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/server_endpoint_impl.cpp
@@ -25,8 +25,10 @@ namespace vsomeip {
template<typename Protocol>
server_endpoint_impl<Protocol>::server_endpoint_impl(
std::shared_ptr<endpoint_host> _host, endpoint_type _local,
- boost::asio::io_service &_io, std::uint32_t _max_message_size)
- : endpoint_impl<Protocol>(_host, _local, _io, _max_message_size),
+ boost::asio::io_service &_io, std::uint32_t _max_message_size,
+ configuration::endpoint_queue_limit_t _queue_limit)
+ : endpoint_impl<Protocol>(_host, _local, _io, _max_message_size,
+ _queue_limit),
flush_timer_(_io) {
}
@@ -140,25 +142,65 @@ bool server_endpoint_impl<Protocol>::send_intern(
target_queue_iterator = queues_.insert(queues_.begin(),
std::make_pair(
_target,
- std::deque<message_buffer_ptr_t>()
+ std::make_pair(std::size_t(0),
+ std::deque<message_buffer_ptr_t>())
));
}
// TODO compare against value from configuration here
- const bool queue_size_zero_on_entry(target_queue_iterator->second.empty());
+ const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty());
if (target_packetizer->size() + _size
> endpoint_impl<Protocol>::max_message_size_
&& !target_packetizer->empty()) {
- target_queue_iterator->second.push_back(target_packetizer);
+ target_queue_iterator->second.second.push_back(target_packetizer);
+ target_queue_iterator->second.first += target_packetizer->size();
target_packetizer = std::make_shared<message_buffer_t>();
packetizer_[_target] = target_packetizer;
}
+ if (endpoint_impl<Protocol>::queue_limit_ != QUEUE_SIZE_UNLIMITED
+ && target_queue_iterator->second.first + _size >
+ endpoint_impl<Protocol>::queue_limit_) {
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_size >= VSOMEIP_SESSION_POS_MAX) {
+ // this will yield wrong IDs for local communication as the commands
+ // are prepended to the actual payload
+ // it will print:
+ // (lowbyte service ID + highbyte methoid)
+ // [(Command + lowerbyte sender's client ID).
+ // highbyte sender's client ID + lowbyte command size.
+ // lowbyte methodid + highbyte vsomeipd length]
+ its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
+ _data[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
+ _data[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
+ _data[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
+ _data[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_ERROR << "sei::send_intern: queue size limit (" << std::dec
+ << endpoint_impl<Protocol>::queue_limit_
+ << ") reached. Dropping message ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"
+ << " queue_size: " << std::dec << target_queue_iterator->second.first
+ << " data size: " << std::dec << _size;
+ return false;
+ }
+
+
target_packetizer->insert(target_packetizer->end(), _data, _data + _size);
if (_flush) {
flush_timer_.cancel();
- target_queue_iterator->second.push_back(target_packetizer);
+ target_queue_iterator->second.second.push_back(target_packetizer);
+ target_queue_iterator->second.first += target_packetizer->size();
packetizer_[_target] = std::make_shared<message_buffer_t>();
} else {
std::chrono::milliseconds flush_timeout(VSOMEIP_DEFAULT_FLUSH_TIMEOUT);
@@ -170,7 +212,7 @@ bool server_endpoint_impl<Protocol>::send_intern(
std::placeholders::_1));
}
- if (queue_size_zero_on_entry && !target_queue_iterator->second.empty()) { // no writing in progress
+ if (queue_size_zero_on_entry && !target_queue_iterator->second.second.empty()) { // no writing in progress
send_queued(target_queue_iterator);
}
@@ -183,7 +225,7 @@ bool server_endpoint_impl<Protocol>::flush(
bool is_flushed = false;
std::lock_guard<std::mutex> its_lock(mutex_);
auto queue_iterator = queues_.find(_target);
- if (queue_iterator != queues_.end() && !queue_iterator->second.empty()) {
+ if (queue_iterator != queues_.end() && !queue_iterator->second.second.empty()) {
send_queued(queue_iterator);
is_flushed = true;
}
@@ -205,8 +247,10 @@ void server_endpoint_impl<Protocol>::send_cbk(
std::lock_guard<std::mutex> its_lock(mutex_);
if (!_error) {
- _queue_iterator->second.pop_front();
- if (_queue_iterator->second.size() > 0) {
+ _queue_iterator->second.first -=
+ _queue_iterator->second.second.front()->size();
+ _queue_iterator->second.second.pop_front();
+ if (_queue_iterator->second.second.size() > 0) {
send_queued(_queue_iterator);
}
} else {
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
index 2646f01..f1d2efc 100644
--- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
@@ -29,8 +29,10 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl(
boost::asio::io_service &_io,
std::uint32_t _max_message_size,
std::uint32_t _buffer_shrink_threshold,
- std::chrono::milliseconds _send_timeout)
- : tcp_client_endpoint_base_impl(_host, _local, _remote, _io, _max_message_size),
+ std::chrono::milliseconds _send_timeout,
+ configuration::endpoint_queue_limit_t _queue_limit)
+ : tcp_client_endpoint_base_impl(_host, _local, _remote, _io,
+ _max_message_size, _queue_limit),
recv_buffer_size_initial_(VSOMEIP_SOMEIP_HEADER_SIZE),
recv_buffer_(recv_buffer_size_initial_, 0),
recv_buffer_size_(0),
@@ -64,11 +66,10 @@ void tcp_client_endpoint_impl::restart() {
is_connected_ = false;
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- shutdown_and_close_socket_unlocked();
+ shutdown_and_close_socket_unlocked(true);
recv_buffer_size_ = 0;
recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
recv_buffer_.shrink_to_fit();
- socket_.reset(new socket_type(service_));
}
{
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -94,6 +95,7 @@ void tcp_client_endpoint_impl::restart() {
<< " size: " << std::dec << m->size();
}
queue_.clear();
+ queue_size_ = 0;
}
start_connect_timer();
}
@@ -348,6 +350,7 @@ void tcp_client_endpoint_impl::send_magic_cookie(message_buffer_ptr_t &_buffer)
CLIENT_COOKIE,
CLIENT_COOKIE + sizeof(CLIENT_COOKIE)
);
+ queue_size_ += sizeof(CLIENT_COOKIE);
} else {
VSOMEIP_WARNING << "Packet full. Cannot insert magic cookie!";
}
@@ -496,10 +499,11 @@ void tcp_client_endpoint_impl::receive_cbk(
if (_error == boost::asio::error::connection_reset ||
_error == boost::asio::error::eof ||
_error == boost::asio::error::timed_out) {
- if(_error == boost::asio::error::timed_out) {
- VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: " << _error.message();
- }
- shutdown_and_close_socket_unlocked();
+ VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: " << _error.message()
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ is_connected_ = false;
+ shutdown_and_close_socket_unlocked(false);
} else {
its_lock.unlock();
receive();
@@ -594,9 +598,7 @@ void tcp_client_endpoint_impl::print_status() {
{
std::lock_guard<std::mutex> its_lock(mutex_);
its_queue_size = queue_.size();
- for (const auto &m : queue_) {
- its_data_size += m->size();
- }
+ its_data_size = queue_size_;
}
std::string local;
{
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
index 37e3554..5a061e1 100644
--- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
@@ -25,8 +25,10 @@ tcp_server_endpoint_impl::tcp_server_endpoint_impl(
std::shared_ptr<endpoint_host> _host, endpoint_type _local,
boost::asio::io_service &_io, std::uint32_t _max_message_size,
std::uint32_t _buffer_shrink_threshold,
- std::chrono::milliseconds _send_timeout)
- : tcp_server_endpoint_base_impl(_host, _local, _io, _max_message_size),
+ std::chrono::milliseconds _send_timeout,
+ configuration::endpoint_queue_limit_t _queue_limit)
+ : tcp_server_endpoint_base_impl(_host, _local, _io,
+ _max_message_size, _queue_limit),
acceptor_(_io),
buffer_shrink_threshold_(_buffer_shrink_threshold),
local_port_(_local.port()),
@@ -110,7 +112,7 @@ void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iter
<< _queue_iterator->first.address().to_string() << ":" << std::dec
<< static_cast<std::uint16_t>(_queue_iterator->first.port())
<< " dropping outstanding messages (" << std::dec
- << _queue_iterator->second.size() << ").";
+ << _queue_iterator->second.second.size() << ").";
queues_.erase(_queue_iterator->first);
}
}
@@ -310,7 +312,7 @@ void tcp_server_endpoint_impl::connection::send_queued(
" couldn't lock server_";
return;
}
- message_buffer_ptr_t its_buffer = _queue_iterator->second.front();
+ message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front();
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
(*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
(*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
@@ -328,8 +330,10 @@ void tcp_server_endpoint_impl::connection::send_queued(
std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_cookie_sent_) > std::chrono::milliseconds(10000)) {
- send_magic_cookie(its_buffer);
- last_cookie_sent_ = now;
+ if (send_magic_cookie(its_buffer)) {
+ last_cookie_sent_ = now;
+ _queue_iterator->second.first += sizeof(SERVICE_COOKIE);
+ }
}
}
@@ -351,14 +355,16 @@ void tcp_server_endpoint_impl::connection::send_queued(
}
}
-void tcp_server_endpoint_impl::connection::send_magic_cookie(
+bool tcp_server_endpoint_impl::connection::send_magic_cookie(
message_buffer_ptr_t &_buffer) {
if (max_message_size_ == MESSAGE_SIZE_UNLIMITED
|| max_message_size_ - _buffer->size() >=
VSOMEIP_SOMEIP_HEADER_SIZE + VSOMEIP_SOMEIP_MAGIC_COOKIE_SIZE) {
_buffer->insert(_buffer->begin(), SERVICE_COOKIE,
SERVICE_COOKIE + sizeof(SERVICE_COOKIE));
+ return true;
}
+ return false;
}
bool tcp_server_endpoint_impl::connection::is_magic_cookie(size_t _offset) const {
@@ -553,7 +559,10 @@ void tcp_server_endpoint_impl::connection::receive_cbk(
|| _error == boost::asio::error::connection_reset
|| _error == boost::asio::error::timed_out) {
if(_error == boost::asio::error::timed_out) {
- VSOMEIP_WARNING << "tcp_server_endpoint receive_cbk: " << _error.message();
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ VSOMEIP_WARNING << "tcp_server_endpoint receive_cbk: " << _error.message()
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
}
{
std::lock_guard<std::mutex> its_lock(its_server->connections_mutex_);
@@ -746,10 +755,8 @@ void tcp_server_endpoint_impl::print_status() {
}
auto found_queue = queues_.find(c.first);
if (found_queue != queues_.end()) {
- its_queue_size = found_queue->second.size();
- for (const auto &m : found_queue->second) {
- its_data_size += m->size();
- }
+ its_queue_size = found_queue->second.second.size();
+ its_data_size = found_queue->second.first;
}
VSOMEIP_INFO << "status tse: client: "
<< c.second->get_address_port_remote()
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
index 28a4888..c8ca0e0 100644
--- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
@@ -19,9 +19,10 @@ udp_client_endpoint_impl::udp_client_endpoint_impl(
std::shared_ptr< endpoint_host > _host,
endpoint_type _local,
endpoint_type _remote,
- boost::asio::io_service &_io)
+ boost::asio::io_service &_io,
+ configuration::endpoint_queue_limit_t _queue_limit)
: udp_client_endpoint_base_impl(_host, _local, _remote, _io,
- VSOMEIP_MAX_UDP_MESSAGE_SIZE),
+ VSOMEIP_MAX_UDP_MESSAGE_SIZE, _queue_limit),
recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0),
remote_address_(_remote.address()),
remote_port_(_remote.port()) {
@@ -81,7 +82,7 @@ void udp_client_endpoint_impl::restart() {
std::lock_guard<std::mutex> its_lock(mutex_);
queue_.clear();
}
- shutdown_and_close_socket();
+ shutdown_and_close_socket(false);
start_connect_timer();
}
@@ -215,7 +216,7 @@ void udp_client_endpoint_impl::receive_cbk(
receive();
} else {
if (_error == boost::asio::error::connection_refused) {
- shutdown_and_close_socket();
+ shutdown_and_close_socket(false);
} else {
receive();
}
@@ -256,9 +257,7 @@ void udp_client_endpoint_impl::print_status() {
{
std::lock_guard<std::mutex> its_lock(mutex_);
its_queue_size = queue_.size();
- for (const auto &m : queue_) {
- its_data_size += m->size();
- }
+ its_data_size = queue_size_;
}
std::string local;
{
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
index b712266..92990f1 100644
--- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
@@ -24,9 +24,10 @@ namespace vsomeip {
udp_server_endpoint_impl::udp_server_endpoint_impl(
std::shared_ptr< endpoint_host > _host,
endpoint_type _local,
- boost::asio::io_service &_io)
+ boost::asio::io_service &_io,
+ configuration::endpoint_queue_limit_t _queue_limit)
: server_endpoint_impl<ip::udp_ext>(
- _host, _local, _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE),
+ _host, _local, _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE, _queue_limit),
socket_(_io, _local.protocol()),
joined_group_(false),
recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0),
@@ -121,7 +122,7 @@ bool udp_server_endpoint_impl::send_to(
void udp_server_endpoint_impl::send_queued(
const queue_iterator_type _queue_iterator) {
- message_buffer_ptr_t its_buffer = _queue_iterator->second.front();
+ message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front();
#if 0
std::stringstream msg;
msg << "usei::sq(" << _queue_iterator->first.address().to_string() << ":"
@@ -405,10 +406,9 @@ void udp_server_endpoint_impl::print_status() {
for (const auto &c : queues_) {
std::size_t its_data_size(0);
std::size_t its_queue_size(0);
- its_queue_size = c.second.size();
- for (const auto &m : c.second) {
- its_data_size += m->size();
- }
+ its_queue_size = c.second.second.size();
+ its_data_size = c.second.first;
+
boost::system::error_code ec;
VSOMEIP_INFO << "status use: client: "
<< c.first.address().to_string(ec) << ":"
diff --git a/implementation/routing/include/event.hpp b/implementation/routing/include/event.hpp
index 164f922..ac483ab 100644
--- a/implementation/routing/include/event.hpp
+++ b/implementation/routing/include/event.hpp
@@ -53,7 +53,7 @@ public:
const std::shared_ptr<endpoint_definition> _target,
bool _force, bool _flush);
- void set_payload_dont_notify(const std::shared_ptr<payload> &_payload);
+ bool set_payload_dont_notify(const std::shared_ptr<payload> &_payload);
void set_payload(const std::shared_ptr<payload> &_payload,
bool _force, bool _flush);
diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp
index dbdf2b9..ad6810b 100644
--- a/implementation/routing/include/routing_manager_impl.hpp
+++ b/implementation/routing/include/routing_manager_impl.hpp
@@ -160,7 +160,7 @@ public:
client_t _bound_client,
const boost::asio::ip::address &_remote_address,
std::uint16_t _remote_port);
- void on_message(service_t _service, instance_t _instance,
+ bool on_message(service_t _service, instance_t _instance,
const byte_t *_data, length_t _size, bool _reliable, bool _is_valid_crc = true);
void on_notification(client_t _client, service_t _service,
instance_t _instance, const byte_t *_data, length_t _size,
diff --git a/implementation/routing/include/routing_manager_stub_host.hpp b/implementation/routing/include/routing_manager_stub_host.hpp
index 065be80..5497d6e 100644
--- a/implementation/routing/include/routing_manager_stub_host.hpp
+++ b/implementation/routing/include/routing_manager_stub_host.hpp
@@ -56,7 +56,7 @@ public:
virtual void unsubscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0;
- virtual void on_message(service_t _service, instance_t _instance,
+ virtual bool on_message(service_t _service, instance_t _instance,
const byte_t *_data, length_t _size, bool _reliable, bool _is_valid_crc = true) = 0;
virtual void on_notification(client_t _client,
diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp
index e3dde2c..ad228de 100644
--- a/implementation/routing/src/event.cpp
+++ b/implementation/routing/src/event.cpp
@@ -92,16 +92,19 @@ const std::shared_ptr<payload> event::get_payload() const {
return (message_->get_payload());
}
-void event::set_payload_dont_notify(const std::shared_ptr<payload> &_payload) {
+bool event::set_payload_dont_notify(const std::shared_ptr<payload> &_payload) {
std::lock_guard<std::mutex> its_lock(mutex_);
- if(is_cache_placeholder_) {
+ if (is_cache_placeholder_) {
reset_payload(_payload);
is_set_ = true;
} else {
if (set_payload_helper(_payload, false)) {
reset_payload(_payload);
+ } else {
+ return false;
}
}
+ return true;
}
void event::set_payload(const std::shared_ptr<payload> &_payload,
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp
index 1d0c795..0e8ce5d 100644
--- a/implementation/routing/src/routing_manager_base.cpp
+++ b/implementation/routing/src/routing_manager_base.cpp
@@ -238,6 +238,100 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
its_event->set_eventgroups(_eventgroups);
}
+ if (_is_shadow && !_epsilon_change_func) {
+ std::shared_ptr<vsomeip::cfg::debounce> its_debounce
+ = configuration_->get_debounce(_service, _instance, _event);
+ if (its_debounce) {
+ VSOMEIP_WARNING << "Using debounce configuration for "
+ << " SOME/IP event "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0')
+ << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0')
+ << _event << ".";
+ std::stringstream its_debounce_parameters;
+ its_debounce_parameters << "(on_change="
+ << (its_debounce->on_change_ ? "true" : "false")
+ << ", ignore=[ ";
+ for (auto i : its_debounce->ignore_)
+ its_debounce_parameters << "(" << std::dec << i.first
+ << ", " << std::hex << (int)i.second << ") ";
+ its_debounce_parameters << "], interval="
+ << std::dec << its_debounce->interval_ << ")";
+ VSOMEIP_WARNING << "Debounce parameters: "
+ << its_debounce_parameters.str();
+ _epsilon_change_func = [its_debounce](
+ const std::shared_ptr<payload> &_old,
+ const std::shared_ptr<payload> &_new) {
+ bool is_changed(false), is_elapsed(false);
+
+ // Check whether we should forward because of changed data
+ if (its_debounce->on_change_) {
+ length_t its_min_length, its_max_length;
+
+ if (_old->get_length() < _new->get_length()) {
+ its_min_length = _old->get_length();
+ its_max_length = _new->get_length();
+ } else {
+ its_min_length = _new->get_length();
+ its_max_length = _old->get_length();
+ }
+
+ // Check whether all additional bytes (if any) are excluded
+ for (length_t i = its_min_length; i < its_max_length; i++) {
+ auto j = its_debounce->ignore_.find(i);
+ if (j == its_debounce->ignore_.end() && j->second == 0xFF) {
+ is_changed = true;
+ break;
+ }
+ }
+
+ if (!is_changed) {
+ const byte_t *its_old = _old->get_data();
+ const byte_t *its_new = _new->get_data();
+ for (length_t i = 0; i < its_min_length; i++) {
+ auto j = its_debounce->ignore_.find(i);
+ if (j == its_debounce->ignore_.end()) {
+ if (its_old[i] != its_new[i]) {
+ is_changed = true;
+ break;
+ }
+ } else if (j->second != 0xFF) {
+ if ((its_old[i] & ~(j->second)) != (its_new[i] & ~(j->second))) {
+ is_changed = true;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (its_debounce->interval_ > -1) {
+ // Check whether we should forward because of the elapsed time since
+ // we did last time
+ std::chrono::steady_clock::time_point its_current
+ = std::chrono::steady_clock::now();
+
+ long elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+ its_current - its_debounce->last_forwarded_).count();
+ is_elapsed = (its_debounce->last_forwarded_ == (std::chrono::steady_clock::time_point::max)()
+ || elapsed >= its_debounce->interval_);
+ if (is_elapsed || (is_changed && its_debounce->on_change_resets_interval_))
+ its_debounce->last_forwarded_ = its_current;
+ }
+ return (is_changed || is_elapsed);
+ };
+ } else {
+ _epsilon_change_func = [](const std::shared_ptr<payload> &_old,
+ const std::shared_ptr<payload> &_new) {
+ (void)_old;
+ (void)_new;
+ return true;
+ };
+ }
+ }
+
its_event->set_epsilon_change_function(_epsilon_change_func);
its_event->set_change_resets_cycle(_change_resets_cycle);
its_event->set_update_cycle(_cycle);
@@ -670,7 +764,8 @@ std::shared_ptr<endpoint> routing_manager_base::create_local_unlocked(client_t _
#else
boost::asio::local::stream_protocol::endpoint(its_path.str())
#endif
- , io_, configuration_->get_max_message_size_local());
+ , io_, configuration_->get_max_message_size_local(),
+ configuration_->get_endpoint_queue_limit_local());
// Messages sent to the VSOMEIP_ROUTING_CLIENT are meant to be routed to
// external devices. Therefore, its local endpoint must not be found by
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index 88aaca3..6af17a4 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -1029,7 +1029,9 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
method_t its_method;
bool its_is_crc_valid(true);
instance_t its_instance(0x0);
-
+#ifdef USE_DLT
+ bool is_forwarded(true);
+#endif
if (_size >= VSOMEIP_SOMEIP_HEADER_SIZE) {
its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
_data[VSOMEIP_SERVICE_POS_MAX]);
@@ -1121,30 +1123,36 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
its_data[VSOMEIP_CLIENT_POS_MAX] = 0x0;
}
// Common way of message handling
+#ifdef USE_DLT
+ is_forwarded =
+#endif
on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(), its_is_crc_valid);
+
}
}
}
#ifdef USE_DLT
- const uint16_t its_data_size
- = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
-
- tc::trace_header its_header;
- const boost::asio::ip::address_v4 its_remote_address =
- _remote_address.is_v4() ? _remote_address.to_v4() :
- boost::asio::ip::address_v4::from_string("6.6.6.6");
- tc::protocol_e its_protocol =
- _receiver->is_local() ? tc::protocol_e::local :
- _receiver->is_reliable() ? tc::protocol_e::tcp :
+ if (is_forwarded) {
+ const uint16_t its_data_size
+ = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
+
+ tc::trace_header its_header;
+ const boost::asio::ip::address_v4 its_remote_address =
+ _remote_address.is_v4() ? _remote_address.to_v4() :
+ boost::asio::ip::address_v4::from_string("6.6.6.6");
+ tc::protocol_e its_protocol =
+ _receiver->is_local() ? tc::protocol_e::local :
+ _receiver->is_reliable() ? tc::protocol_e::tcp :
tc::protocol_e::udp;
- its_header.prepare(its_remote_address, _remote_port, its_protocol, false,
- its_instance);
- tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data,
- its_data_size);
+ its_header.prepare(its_remote_address, _remote_port, its_protocol, false,
+ its_instance);
+ tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data,
+ its_data_size);
+ }
#endif
}
-void routing_manager_impl::on_message(
+bool routing_manager_impl::on_message(
service_t _service, instance_t _instance,
const byte_t *_data, length_t _size,
bool _reliable, bool _is_valid_crc) {
@@ -1158,6 +1166,7 @@ void routing_manager_impl::on_message(
VSOMEIP_INFO << msg.str();
#endif
client_t its_client;
+ bool is_forwarded(true);
if (utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
its_client = find_local_client(_service, _instance);
@@ -1169,12 +1178,13 @@ void routing_manager_impl::on_message(
if (its_client == VSOMEIP_ROUTING_CLIENT
&& utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
- deliver_notification(_service, _instance, _data, _size, _reliable, _is_valid_crc);
+ is_forwarded = deliver_notification(_service, _instance, _data, _size, _reliable, _is_valid_crc);
} else if (its_client == host_->get_client()) {
deliver_message(_data, _size, _instance, _reliable, _is_valid_crc);
} else {
send(its_client, _data, _size, _instance, true, _reliable, _is_valid_crc); //send to proxy
}
+ return is_forwarded;
}
void routing_manager_impl::on_notification(client_t _client,
@@ -1515,21 +1525,20 @@ bool routing_manager_impl::deliver_notification(
service_t _service, instance_t _instance,
const byte_t *_data, length_t _length,
bool _reliable, bool _is_valid_crc) {
- bool is_delivered(false);
-
method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
_data[VSOMEIP_METHOD_POS_MAX]);
std::shared_ptr<event> its_event = find_event(_service, _instance, its_method);
if (its_event) {
- if(its_event->is_field() && !its_event->is_provided()) {
- // store the current value of the remote field
+ if (!its_event->is_provided()) {
const uint32_t its_length(utility::get_payload_size(_data, _length));
- std::shared_ptr<payload> its_payload =
- runtime::get()->create_payload(
- &_data[VSOMEIP_PAYLOAD_POS],
- its_length);
- its_event->set_payload_dont_notify(its_payload);
+ std::shared_ptr<payload> its_payload
+ = runtime::get()->create_payload(&_data[VSOMEIP_PAYLOAD_POS],
+ its_length);
+ if (!its_event->set_payload_dont_notify(its_payload)) {
+ // do not forward the notification as it was filtered
+ return false;
+ }
}
for (const auto its_local_client : its_event->get_subscribers()) {
@@ -1545,7 +1554,7 @@ bool routing_manager_impl::deliver_notification(
}
}
- return is_delivered;
+ return true;
}
std::shared_ptr<eventgroupinfo> routing_manager_impl::find_eventgroup(
@@ -1727,7 +1736,9 @@ std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint(
configuration_->get_max_message_size_reliable(
_address.to_string(), _remote_port),
configuration_->get_buffer_shrink_threshold(),
- std::chrono::milliseconds(configuration_->get_sd_ttl() * 666));
+ std::chrono::milliseconds(configuration_->get_sd_ttl() * 666),
+ configuration_->get_endpoint_queue_limit(
+ _address.to_string(), _remote_port));
if (configuration_->has_enabled_magic_cookies(_address.to_string(),
_remote_port)) {
@@ -1742,7 +1753,8 @@ std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint(
boost::asio::ip::udp::v6()),
_local_port),
boost::asio::ip::udp::endpoint(_address, _remote_port),
- io_);
+ io_, configuration_->get_endpoint_queue_limit(
+ _address.to_string(), _remote_port));
}
} catch (...) {
host_->on_error(error_code_e::CLIENT_ENDPOINT_CREATION_FAILED);
@@ -1765,7 +1777,9 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint(
configuration_->get_max_message_size_reliable(
its_unicast.to_string(), _port),
configuration_->get_buffer_shrink_threshold(),
- std::chrono::milliseconds(configuration_->get_sd_ttl() * 666));
+ std::chrono::milliseconds(configuration_->get_sd_ttl() * 666),
+ configuration_->get_endpoint_queue_limit(
+ its_unicast.to_string(), _port));
if (configuration_->has_enabled_magic_cookies(
its_unicast.to_string(), _port) ||
configuration_->has_enabled_magic_cookies(
@@ -1773,6 +1787,9 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint(
its_endpoint->enable_magic_cookies();
}
} else {
+ configuration::endpoint_queue_limit_t its_limit =
+ configuration_->get_endpoint_queue_limit(
+ its_unicast.to_string(), _port);
#ifndef _WIN32
if (its_unicast.is_v4()) {
its_unicast = boost::asio::ip::address_v4::any();
@@ -1782,8 +1799,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint(
#endif
boost::asio::ip::udp::endpoint ep(its_unicast, _port);
its_endpoint = std::make_shared<udp_server_endpoint_impl>(
- shared_from_this(),
- ep, io_);
+ shared_from_this(), ep, io_, its_limit);
}
} else {
@@ -3287,10 +3303,10 @@ void routing_manager_impl::clear_remote_subscriber(
if (its_instance != its_service->second.end()) {
auto its_client = its_instance->second.find(_client);
if (its_client != its_instance->second.end()) {
- if (its_client->second.size() <= 1) {
- its_instance->second.erase(_client);
- } else {
- its_client->second.erase(_target);
+ if (its_client->second.erase(_target)) {
+ if (!its_client->second.size()) {
+ its_instance->second.erase(_client);
+ }
}
}
}
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp
index a62ee96..a4af448 100644
--- a/implementation/routing/src/routing_manager_proxy.cpp
+++ b/implementation/routing/src/routing_manager_proxy.cpp
@@ -1783,7 +1783,8 @@ void routing_manager_proxy::init_receiver() {
boost::asio::local::stream_protocol::endpoint(its_client.str()),
#endif
io_, configuration_->get_max_message_size_local(),
- configuration_->get_buffer_shrink_threshold());
+ configuration_->get_buffer_shrink_threshold(),
+ configuration_->get_endpoint_queue_limit_local());
#ifdef _WIN32
VSOMEIP_INFO << "Listening at " << port;
#else
diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp
index 504458e..010c8d0 100644
--- a/implementation/routing/src/routing_manager_stub.cpp
+++ b/implementation/routing/src/routing_manager_stub.cpp
@@ -848,7 +848,8 @@ void routing_manager_stub::init_routing_endpoint() {
boost::asio::local::stream_protocol::endpoint(endpoint_path_),
#endif
io_, configuration_->get_max_message_size_local(),
- configuration_->get_buffer_shrink_threshold());
+ configuration_->get_buffer_shrink_threshold(),
+ configuration_->get_endpoint_queue_limit_local());
} catch (const std::exception &e) {
VSOMEIP_ERROR << ERROR_INFO[static_cast<int>(error_code_e::SERVER_ENDPOINT_CREATION_FAILED)]
<< " (" << static_cast<int>(error_code_e::SERVER_ENDPOINT_CREATION_FAILED) << ")";
@@ -1459,7 +1460,8 @@ void routing_manager_stub::create_local_receiver() {
boost::asio::local::stream_protocol::endpoint(local_receiver_path_),
#endif
io_, configuration_->get_max_message_size_local(),
- configuration_->get_buffer_shrink_threshold());
+ configuration_->get_buffer_shrink_threshold(),
+ configuration_->get_endpoint_queue_limit_local());
} catch (const std::exception &e) {
VSOMEIP_ERROR << ERROR_INFO[static_cast<int>(error_code_e::SERVER_ENDPOINT_CREATION_FAILED)]
<< " (" << static_cast<int>(error_code_e::SERVER_ENDPOINT_CREATION_FAILED) << ")";
diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp
index 1d49df1..8d22c7c 100644
--- a/implementation/runtime/src/application_impl.cpp
+++ b/implementation/runtime/src/application_impl.cpp
@@ -1669,16 +1669,18 @@ void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) {
<< "type=" << static_cast<std::uint32_t>(its_sync_handler->handler_type_)
<< " thread=" << std::hex << its_id;
}
- {
- std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
- running_dispatchers_.insert(its_id);
- }
- try {
- _handler->handler_();
- } catch (const std::exception &e) {
- VSOMEIP_ERROR << "application_impl::invoke_handler caught exception: "
- << e.what();
- print_blocking_call(its_sync_handler);
+ if (is_dispatching_) {
+ {
+ std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
+ running_dispatchers_.insert(its_id);
+ }
+ try {
+ _handler->handler_();
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "application_impl::invoke_handler caught exception: "
+ << e.what();
+ print_blocking_call(its_sync_handler);
+ }
}
boost::system::error_code ec;
its_dispatcher_timer.cancel(ec);
diff --git a/implementation/service_discovery/include/configuration_option_impl.hpp b/implementation/service_discovery/include/configuration_option_impl.hpp
index e600523..1d231e8 100644
--- a/implementation/service_discovery/include/configuration_option_impl.hpp
+++ b/implementation/service_discovery/include/configuration_option_impl.hpp
@@ -24,7 +24,7 @@ class configuration_option_impl: public option_impl {
public:
configuration_option_impl();
virtual ~configuration_option_impl();
- bool operator==(const option_impl &_other) const;
+ bool operator==(const configuration_option_impl &_other) const;
void add_item(const std::string &_key, const std::string &_value);
void remove_item(const std::string &_key);
diff --git a/implementation/service_discovery/include/eventgroupentry_impl.hpp b/implementation/service_discovery/include/eventgroupentry_impl.hpp
index 877cdbf..3a5d897 100755
--- a/implementation/service_discovery/include/eventgroupentry_impl.hpp
+++ b/implementation/service_discovery/include/eventgroupentry_impl.hpp
@@ -8,6 +8,7 @@
#include "entry_impl.hpp"
#include "../../endpoints/include/endpoint_definition.hpp"
+#include "message_impl.hpp"
namespace vsomeip {
namespace sd {
@@ -31,19 +32,20 @@ public:
bool deserialize(vsomeip::deserializer *_from);
bool operator==(const eventgroupentry_impl& _other) const {
- return !(ttl_ != _other.ttl_ ||
- service_ != _other.service_ ||
- instance_ != _other.instance_ ||
- eventgroup_ != _other.eventgroup_ ||
- index1_ != _other.index1_ ||
- index2_ != _other.index2_ ||
- num_options_[0] != _other.num_options_[0] ||
- num_options_[1] != _other.num_options_[1] ||
- major_version_ != _other.major_version_ ||
- counter_ != _other.counter_);
+ return (ttl_ == _other.ttl_ &&
+ service_ == _other.service_ &&
+ instance_ == _other.instance_ &&
+ eventgroup_ == _other.eventgroup_ &&
+ index1_ == _other.index1_ &&
+ index2_ == _other.index2_ &&
+ num_options_[0] == _other.num_options_[0] &&
+ num_options_[1] == _other.num_options_[1] &&
+ major_version_ == _other.major_version_ &&
+ counter_ == _other.counter_);
}
- bool is_matching_subscribe(const eventgroupentry_impl& _other) const;
+ bool is_matching_subscribe(const eventgroupentry_impl& _other,
+ const message_impl::options_t& _options) const;
void add_target(const std::shared_ptr<endpoint_definition> &_target);
std::shared_ptr<endpoint_definition> get_target(bool _reliable) const;
diff --git a/implementation/service_discovery/include/ip_option_impl.hpp b/implementation/service_discovery/include/ip_option_impl.hpp
index 67f356b..1345835 100644
--- a/implementation/service_discovery/include/ip_option_impl.hpp
+++ b/implementation/service_discovery/include/ip_option_impl.hpp
@@ -17,7 +17,7 @@ class ip_option_impl: public option_impl {
public:
ip_option_impl();
virtual ~ip_option_impl();
- bool operator ==(const option_impl &_option) const;
+ virtual bool operator ==(const ip_option_impl &_option) const;
uint16_t get_port() const;
void set_port(uint16_t _port);
diff --git a/implementation/service_discovery/include/ipv4_option_impl.hpp b/implementation/service_discovery/include/ipv4_option_impl.hpp
index 5f22534..8ede63d 100644
--- a/implementation/service_discovery/include/ipv4_option_impl.hpp
+++ b/implementation/service_discovery/include/ipv4_option_impl.hpp
@@ -17,6 +17,7 @@ class ipv4_option_impl: public ip_option_impl {
public:
ipv4_option_impl(bool _is_multicast);
virtual ~ipv4_option_impl();
+ bool operator ==(const ipv4_option_impl &_other) const;
const ipv4_address_t & get_address() const;
void set_address(const ipv4_address_t &_address);
diff --git a/implementation/service_discovery/include/ipv6_option_impl.hpp b/implementation/service_discovery/include/ipv6_option_impl.hpp
index b082808..92701c0 100644
--- a/implementation/service_discovery/include/ipv6_option_impl.hpp
+++ b/implementation/service_discovery/include/ipv6_option_impl.hpp
@@ -17,6 +17,7 @@ class ipv6_option_impl: public ip_option_impl {
public:
ipv6_option_impl(bool _is_multicast);
virtual ~ipv6_option_impl();
+ bool operator ==(const ipv6_option_impl &_other) const;
const ipv6_address_t & get_address() const;
void set_address(const ipv6_address_t &_address);
diff --git a/implementation/service_discovery/include/load_balancing_option_impl.hpp b/implementation/service_discovery/include/load_balancing_option_impl.hpp
index e65989e..767c345 100755
--- a/implementation/service_discovery/include/load_balancing_option_impl.hpp
+++ b/implementation/service_discovery/include/load_balancing_option_impl.hpp
@@ -16,7 +16,7 @@ class load_balancing_option_impl: public option_impl {
public:
load_balancing_option_impl();
virtual ~load_balancing_option_impl();
- bool operator ==(const option_impl &_other) const;
+ bool operator ==(const load_balancing_option_impl &_other) const;
priority_t get_priority() const;
void set_priority(priority_t _priority);
diff --git a/implementation/service_discovery/include/protection_option_impl.hpp b/implementation/service_discovery/include/protection_option_impl.hpp
index 589bb7a..33d5849 100755
--- a/implementation/service_discovery/include/protection_option_impl.hpp
+++ b/implementation/service_discovery/include/protection_option_impl.hpp
@@ -16,7 +16,7 @@ class protection_option_impl: public option_impl {
public:
protection_option_impl();
virtual ~protection_option_impl();
- virtual bool operator ==(const option_impl &_other) const;
+ bool operator ==(const protection_option_impl &_other) const;
alive_counter_t get_alive_counter() const;
void set_alive_counter(alive_counter_t _counter);
diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp
index 214450d..d3c5f07 100644
--- a/implementation/service_discovery/include/service_discovery_impl.hpp
+++ b/implementation/service_discovery/include/service_discovery_impl.hpp
@@ -23,6 +23,7 @@
#include "ipv4_option_impl.hpp"
#include "ipv6_option_impl.hpp"
#include "deserializer.hpp"
+#include "../../configuration/include/configuration.hpp"
namespace vsomeip {
@@ -312,7 +313,12 @@ private:
const std::shared_ptr<endpoint_definition> &_subscriber);
bool check_stop_subscribe_subscribe(message_impl::entries_t::const_iterator _iter,
- message_impl::entries_t::const_iterator _end) const;
+ message_impl::entries_t::const_iterator _end,
+ const message_impl::options_t& _options) const;
+
+ configuration::ttl_factor_t get_ttl_factor(
+ service_t _service, instance_t _instance,
+ const configuration::ttl_map_t& _ttl_map) const;
private:
boost::asio::io_service &io_;
@@ -402,6 +408,9 @@ private:
std::map<eventgroup_t,
std::map<client_t, std::vector<std::shared_ptr<subscriber_t>>>>>> pending_remote_subscriptions_;
std::mutex response_mutex_;
+
+ configuration::ttl_map_t ttl_factor_offers_;
+ configuration::ttl_map_t ttl_factor_subscriptions_;
};
} // namespace sd
diff --git a/implementation/service_discovery/src/configuration_option_impl.cpp b/implementation/service_discovery/src/configuration_option_impl.cpp
index cb1937e..e9cf058 100755
--- a/implementation/service_discovery/src/configuration_option_impl.cpp
+++ b/implementation/service_discovery/src/configuration_option_impl.cpp
@@ -20,14 +20,10 @@ configuration_option_impl::configuration_option_impl() {
configuration_option_impl::~configuration_option_impl() {
}
-bool configuration_option_impl::operator ==(const option_impl &_other) const {
- if (_other.get_type() != option_type_e::CONFIGURATION)
- return false;
-
- const configuration_option_impl& other =
- dynamic_cast<const configuration_option_impl &>(_other);
-
- return (configuration_ == other.configuration_);
+bool configuration_option_impl::operator ==(
+ const configuration_option_impl &_other) const {
+ return (option_impl::operator ==(_other)
+ && configuration_ == _other.configuration_);
}
void configuration_option_impl::add_item(const std::string &_key,
diff --git a/implementation/service_discovery/src/eventgroupentry_impl.cpp b/implementation/service_discovery/src/eventgroupentry_impl.cpp
index 1f9dd4a..f5394be 100755
--- a/implementation/service_discovery/src/eventgroupentry_impl.cpp
+++ b/implementation/service_discovery/src/eventgroupentry_impl.cpp
@@ -7,6 +7,8 @@
#include "../include/eventgroupentry_impl.hpp"
#include "../../message/include/deserializer.hpp"
#include "../../message/include/serializer.hpp"
+#include "../include/ipv4_option_impl.hpp"
+#include "../include/ipv6_option_impl.hpp"
namespace vsomeip {
namespace sd {
@@ -116,18 +118,110 @@ bool eventgroupentry_impl::deserialize(vsomeip::deserializer *_from) {
}
bool eventgroupentry_impl::is_matching_subscribe(
- const eventgroupentry_impl& _other) const {
- return !(ttl_ != 0
- || _other.ttl_ == 0
- || service_ != _other.service_
- || instance_ != _other.instance_
- || eventgroup_ != _other.eventgroup_
- || index1_ != _other.index1_
- || index2_ != _other.index2_
- || num_options_[0] != _other.num_options_[0]
- || num_options_[1] != _other.num_options_[1]
- || major_version_ != _other.major_version_
- || counter_ != _other.counter_);
+ const eventgroupentry_impl& _other,
+ const message_impl::options_t& _options) const {
+ if (ttl_ == 0
+ && _other.ttl_ > 0
+ && service_ == _other.service_
+ && instance_ == _other.instance_
+ && eventgroup_ == _other.eventgroup_
+ && index1_ == _other.index1_
+ && index2_ == _other.index2_
+ && num_options_[0] == _other.num_options_[0]
+ && num_options_[1] == _other.num_options_[1]
+ && major_version_ == _other.major_version_
+ && counter_ == _other.counter_) {
+ return true;
+ } else if (ttl_ == 0
+ && _other.ttl_ > 0
+ && service_ == _other.service_
+ && instance_ == _other.instance_
+ && eventgroup_ == _other.eventgroup_
+ && major_version_ == _other.major_version_
+ && counter_ == _other.counter_) {
+ // check if entries reference options at different indexes but the
+ // options itself are identical
+ // check if number of options referenced is the same
+ if (num_options_[0] + num_options_[1]
+ != _other.num_options_[0] + _other.num_options_[1] ||
+ num_options_[0] + num_options_[1] == 0) {
+ return false;
+ }
+ // read out ip options of current and _other
+ std::vector<std::shared_ptr<ip_option_impl>> its_options_current;
+ std::vector<std::shared_ptr<ip_option_impl>> its_options_other;
+ for (const auto option_run : {0,1}) {
+ for (const auto option_index : options_[option_run]) {
+ switch (_options[option_index]->get_type()) {
+ case option_type_e::IP4_ENDPOINT:
+ its_options_current.push_back(
+ std::static_pointer_cast<ipv4_option_impl>(
+ _options[option_index]));
+ break;
+ case option_type_e::IP6_ENDPOINT:
+ its_options_current.push_back(
+ std::static_pointer_cast<ipv6_option_impl>(
+ _options[option_index]));
+ break;
+ default:
+ break;
+ }
+ }
+ for (const auto option_index : _other.options_[option_run]) {
+ switch (_options[option_index]->get_type()) {
+ case option_type_e::IP4_ENDPOINT:
+ its_options_other.push_back(
+ std::static_pointer_cast<ipv4_option_impl>(
+ _options[option_index]));
+ break;
+ case option_type_e::IP6_ENDPOINT:
+ its_options_other.push_back(
+ std::static_pointer_cast<ipv6_option_impl>(
+ _options[option_index]));
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ if (!its_options_current.size() || !its_options_other.size()) {
+ return false;
+ }
+
+ // search every option of current in other
+ for (const auto& c : its_options_current) {
+ bool found(false);
+ for (const auto& o : its_options_other) {
+ if (*c == *o) {
+ switch (c->get_type()) {
+ case option_type_e::IP4_ENDPOINT:
+ if (static_cast<ipv4_option_impl*>(c.get())->get_address()
+ == static_cast<ipv4_option_impl*>(o.get())->get_address()) {
+ found = true;
+ }
+ break;
+ case option_type_e::IP6_ENDPOINT:
+ if (static_cast<ipv6_option_impl*>(c.get())->get_address()
+ == static_cast<ipv6_option_impl*>(o.get())->get_address()) {
+ found = true;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ if (found) {
+ break;
+ }
+ }
+ if (!found) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
}
void eventgroupentry_impl::add_target(
diff --git a/implementation/service_discovery/src/ip_option_impl.cpp b/implementation/service_discovery/src/ip_option_impl.cpp
index 9be1c5f..0684dc8 100644
--- a/implementation/service_discovery/src/ip_option_impl.cpp
+++ b/implementation/service_discovery/src/ip_option_impl.cpp
@@ -21,15 +21,10 @@ ip_option_impl::ip_option_impl() :
ip_option_impl::~ip_option_impl() {
}
-bool ip_option_impl::operator ==(const option_impl &_other) const {
- if (type_ != _other.get_type())
- return false;
-
-#ifdef VSOMEIP_TODO
- const ip_option_impl & other =
- dynamic_cast<const ip_option_impl &>(_other);
-#endif
- return true;
+bool ip_option_impl::operator ==(const ip_option_impl &_other) const {
+ return (option_impl::operator ==(_other)
+ && protocol_ == _other.protocol_
+ && port_ == _other.port_);
}
unsigned short ip_option_impl::get_port() const {
diff --git a/implementation/service_discovery/src/ipv4_option_impl.cpp b/implementation/service_discovery/src/ipv4_option_impl.cpp
index 00bf7b5..22ee7d8 100644
--- a/implementation/service_discovery/src/ipv4_option_impl.cpp
+++ b/implementation/service_discovery/src/ipv4_option_impl.cpp
@@ -25,6 +25,11 @@ ipv4_option_impl::ipv4_option_impl(bool _is_multicast) :
ipv4_option_impl::~ipv4_option_impl() {
}
+bool ipv4_option_impl::operator ==(const ipv4_option_impl &_other) const {
+ return (ip_option_impl::operator ==(_other)
+ && address_ == _other.address_);
+}
+
const ipv4_address_t & ipv4_option_impl::get_address() const {
return address_;
}
diff --git a/implementation/service_discovery/src/ipv6_option_impl.cpp b/implementation/service_discovery/src/ipv6_option_impl.cpp
index 94c8fe4..e4de3d6 100755
--- a/implementation/service_discovery/src/ipv6_option_impl.cpp
+++ b/implementation/service_discovery/src/ipv6_option_impl.cpp
@@ -25,6 +25,11 @@ ipv6_option_impl::ipv6_option_impl(bool _is_multicast) :
ipv6_option_impl::~ipv6_option_impl() {
}
+bool ipv6_option_impl::operator ==(const ipv6_option_impl &_other) const {
+ return (ip_option_impl::operator ==(_other)
+ && address_ == _other.address_);
+}
+
const ipv6_address_t & ipv6_option_impl::get_address() const {
return address_;
}
diff --git a/implementation/service_discovery/src/load_balancing_option_impl.cpp b/implementation/service_discovery/src/load_balancing_option_impl.cpp
index 8fe8217..937dd4d 100755
--- a/implementation/service_discovery/src/load_balancing_option_impl.cpp
+++ b/implementation/service_discovery/src/load_balancing_option_impl.cpp
@@ -20,14 +20,11 @@ load_balancing_option_impl::load_balancing_option_impl() {
load_balancing_option_impl::~load_balancing_option_impl() {
}
-bool load_balancing_option_impl::operator ==(const option_impl &_other) const {
- if (_other.get_type() != option_type_e::LOAD_BALANCING)
- return false;
-
- const load_balancing_option_impl& other =
- dynamic_cast<const load_balancing_option_impl &>(_other);
-
- return (priority_ == other.priority_ && weight_ == other.weight_);
+bool load_balancing_option_impl::operator ==(
+ const load_balancing_option_impl &_other) const {
+ return (option_impl::operator ==(_other)
+ && priority_ == _other.priority_
+ && priority_ == _other.weight_);
}
priority_t load_balancing_option_impl::get_priority() const {
diff --git a/implementation/service_discovery/src/option_impl.cpp b/implementation/service_discovery/src/option_impl.cpp
index 6854507..8191912 100755
--- a/implementation/service_discovery/src/option_impl.cpp
+++ b/implementation/service_discovery/src/option_impl.cpp
@@ -20,8 +20,7 @@ option_impl::~option_impl() {
}
bool option_impl::operator ==(const option_impl &_other) const {
- (void)_other;
- return false;
+ return (type_ == _other.type_ && length_ == _other.length_);
}
uint16_t option_impl::get_length() const {
diff --git a/implementation/service_discovery/src/protection_option_impl.cpp b/implementation/service_discovery/src/protection_option_impl.cpp
index b7db090..f847650 100755
--- a/implementation/service_discovery/src/protection_option_impl.cpp
+++ b/implementation/service_discovery/src/protection_option_impl.cpp
@@ -20,14 +20,11 @@ protection_option_impl::protection_option_impl() {
protection_option_impl::~protection_option_impl() {
}
-bool protection_option_impl::operator ==(const option_impl &_other) const {
- if (_other.get_type() != option_type_e::PROTECTION)
- return false;
-
- const protection_option_impl& other =
- dynamic_cast<const protection_option_impl &>(_other);
-
- return (counter_ == other.counter_ && crc_ == other.crc_);
+bool protection_option_impl::operator ==(
+ const protection_option_impl &_other) const {
+ return (option_impl::operator ==(_other)
+ && counter_ == _other.counter_
+ && crc_ == _other.crc_);
}
alive_counter_t protection_option_impl::get_alive_counter() const {
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index f2171fb..fb13a8b 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -131,6 +131,9 @@ void service_discovery_impl::init() {
offer_debounce_time_ = std::chrono::milliseconds(
its_configuration->get_sd_offer_debounce_time());
ttl_timer_runtime_ = cyclic_offer_delay_ / 2;
+
+ ttl_factor_offers_ = its_configuration->get_ttl_factor_offers();
+ ttl_factor_subscriptions_ = its_configuration->get_ttl_factor_subscribes();
} else {
VSOMEIP_ERROR << "SD: no configuration found!";
}
@@ -1194,7 +1197,8 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
if (is_stop_subscribe_subscribe) {
force_initial_events = true;
}
- is_stop_subscribe_subscribe = check_stop_subscribe_subscribe(iter, its_end);
+ is_stop_subscribe_subscribe = check_stop_subscribe_subscribe(
+ iter, its_end, its_message->get_options());
process_eventgroupentry(its_eventgroup_entry, its_options,
its_message_response, _destination,
its_message_id, is_stop_subscribe_subscribe, force_initial_events);
@@ -1357,7 +1361,8 @@ void service_discovery_impl::process_offerservice_serviceentry(
}
host_->add_routing_info(_service, _instance,
- _major, _minor, _ttl * 5,
+ _major, _minor,
+ _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_),
_reliable_address, _reliable_port,
_unreliable_address, _unreliable_port);
@@ -1410,13 +1415,24 @@ void service_discovery_impl::process_offerservice_serviceentry(
its_eventgroup.first,
its_subscription, false, true);
its_client.second->set_tcp_connection_established(false);
- }
+ // restart TCP endpoint if not connected
+ if (its_subscription->get_endpoint(true)
+ && !its_subscription->get_endpoint(true)->is_connected()) {
+ its_subscription->get_endpoint(true)->restart();
+ }
+ }
its_subscription->set_acknowledged(false);
} else {
insert_nack_subscription_on_resubscribe(its_message,
_service, _instance, its_eventgroup.first,
its_subscription);
+
+ // restart TCP endpoint if not connected
+ if (its_subscription->get_endpoint(true)
+ && !its_subscription->get_endpoint(true)->is_connected()) {
+ its_subscription->get_endpoint(true)->restart();
+ }
}
}
}
@@ -2171,8 +2187,8 @@ void service_discovery_impl::handle_eventgroup_subscription(service_t _service,
client_t its_subscribing_remote_client = VSOMEIP_ROUTING_CLIENT;
switch (host_->on_remote_subscription(_service, _instance,
_eventgroup, target.subscriber_, target.target_,
- _ttl * 5, &its_subscribing_remote_client,
- _message_id)) {
+ _ttl * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_),
+ &its_subscribing_remote_client, _message_id)) {
case remote_subscription_state_e::SUBSCRIPTION_ACKED:
insert_subscription_ack(its_message, _service,
_instance, _eventgroup, its_info, _ttl,
@@ -2460,7 +2476,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
found_client->second, _reliable, !_reliable);
found_client->second->set_udp_connection_established(true);
} else {
- // don't insert reliable endpoint option if the
+ // don't insert unreliable endpoint option if the
// UDP client endpoint is not yet connected
found_client->second->set_udp_connection_established(false);
}
@@ -3181,7 +3197,12 @@ void service_discovery_impl::update_subscription_expiration_timer(
if (entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK
&& entry->get_ttl()) {
const std::chrono::steady_clock::time_point its_expiration = now
- + std::chrono::seconds(entry->get_ttl() * 5);
+ + std::chrono::seconds(
+ entry->get_ttl()
+ * get_ttl_factor(
+ entry->get_service(),
+ entry->get_instance(),
+ ttl_factor_subscriptions_));
if (its_expiration < next_subscription_expiration_) {
next_subscription_expiration_ = its_expiration;
}
@@ -3364,17 +3385,35 @@ void service_discovery_impl::remote_subscription_remove(
bool service_discovery_impl::check_stop_subscribe_subscribe(
message_impl::entries_t::const_iterator _iter,
- message_impl::entries_t::const_iterator _end) const {
+ message_impl::entries_t::const_iterator _end,
+ const message_impl::options_t& _options) const {
const message_impl::entries_t::const_iterator its_next = std::next(_iter);
- if ((*_iter)->get_type() != entry_type_e::SUBSCRIBE_EVENTGROUP
+ if ((*_iter)->get_ttl() > 0
+ || (*_iter)->get_type() != entry_type_e::STOP_SUBSCRIBE_EVENTGROUP
|| its_next == _end
- || (*its_next)->get_type() != entry_type_e::STOP_SUBSCRIBE_EVENTGROUP) {
+ || (*its_next)->get_type() != entry_type_e::SUBSCRIBE_EVENTGROUP) {
return false;
}
return (*static_cast<eventgroupentry_impl*>(_iter->get())).is_matching_subscribe(
- *(static_cast<eventgroupentry_impl*>(its_next->get())));
+ *(static_cast<eventgroupentry_impl*>(its_next->get())), _options);
}
+configuration::ttl_factor_t service_discovery_impl::get_ttl_factor(
+ service_t _service, instance_t _instance,
+ const configuration::ttl_map_t& _ttl_map) const {
+ configuration::ttl_factor_t its_ttl_factor(1);
+ auto found_service = _ttl_map.find(_service);
+ if (found_service != _ttl_map.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ its_ttl_factor = found_instance->second;
+ }
+ }
+ return its_ttl_factor;
+}
+
+
+
} // namespace sd
} // namespace vsomeip
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index b3a29c8..25ac281 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -852,6 +852,14 @@ if(NOT ${TESTS_BAT})
)
# Copy config file for client and service into $BUILDDIR/test
+ set(TEST_QUEUE_LIMITED_LOCAL_BIG_PAYLOAD_CONFIG_FILE ${TEST_BIG_PAYLOAD_NAME}_local_queue_limited.json)
+ copy_to_builddir(
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_QUEUE_LIMITED_LOCAL_BIG_PAYLOAD_CONFIG_FILE}
+ ${PROJECT_BINARY_DIR}/test/${TEST_QUEUE_LIMITED_LOCAL_BIG_PAYLOAD_CONFIG_FILE}
+ ${TEST_BIG_PAYLOAD_SERVICE}
+ )
+
+ # Copy config file for client and service into $BUILDDIR/test
set(TEST_LOCAL_BIG_PAYLOAD_CONFIG_FILE_RANDOM ${TEST_BIG_PAYLOAD_NAME}_local_random.json)
copy_to_builddir(
${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_LOCAL_BIG_PAYLOAD_CONFIG_FILE_RANDOM}
@@ -939,6 +947,54 @@ if(NOT ${TESTS_BAT})
${TEST_BIG_PAYLOAD_SERVICE}
)
+ # Copy config file for client and service into $BUILDDIR/test
+ set(TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_GENERAL ${TEST_BIG_PAYLOAD_NAME}_tcp_client_queue_limited_general.json)
+ configure_file(
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/conf/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_GENERAL}.in
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_GENERAL}
+ @ONLY)
+ copy_to_builddir(
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_GENERAL}
+ ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_GENERAL}
+ ${TEST_BIG_PAYLOAD_CLIENT}
+ )
+
+ # Copy config file for client and service into $BUILDDIR/test
+ set(TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_GENERAL ${TEST_BIG_PAYLOAD_NAME}_tcp_service_queue_limited_general.json)
+ configure_file(
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/conf/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_GENERAL}.in
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_GENERAL}
+ @ONLY)
+ copy_to_builddir(
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_GENERAL}
+ ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_GENERAL}
+ ${TEST_BIG_PAYLOAD_SERVICE}
+ )
+
+ # Copy config file for client and service into $BUILDDIR/test
+ set(TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC ${TEST_BIG_PAYLOAD_NAME}_tcp_client_queue_limited_specific.json)
+ configure_file(
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/conf/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC}.in
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC}
+ @ONLY)
+ copy_to_builddir(
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC}
+ ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_CLIENT_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC}
+ ${TEST_BIG_PAYLOAD_CLIENT}
+ )
+
+ # Copy config file for client and service into $BUILDDIR/test
+ set(TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC ${TEST_BIG_PAYLOAD_NAME}_tcp_service_queue_limited_specific.json)
+ configure_file(
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/conf/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC}.in
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC}
+ @ONLY)
+ copy_to_builddir(
+ ${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC}
+ ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_SERVICE_CONFIG_FILE_QUEUE_LIMITED_SPECIFIC}
+ ${TEST_BIG_PAYLOAD_SERVICE}
+ )
+
# Copy bashscript to start client local to $BUILDDIR/test
set(TEST_LOCAL_BIG_PAYLOAD_CLIENT_START_SCRIPT ${TEST_BIG_PAYLOAD_NAME}_client_local_start.sh)
copy_to_builddir(${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_LOCAL_BIG_PAYLOAD_CLIENT_START_SCRIPT}
@@ -957,6 +1013,7 @@ if(NOT ${TESTS_BAT})
set(TEST_LOCAL_BIG_PAYLOAD_NAME big_payload_test_local)
set(TEST_LOCAL_BIG_PAYLOAD_NAME_RANDOM big_payload_test_local_random)
set(TEST_LOCAL_BIG_PAYLOAD_NAME_LIMITED big_payload_test_local_limited)
+ set(TEST_LOCAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED big_payload_test_local_queue_limited)
set(TEST_LOCAL_BIG_PAYLOAD_STARTER ${TEST_LOCAL_BIG_PAYLOAD_NAME}_starter.sh)
copy_to_builddir(${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_LOCAL_BIG_PAYLOAD_STARTER}
${PROJECT_BINARY_DIR}/test/${TEST_LOCAL_BIG_PAYLOAD_STARTER}
@@ -967,7 +1024,9 @@ if(NOT ${TESTS_BAT})
set(TEST_EXTERNAL_BIG_PAYLOAD_NAME big_payload_test_external)
set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_RANDOM big_payload_test_external_random)
set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_LIMITED big_payload_test_external_limited)
- set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_LIMITED_GENERAL big_payload_test_external_limited_general)
+ set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_LIMITED_GENERAL big_payload_test_external_limited_general)
+ set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_GENERAL big_payload_test_external_queue_limited_general)
+ set(TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_SPECIFIC big_payload_test_external_queue_limited_specific)
set(TEST_EXTERNAL_BIG_PAYLOAD_STARTER ${TEST_EXTERNAL_BIG_PAYLOAD_NAME}_starter.sh)
copy_to_builddir(${PROJECT_SOURCE_DIR}/test/big_payload_tests/${TEST_EXTERNAL_BIG_PAYLOAD_STARTER}
${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_STARTER}
@@ -2218,6 +2277,21 @@ if(NOT ${TESTS_BAT})
)
set_tests_properties(${TEST_EXTERNAL_BIG_PAYLOAD_NAME_LIMITED_GENERAL} PROPERTIES TIMEOUT 120)
+ add_test(NAME ${TEST_LOCAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED}
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_LOCAL_BIG_PAYLOAD_STARTER} QUEUELIMITEDGENERAL
+ )
+ set_tests_properties(${TEST_LOCAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED} PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_GENERAL}
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_STARTER} QUEUELIMITEDGENERAL
+ )
+ set_tests_properties(${TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_GENERAL} PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_SPECIFIC}
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_EXTERNAL_BIG_PAYLOAD_STARTER} QUEUELIMITEDSPECIFIC
+ )
+ set_tests_properties(${TEST_EXTERNAL_BIG_PAYLOAD_NAME_QUEUE_LIMITED_SPECIFIC} PROPERTIES TIMEOUT 120)
+
# client id tests
add_test(NAME ${TEST_CLIENT_ID_NAME}_diff_client_ids_diff_ports
COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_CLIENT_ID_MASTER_STARTER} ${TEST_CLIENT_ID_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE})
diff --git a/test/big_payload_tests/big_payload_test_client.cpp b/test/big_payload_tests/big_payload_test_client.cpp
index 6264836..9b50e79 100644
--- a/test/big_payload_tests/big_payload_test_client.cpp
+++ b/test/big_payload_tests/big_payload_test_client.cpp
@@ -31,6 +31,12 @@ big_payload_test_client::big_payload_test_client(
case big_payload_test::test_mode::LIMITED_GENERAL:
service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_LIMITED_GENERAL;
break;
+ case big_payload_test::test_mode::QUEUE_LIMITED_GENERAL:
+ service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_GENERAL;
+ break;
+ case big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC:
+ service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_SPECIFIC;
+ break;
default:
service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID;
break;
@@ -71,7 +77,9 @@ void big_payload_test_client::stop()
{
VSOMEIP_INFO << "Stopping...";
if (test_mode_ == big_payload_test::test_mode::LIMITED
- || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) {
+ || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) {
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
ASSERT_EQ(number_of_acknowledged_messages_, number_of_messages_to_send_ / 4);
}
@@ -82,7 +90,9 @@ void big_payload_test_client::stop()
void big_payload_test_client::join_sender_thread(){
sender_.join();
if (test_mode_ == big_payload_test::test_mode::LIMITED
- || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) {
+ || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) {
ASSERT_EQ(number_of_acknowledged_messages_, number_of_messages_to_send_ / 4);
} else {
ASSERT_EQ(number_of_sent_messages_, number_of_acknowledged_messages_);
@@ -150,7 +160,9 @@ void big_payload_test_client::on_message(const std::shared_ptr<vsomeip::message>
}
number_of_acknowledged_messages_++;
if (test_mode_ == big_payload_test::test_mode::LIMITED
- || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) {
+ || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) {
if (number_of_acknowledged_messages_ == number_of_messages_to_send_ / 4) {
send();
}
@@ -191,7 +203,9 @@ void big_payload_test_client::run()
unsigned int datasize(std::rand() % big_payload_test::BIG_PAYLOAD_SIZE_RANDOM);
its_payload_data.assign(datasize, big_payload_test::DATA_CLIENT_TO_SERVICE);
} else if (test_mode_ == big_payload_test::test_mode::LIMITED
- || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) {
+ || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) {
if (i % 2) {
// try to sent a too big payload for half of the messages
its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE + 3,
@@ -207,6 +221,10 @@ void big_payload_test_client::run()
its_payload->set_data(its_payload_data);
request_->set_payload(its_payload);
app_->send(request_, true);
+ if (test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ }
VSOMEIP_INFO << "Client/Session [" << std::setw(4) << std::setfill('0')
<< std::hex << request_->get_client() << "/" << std::setw(4)
<< std::setfill('0') << std::hex << request_->get_session()
@@ -223,7 +241,9 @@ void big_payload_test_client::run()
GTEST_FATAL_FAILURE_("Didn't receive all replies within time");
} else {
if (test_mode_ == big_payload_test::LIMITED
- || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) {
+ || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) {
ASSERT_EQ(number_of_messages_to_send_ / 4,
number_of_acknowledged_messages_);
} else {
@@ -258,6 +278,10 @@ int main(int argc, char** argv)
test_mode = big_payload_test::test_mode::LIMITED;
} else if (std::string("LIMITEDGENERAL") == std::string(argv[1])) {
test_mode = big_payload_test::test_mode::LIMITED_GENERAL;
+ } else if (std::string("QUEUELIMITEDGENERAL") == std::string(argv[1])) {
+ test_mode = big_payload_test::test_mode::QUEUE_LIMITED_GENERAL;
+ } else if (std::string("QUEUELIMITEDSPECIFIC") == std::string(argv[1])) {
+ test_mode = big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC;
}
}
return RUN_ALL_TESTS();
diff --git a/test/big_payload_tests/big_payload_test_external_starter.sh b/test/big_payload_tests/big_payload_test_external_starter.sh
index f76da1c..6c04a3d 100755
--- a/test/big_payload_tests/big_payload_test_external_starter.sh
+++ b/test/big_payload_tests/big_payload_test_external_starter.sh
@@ -10,7 +10,7 @@
# the testcase simply executes this script. This script then runs client
# and service and checks that both exit successfully.
-if [[ $# -gt 0 && $1 != "RANDOM" && $1 != "LIMITED" && $1 != "LIMITEDGENERAL" ]]
+if [[ $# -gt 0 && $1 != "RANDOM" && $1 != "LIMITED" && $1 != "LIMITEDGENERAL" && $1 != "QUEUELIMITEDGENERAL" && $1 != "QUEUELIMITEDSPECIFIC" ]]
then
echo "The only allowed parameter to this script is RANDOM or LIMITED or LIMITEDGENERAL."
echo "Like $0 RANDOM"
@@ -24,6 +24,10 @@ if [[ $# -gt 0 && $1 == "RANDOM" ]]; then
export VSOMEIP_CONFIGURATION=big_payload_test_tcp_client_random.json
elif [[ $# -gt 0 && $1 == "LIMITEDGENERAL" ]]; then
export VSOMEIP_CONFIGURATION=big_payload_test_tcp_client_limited_general.json
+elif [[ $# -gt 0 && $1 == "QUEUELIMITEDGENERAL" ]]; then
+ export VSOMEIP_CONFIGURATION=big_payload_test_tcp_client_queue_limited_general.json
+elif [[ $# -gt 0 && $1 == "QUEUELIMITEDSPECIFIC" ]]; then
+ export VSOMEIP_CONFIGURATION=big_payload_test_tcp_client_queue_limited_specific.json
else
export VSOMEIP_CONFIGURATION=big_payload_test_tcp_client.json
fi
diff --git a/test/big_payload_tests/big_payload_test_globals.hpp b/test/big_payload_tests/big_payload_test_globals.hpp
index 152217d..5488259 100644
--- a/test/big_payload_tests/big_payload_test_globals.hpp
+++ b/test/big_payload_tests/big_payload_test_globals.hpp
@@ -21,6 +21,8 @@ namespace big_payload_test {
constexpr vsomeip::service_t TEST_SERVICE_SERVICE_ID_LIMITED = 0x1235;
constexpr vsomeip::service_t TEST_SERVICE_SERVICE_ID_RANDOM = 0x1236;
constexpr vsomeip::service_t TEST_SERVICE_SERVICE_ID_LIMITED_GENERAL = 0x1237;
+ constexpr vsomeip::service_t TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_GENERAL = 0x1238;
+ constexpr vsomeip::service_t TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_SPECIFIC = 0x1239;
constexpr vsomeip::service_t TEST_SERVICE_INSTANCE_ID = 0x1;
constexpr vsomeip::method_t TEST_SERVICE_METHOD_ID = 0x8421;
@@ -29,6 +31,8 @@ namespace big_payload_test {
RANDOM,
LIMITED,
LIMITED_GENERAL,
+ QUEUE_LIMITED_GENERAL,
+ QUEUE_LIMITED_SPECIFIC,
UNKNOWN
};
}
diff --git a/test/big_payload_tests/big_payload_test_local_queue_limited.json b/test/big_payload_tests/big_payload_test_local_queue_limited.json
new file mode 100644
index 0000000..7252680
--- /dev/null
+++ b/test/big_payload_tests/big_payload_test_local_queue_limited.json
@@ -0,0 +1,42 @@
+{
+ "unicast":"127.0.0.1",
+ "logging":
+ {
+ "level":"error",
+ "console":"true",
+ "file":
+ {
+ "enable":"false",
+ "path":"/tmp/vsomeip.log"
+ },
+ "dlt":"false"
+ },
+ "applications":
+ [
+ {
+ "name":"big_payload_test_service",
+ "id":"0x1277"
+ },
+ {
+ "name":"big_payload_test_client",
+ "id":"0x1344"
+ }
+ ],
+ "services":
+ [
+ {
+ "service":"0x1234",
+ "instance":"0x5678"
+ }
+ ],
+ "endpoint-queue-limit-local" : "614428",
+ "routing":"big_payload_test_service",
+ "service-discovery":
+ {
+ "enable":"true",
+ "multicast":"224.244.224.245",
+ "port":"30490",
+ "protocol":"udp"
+ }
+}
+
diff --git a/test/big_payload_tests/big_payload_test_local_starter.sh b/test/big_payload_tests/big_payload_test_local_starter.sh
index 3d42828..6439cac 100755
--- a/test/big_payload_tests/big_payload_test_local_starter.sh
+++ b/test/big_payload_tests/big_payload_test_local_starter.sh
@@ -10,9 +10,9 @@
# the testcase simply executes this script. This script then runs client
# and service and checks that both exit successfully.
-if [[ $# -gt 0 && $1 != "RANDOM" && $1 != "LIMITED" ]]
+if [[ $# -gt 0 && $1 != "RANDOM" && $1 != "LIMITED" && $1 != "QUEUELIMITEDGENERAL" ]]
then
- echo "The only allowed parameter to this script is RANDOM or LIMITED."
+ echo "The only allowed parameter to this script is RANDOM or LIMITED or QUEUELIMITEDGENERAL."
echo "Like $0 RANDOM"
exit 1
fi
@@ -25,6 +25,8 @@ if [[ $# -gt 0 && $1 == "RANDOM" ]]; then
export VSOMEIP_CONFIGURATION=big_payload_test_local_random.json
elif [[ $# -gt 0 && $1 == "LIMITED" ]]; then
export VSOMEIP_CONFIGURATION=big_payload_test_local_limited.json
+elif [[ $# -gt 0 && $1 == "QUEUELIMITEDGENERAL" ]]; then
+ export VSOMEIP_CONFIGURATION=big_payload_test_local_queue_limited.json
else
export VSOMEIP_CONFIGURATION=big_payload_test_local.json
fi
diff --git a/test/big_payload_tests/big_payload_test_service.cpp b/test/big_payload_tests/big_payload_test_service.cpp
index 34b2d85..4031550 100644
--- a/test/big_payload_tests/big_payload_test_service.cpp
+++ b/test/big_payload_tests/big_payload_test_service.cpp
@@ -28,6 +28,14 @@ big_payload_test_service::big_payload_test_service(big_payload_test::test_mode _
expected_messages_ = big_payload_test::BIG_PAYLOAD_TEST_NUMBER_MESSAGES / 2;
service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_LIMITED_GENERAL;
break;
+ case big_payload_test::test_mode::QUEUE_LIMITED_GENERAL:
+ expected_messages_ = big_payload_test::BIG_PAYLOAD_TEST_NUMBER_MESSAGES / 2;
+ service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_GENERAL;
+ break;
+ case big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC:
+ expected_messages_ = big_payload_test::BIG_PAYLOAD_TEST_NUMBER_MESSAGES / 2;
+ service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID_QUEUE_LIMITED_SPECIFIC;
+ break;
default:
expected_messages_ = big_payload_test::BIG_PAYLOAD_TEST_NUMBER_MESSAGES;
service_id_ = big_payload_test::TEST_SERVICE_SERVICE_ID;
@@ -146,7 +154,9 @@ void big_payload_test_service::on_message(const std::shared_ptr<vsomeip::message
its_payload_data.assign(std::rand() % big_payload_test::BIG_PAYLOAD_SIZE_RANDOM,
big_payload_test::DATA_SERVICE_TO_CLIENT);
} else if (test_mode_ == big_payload_test::test_mode::LIMITED
- || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) {
+ || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) {
if (number_of_received_messages_ % 2) {
// try to send to big response for half of the received messsages.
// this way the client will only get replies for a fourth of his sent
@@ -194,7 +204,9 @@ void big_payload_test_service::run()
}
std::this_thread::sleep_for(std::chrono::seconds(3));
if (test_mode_ == big_payload_test::test_mode::LIMITED
- || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL) {
+ || test_mode_ == big_payload_test::test_mode::LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_GENERAL
+ || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) {
ASSERT_EQ(number_of_received_messages_, expected_messages_);
}
stop();
@@ -223,6 +235,10 @@ int main(int argc, char** argv)
test_mode = big_payload_test::test_mode::LIMITED;
} else if (std::string("LIMITEDGENERAL") == std::string(argv[1])) {
test_mode = big_payload_test::test_mode::LIMITED_GENERAL;
+ } else if (std::string("QUEUELIMITEDGENERAL") == std::string(argv[1])) {
+ test_mode = big_payload_test::test_mode::QUEUE_LIMITED_GENERAL;
+ } else if (std::string("QUEUELIMITEDSPECIFIC") == std::string(argv[1])) {
+ test_mode = big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC;
}
}
return RUN_ALL_TESTS();
diff --git a/test/big_payload_tests/big_payload_test_service_external_start.sh b/test/big_payload_tests/big_payload_test_service_external_start.sh
index 22dd4df..de3224c 100755
--- a/test/big_payload_tests/big_payload_test_service_external_start.sh
+++ b/test/big_payload_tests/big_payload_test_service_external_start.sh
@@ -4,9 +4,9 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
-if [[ $# -gt 0 && $1 != "RANDOM" && $1 != "LIMITED" && $1 != "LIMITEDGENERAL" ]]
+if [[ $# -gt 0 && $1 != "RANDOM" && $1 != "LIMITED" && $1 != "LIMITEDGENERAL" && $1 != "QUEUELIMITEDGENERAL" && $1 != "QUEUELIMITEDSPECIFIC" ]]
then
- echo "The only allowed parameter to this script is RANDOM or LIMITED or LIMITEDGENERAL."
+ echo "The only allowed parameter to this script is RANDOM, LIMITED, LIMITEDGENERAL, QUEUELIMITEDGENERAL or QUEUELIMITEDSPECIFIC"
echo "Like $0 RANDOM"
exit 1
fi
@@ -16,6 +16,10 @@ if [[ $# -gt 0 && $1 == "RANDOM" ]]; then
export VSOMEIP_CONFIGURATION=big_payload_test_tcp_service_random.json
elif [[ $# -gt 0 && $1 == "LIMITEDGENERAL" ]]; then
export VSOMEIP_CONFIGURATION=big_payload_test_tcp_service_limited_general.json
+elif [[ $# -gt 0 && $1 == "QUEUELIMITEDGENERAL" ]]; then
+ export VSOMEIP_CONFIGURATION=big_payload_test_tcp_service_queue_limited_general.json
+elif [[ $# -gt 0 && $1 == "QUEUELIMITEDSPECIFIC" ]]; then
+ export VSOMEIP_CONFIGURATION=big_payload_test_tcp_service_queue_limited_specific.json
else
export VSOMEIP_CONFIGURATION=big_payload_test_tcp_service.json
fi
diff --git a/test/big_payload_tests/conf/big_payload_test_tcp_client_queue_limited_general.json.in b/test/big_payload_tests/conf/big_payload_test_tcp_client_queue_limited_general.json.in
new file mode 100644
index 0000000..a193ab8
--- /dev/null
+++ b/test/big_payload_tests/conf/big_payload_test_tcp_client_queue_limited_general.json.in
@@ -0,0 +1,31 @@
+{
+ "unicast":"@TEST_IP_MASTER@",
+ "netmask":"255.255.255.0",
+ "logging":
+ {
+ "level":"info",
+ "console":"true",
+ "file":
+ {
+ "enable":"true",
+ "path":"/var/log/vsomeip.log"
+ },
+ "dlt":"true"
+ },
+ "applications":
+ [
+ {
+ "name":"big_payload_test_client",
+ "id":"0x1343"
+ }
+ ],
+ "endpoint-queue-limit-external" : "614416",
+ "routing":"big_payload_test_client",
+ "service-discovery":
+ {
+ "enable":"true",
+ "multicast":"224.244.224.245",
+ "port":"30490",
+ "protocol":"udp"
+ }
+}
diff --git a/test/big_payload_tests/conf/big_payload_test_tcp_client_queue_limited_specific.json.in b/test/big_payload_tests/conf/big_payload_test_tcp_client_queue_limited_specific.json.in
new file mode 100644
index 0000000..79f3486
--- /dev/null
+++ b/test/big_payload_tests/conf/big_payload_test_tcp_client_queue_limited_specific.json.in
@@ -0,0 +1,43 @@
+{
+ "unicast":"@TEST_IP_MASTER@",
+ "netmask":"255.255.255.0",
+ "logging":
+ {
+ "level":"info",
+ "console":"true",
+ "file":
+ {
+ "enable":"true",
+ "path":"/var/log/vsomeip.log"
+ },
+ "dlt":"true"
+ },
+ "applications":
+ [
+ {
+ "name":"big_payload_test_client",
+ "id":"0x1343"
+ }
+ ],
+ "endpoint-queue-limits" :
+ [
+ {
+ "unicast":"@TEST_IP_SLAVE@",
+ "ports":
+ [
+ {
+ "port":"30509",
+ "queue-size-limit":"614416"
+ }
+ ]
+ }
+ ],
+ "routing":"big_payload_test_client",
+ "service-discovery":
+ {
+ "enable":"true",
+ "multicast":"224.244.224.245",
+ "port":"30490",
+ "protocol":"udp"
+ }
+}
diff --git a/test/big_payload_tests/conf/big_payload_test_tcp_service_queue_limited_general.json.in b/test/big_payload_tests/conf/big_payload_test_tcp_service_queue_limited_general.json.in
new file mode 100644
index 0000000..ad5b28c
--- /dev/null
+++ b/test/big_payload_tests/conf/big_payload_test_tcp_service_queue_limited_general.json.in
@@ -0,0 +1,87 @@
+{
+ "unicast":"@TEST_IP_SLAVE@",
+ "logging":
+ {
+ "level":"debug",
+ "console":"true",
+ "file":
+ {
+ "enable":"false",
+ "path":"/tmp/vsomeip.log"
+ },
+ "dlt":"false"
+ },
+ "applications":
+ [
+ {
+ "name":"big_payload_test_service",
+ "id":"0x1277"
+ }
+ ],
+ "services":
+ [
+ {
+ "service":"0x1234",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ },
+ {
+ "service":"0x1235",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ },
+ {
+ "service":"0x1236",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ },
+ {
+ "service":"0x1237",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ },
+ {
+ "service":"0x1238",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ },
+ {
+ "service":"0x1239",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ }
+ ],
+ "endpoint-queue-limit-external" : "614416",
+ "routing":"big_payload_test_service",
+ "service-discovery":
+ {
+ "enable":"true",
+ "multicast":"224.244.224.245",
+ "port":"30490",
+ "protocol":"udp"
+ }
+}
diff --git a/test/big_payload_tests/conf/big_payload_test_tcp_service_queue_limited_specific.json.in b/test/big_payload_tests/conf/big_payload_test_tcp_service_queue_limited_specific.json.in
new file mode 100644
index 0000000..87bcdaa
--- /dev/null
+++ b/test/big_payload_tests/conf/big_payload_test_tcp_service_queue_limited_specific.json.in
@@ -0,0 +1,99 @@
+{
+ "unicast":"@TEST_IP_SLAVE@",
+ "logging":
+ {
+ "level":"debug",
+ "console":"true",
+ "file":
+ {
+ "enable":"false",
+ "path":"/tmp/vsomeip.log"
+ },
+ "dlt":"false"
+ },
+ "applications":
+ [
+ {
+ "name":"big_payload_test_service",
+ "id":"0x1277"
+ }
+ ],
+ "services":
+ [
+ {
+ "service":"0x1234",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ },
+ {
+ "service":"0x1235",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ },
+ {
+ "service":"0x1236",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ },
+ {
+ "service":"0x1237",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ },
+ {
+ "service":"0x1238",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ },
+ {
+ "service":"0x1239",
+ "instance":"0x01",
+ "reliable":
+ {
+ "port":"30509",
+ "enable-magic-cookies":"false"
+ }
+ }
+ ],
+ "endpoint-queue-limits" :
+ [
+ {
+ "unicast":"@TEST_IP_SLAVE@",
+ "ports":
+ [
+ {
+ "port":"30509",
+ "queue-size-limit":"614416"
+ }
+ ]
+ }
+ ],
+ "routing":"big_payload_test_service",
+ "service-discovery":
+ {
+ "enable":"true",
+ "multicast":"224.244.224.245",
+ "port":"30490",
+ "protocol":"udp"
+ }
+}