summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJuergen Gehring <juergen.gehring@bmw.de>2017-02-28 03:59:50 -0800
committerJuergen Gehring <juergen.gehring@bmw.de>2017-02-28 03:59:50 -0800
commit199b12ef4b4abe120624a757fcbe8f8cd54c3712 (patch)
tree0c2c1be255333ad87149034e4be5de7b12b74d93
parent07d7573c007322be07689575ce5d73c45f030d6d (diff)
downloadvSomeIP-199b12ef4b4abe120624a757fcbe8f8cd54c3712.tar.gz
vSomeIP 2.6.02.6.0
-rw-r--r--CHANGES9
-rw-r--r--CMakeLists.txt8
-rw-r--r--documentation/vsomeipUserGuide9
-rw-r--r--implementation/configuration/include/internal.hpp.in17
-rw-r--r--implementation/endpoints/include/endpoint.hpp2
-rw-r--r--implementation/endpoints/include/local_client_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/include/virtual_server_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/src/local_client_endpoint_impl.cpp25
-rw-r--r--implementation/endpoints/src/tcp_client_endpoint_impl.cpp17
-rw-r--r--implementation/endpoints/src/tcp_server_endpoint_impl.cpp4
-rw-r--r--implementation/endpoints/src/udp_client_endpoint_impl.cpp8
-rw-r--r--implementation/endpoints/src/virtual_server_endpoint_impl.cpp4
-rw-r--r--implementation/logging/src/dlt_sink_backend.cpp7
-rw-r--r--implementation/routing/include/event.hpp17
-rw-r--r--implementation/routing/include/routing_manager.hpp8
-rw-r--r--implementation/routing/include/routing_manager_base.hpp47
-rw-r--r--implementation/routing/include/routing_manager_impl.hpp26
-rw-r--r--implementation/routing/include/routing_manager_proxy.hpp58
-rw-r--r--implementation/routing/include/routing_manager_stub.hpp35
-rw-r--r--implementation/routing/include/routing_manager_stub_host.hpp8
-rw-r--r--implementation/routing/src/event.cpp79
-rw-r--r--implementation/routing/src/routing_manager_base.cpp372
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp312
-rw-r--r--implementation/routing/src/routing_manager_proxy.cpp417
-rw-r--r--implementation/routing/src/routing_manager_stub.cpp340
-rw-r--r--implementation/runtime/include/application_impl.hpp16
-rw-r--r--implementation/runtime/src/application_impl.cpp311
-rw-r--r--implementation/service_discovery/include/defines.hpp2
-rw-r--r--implementation/service_discovery/include/service_discovery.hpp2
-rw-r--r--implementation/service_discovery/include/service_discovery_host.hpp6
-rw-r--r--implementation/service_discovery/include/service_discovery_impl.hpp33
-rw-r--r--implementation/service_discovery/include/subscription.hpp9
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp400
-rw-r--r--implementation/service_discovery/src/subscription.cpp14
-rw-r--r--implementation/utility/include/utility.hpp1
-rw-r--r--implementation/utility/src/utility.cpp71
-rw-r--r--interface/vsomeip/application.hpp16
-rw-r--r--interface/vsomeip/constants.hpp1
-rw-r--r--test/CMakeLists.txt98
-rw-r--r--test/application_tests/application_test.cpp2
-rwxr-xr-xtest/big_payload_tests/big_payload_test_external_starter.sh15
-rwxr-xr-xtest/client_id_tests/client_id_test_master_starter.sh9
-rw-r--r--test/client_id_tests/client_id_test_service.cpp1
-rw-r--r--test/cpu_load_tests/cpu_load_test_client.cpp19
-rwxr-xr-xtest/cpu_load_tests/cpu_load_test_master_starter.sh8
-rw-r--r--test/cpu_load_tests/cpu_load_test_service.cpp15
-rw-r--r--test/initial_event_tests/initial_event_test_client.cpp180
-rwxr-xr-xtest/initial_event_tests/initial_event_test_master_starter.sh78
-rw-r--r--test/initial_event_tests/initial_event_test_service.cpp41
-rwxr-xr-xtest/initial_event_tests/initial_event_test_slave_starter.sh59
-rwxr-xr-xtest/magic_cookies_tests/magic_cookies_test_starter.sh4
-rwxr-xr-xtest/offer_tests/conf/offer_test_external_master_starter.sh.in18
-rwxr-xr-xtest/payload_tests/external_local_payload_test_client_external_starter.sh8
-rwxr-xr-xtest/payload_tests/external_local_payload_test_client_local_and_external_starter.sh9
-rwxr-xr-xtest/payload_tests/external_local_payload_test_client_local_starter.sh4
-rwxr-xr-xtest/routing_tests/external_local_routing_test_starter.sh13
-rw-r--r--test/subscribe_notify_one_tests/conf/subscribe_notify_one_test_diff_client_ids_diff_ports_master.json.in2
-rw-r--r--test/subscribe_notify_one_tests/conf/subscribe_notify_one_test_diff_client_ids_diff_ports_slave.json.in2
-rwxr-xr-xtest/subscribe_notify_one_tests/subscribe_notify_one_test_master_starter.sh9
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_autoconfig_master.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_autoconfig_slave.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_master.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_same_service_id_master.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_same_service_id_slave.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_slave.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_partial_same_ports_master.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_partial_same_ports_slave.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_same_ports_master.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_same_ports_slave.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_diff_ports_master.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_diff_ports_slave.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_same_ports_master.json.in2
-rw-r--r--test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_same_ports_slave.json.in2
-rwxr-xr-xtest/subscribe_notify_tests/subscribe_notify_test_master_starter.sh13
74 files changed, 2235 insertions, 1115 deletions
diff --git a/CHANGES b/CHANGES
index 724b151..7ca9b53 100644
--- a/CHANGES
+++ b/CHANGES
@@ -276,3 +276,12 @@ v2.5.3
- Fixed initial events on unsubscription
- Improved dispatcher handling for blocking calls
- Crashed applications are now automatically unsubscribed
+
+v2.6.0
+- Fixed races and crashes
+- Fixed repetition phase timings for find service messages
+- Reworked internal event/field distribution to reduce CPU load
+- Reworked internal routing info distribution leading to fewer and smaller
+ messages and lower CPU load
+- Extend public application interface with second unsubscribe method with
+ additional event parameter
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b981997..10ed61c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -7,8 +7,8 @@ cmake_minimum_required (VERSION 2.8.12)
project (vsomeip)
set (VSOMEIP_MAJOR_VERSION 2)
-set (VSOMEIP_MINOR_VERSION 5)
-set (VSOMEIP_PATCH_VERSION 3)
+set (VSOMEIP_MINOR_VERSION 6)
+set (VSOMEIP_PATCH_VERSION 0)
set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION})
set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in
set (CMAKE_VERBOSE_MAKEFILE off)
@@ -200,6 +200,10 @@ set (VSOMEIP_ROUTING "vsomeipd")
if (ROUTING)
set (VSOMEIP_ROUTING ${ROUTING})
endif ()
+set (VSOMEIP_ROUTING_READY_MESSAGE "SOME/IP routing ready.")
+if (ROUTING_READY_MESSAGE)
+set (VSOMEIP_ROUTING_READY_MESSAGE ${ROUTING_READY_MESSAGE})
+endif ()
message("Predefined unicast address: ${VSOMEIP_UNICAST_ADDRESS}")
message("Predefined diagnosis address: ${VSOMEIP_DIAGNOSIS_ADDRESS}")
message("Predefined routing application: ${VSOMEIP_ROUTING}")
diff --git a/documentation/vsomeipUserGuide b/documentation/vsomeipUserGuide
index ee972bf..4bc644d 100644
--- a/documentation/vsomeipUserGuide
+++ b/documentation/vsomeipUserGuide
@@ -107,6 +107,15 @@ cmake -DENABLE_SIGNAL_HANDLING=1 ..
In the default setting, the application has to take care of shutting
down vsomeip in case these signals are received.
+Compilation with user defined "READY" message
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+To compile vsomeip with a user defined message signal the IP routing
+to be ready to send/receive messages, call cmake like:
+[source,bash]
+----
+cmake -DROUTING_READY_MESSAGE=<YOUR MESSAGE> ..
+----
+
Compilation of examples
^^^^^^^^^^^^^^^^^^^^^^^
For compilation of the examples call:
diff --git a/implementation/configuration/include/internal.hpp.in b/implementation/configuration/include/internal.hpp.in
index 72f1bd2..686ea8e 100644
--- a/implementation/configuration/include/internal.hpp.in
+++ b/implementation/configuration/include/internal.hpp.in
@@ -94,10 +94,10 @@
#define VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE 17
#define VSOMEIP_RELEASE_SERVICE_COMMAND_SIZE 11
#define VSOMEIP_STOP_OFFER_SERVICE_COMMAND_SIZE 16
-#define VSOMEIP_SUBSCRIBE_COMMAND_SIZE 16
-#define VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE 15
-#define VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE 15
-#define VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE 14
+#define VSOMEIP_SUBSCRIBE_COMMAND_SIZE 18
+#define VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE 17
+#define VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE 17
+#define VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE 16
#define VSOMEIP_REGISTER_EVENT_COMMAND_SIZE 15
#define VSOMEIP_UNREGISTER_EVENT_COMMAND_SIZE 14
#define VSOMEIP_ID_RESPONSE_COMMAND_SIZE 12
@@ -115,8 +115,17 @@
#define VSOMEIP_MAX_CLIENTS 255
+#define VSOMEIP_ROUTING_READY_MESSAGE "@VSOMEIP_ROUTING_READY_MESSAGE@"
+
namespace vsomeip {
+typedef enum {
+ RIE_ADD_CLIENT = 0x0,
+ RIE_ADD_SERVICE_INSTANCE = 0x1,
+ RIE_DEL_SERVICE_INSTANCE = 0x2,
+ RIE_DEL_CLIENT = 0x3,
+} routing_info_entry_e;
+
struct configuration_data_t {
#ifndef _WIN32
volatile char initialized_;
diff --git a/implementation/endpoints/include/endpoint.hpp b/implementation/endpoints/include/endpoint.hpp
index 1b08e77..39b018e 100644
--- a/implementation/endpoints/include/endpoint.hpp
+++ b/implementation/endpoints/include/endpoint.hpp
@@ -44,6 +44,8 @@ public:
virtual void increment_use_count() = 0;
virtual void decrement_use_count() = 0;
virtual uint32_t get_use_count() = 0;
+
+ virtual void restart() = 0;
};
} // namespace vsomeip
diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp
index bd82b2c..46d5c1d 100644
--- a/implementation/endpoints/include/local_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp
@@ -49,6 +49,8 @@ public:
void register_error_handler(error_handler_t _error_handler);
+ void restart();
+
private:
void send_queued();
diff --git a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
index 0c09df3..8237e30 100644
--- a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
@@ -49,6 +49,8 @@ public:
void decrement_use_count();
uint32_t get_use_count();
+ void restart();
+
private:
std::string address_;
uint16_t port_;
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp
index 5e847c5..6535ec6 100644
--- a/implementation/endpoints/src/local_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp
@@ -43,21 +43,16 @@ bool local_client_endpoint_impl::is_local() const {
return true;
}
-void local_client_endpoint_impl::start() {
- bool is_open(false);
+void local_client_endpoint_impl::restart() {
{
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- is_open = socket_.is_open();
- }
- if (is_open) {
- {
- std::lock_guard<std::mutex> its_lock(mutex_);
- sending_blocked_ = false;
- }
- restart();
- } else {
- connect();
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ sending_blocked_ = false;
}
+ client_endpoint_impl::restart();
+}
+
+void local_client_endpoint_impl::start() {
+ connect();
}
void local_client_endpoint_impl::connect() {
@@ -69,6 +64,10 @@ void local_client_endpoint_impl::connect() {
if (!its_error || its_error == boost::asio::error::already_open) {
socket_.set_option(boost::asio::socket_base::reuse_address(true), its_error);
+ if (its_error) {
+ VSOMEIP_WARNING << "local_client_endpoint_impl::connect: "
+ << "couldn't enable SO_REUSEADDR: " << its_error.message();
+ }
socket_.connect(remote_, its_connect_error);
// Credentials
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
index e4c3b91..916c78a 100644
--- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
@@ -62,12 +62,19 @@ void tcp_client_endpoint_impl::connect() {
if (!its_error || its_error == boost::asio::error::already_open) {
// Nagle algorithm off
socket_.set_option(ip::tcp::no_delay(true), its_error);
+ if (its_error) {
+ VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't disable "
+ << "Nagle algorithm: " << its_error.message();
+ }
// Enable SO_REUSEADDR to avoid bind problems with services going offline
// and coming online again and the user has specified only a small number
// of ports in the clients section for one service instance
socket_.set_option(boost::asio::socket_base::reuse_address(true), its_error);
-
+ if (its_error) {
+ VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
+ << "SO_REUSEADDR: " << its_error.message();
+ }
// In case a client endpoint port was configured,
// bind to it before connecting
if (local_.port() != ILLEGAL_PORT) {
@@ -178,7 +185,13 @@ unsigned short tcp_client_endpoint_impl::get_local_port() const {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
if (socket_.is_open()) {
- return socket_.local_endpoint(its_error).port();
+ endpoint_type its_endpoint = socket_.local_endpoint(its_error);
+ if (!its_error) {
+ return its_endpoint.port();
+ } else {
+ VSOMEIP_WARNING << "tcp_client_endpoint_impl::get_local_port() "
+ << " couldn't get local_endpoint: " << its_error.message();
+ }
}
return 0;
}
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
index 684f735..39f2802 100644
--- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
@@ -298,8 +298,8 @@ void tcp_server_endpoint_impl::connection::send_queued(
void tcp_server_endpoint_impl::connection::send_magic_cookie(
message_buffer_ptr_t &_buffer) {
- if (recv_buffer_size_initial_ == MESSAGE_SIZE_UNLIMITED
- || recv_buffer_size_initial_ - _buffer->size() >=
+ if (max_message_size_ == MESSAGE_SIZE_UNLIMITED
+ || max_message_size_ - _buffer->size() >=
VSOMEIP_SOMEIP_HEADER_SIZE + VSOMEIP_SOMEIP_MAGIC_COOKIE_SIZE) {
_buffer->insert(_buffer->begin(), SERVICE_COOKIE,
SERVICE_COOKIE + sizeof(SERVICE_COOKIE));
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
index 579805e..66aab17 100644
--- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
@@ -137,7 +137,13 @@ unsigned short udp_client_endpoint_impl::get_local_port() const {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
if (socket_.is_open()) {
- return socket_.local_endpoint(its_error).port();
+ endpoint_type its_endpoint = socket_.local_endpoint(its_error);
+ if (!its_error) {
+ return its_endpoint.port();
+ } else {
+ VSOMEIP_WARNING << "udp_client_endpoint_impl::get_local_port() "
+ << " couldn't get local_endpoint: " << its_error.message();
+ }
}
return 0;
}
diff --git a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
index 4d83f62..4ea9ed2 100644
--- a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
@@ -109,4 +109,8 @@ uint32_t virtual_server_endpoint_impl::get_use_count() {
return use_count_;
}
+void virtual_server_endpoint_impl::restart() {
+
+}
+
} // namespace vsomeip
diff --git a/implementation/logging/src/dlt_sink_backend.cpp b/implementation/logging/src/dlt_sink_backend.cpp
index 9f37150..5b140fd 100644
--- a/implementation/logging/src/dlt_sink_backend.cpp
+++ b/implementation/logging/src/dlt_sink_backend.cpp
@@ -37,9 +37,10 @@ BOOST_LOG_ATTRIBUTE_KEYWORD(severity, "Severity", logging::trivial::severity_lev
void dlt_sink_backend::consume(const logging::record_view &rec) {
#ifdef USE_DLT
- std::string message = *rec[expressions::smessage];
- logging::trivial::severity_level severity_level = *rec[severity];
- DLT_LOG_STRING(dlt_, level_as_dlt(severity_level), message.c_str());
+ auto message = rec[expressions::smessage];
+ auto severity_level = rec[severity];
+ DLT_LOG_STRING(dlt_, (severity_level)?level_as_dlt(*severity_level):DLT_LOG_WARN,
+ (message)?message.get<std::string>().c_str():"consume() w/o message");
#else
(void)rec;
#endif
diff --git a/implementation/routing/include/event.hpp b/implementation/routing/include/event.hpp
index e127807..93eee34 100644
--- a/implementation/routing/include/event.hpp
+++ b/implementation/routing/include/event.hpp
@@ -77,13 +77,20 @@ public:
// SIP_RPC_359 (epsilon change)
void set_epsilon_change_function(const epsilon_change_func_t &_epsilon_change_func);
- const std::set<eventgroup_t> & get_eventgroups() const;
+ const std::set<eventgroup_t> get_eventgroups() const;
+ std::set<eventgroup_t> get_eventgroups(client_t _client) const;
void add_eventgroup(eventgroup_t _eventgroup);
void set_eventgroups(const std::set<eventgroup_t> &_eventgroups);
void notify_one(const std::shared_ptr<endpoint_definition> &_target, bool _flush);
void notify_one(client_t _client, bool _flush);
+ bool add_subscriber(eventgroup_t _eventgroup, client_t _client);
+ void remove_subscriber(eventgroup_t _eventgroup, client_t _client);
+ bool has_subscriber(eventgroup_t _eventgroup, client_t _client);
+ std::set<client_t> get_subscribers();
+ void clear_subscribers();
+
void add_ref(client_t _client, bool _is_provided);
void remove_ref(client_t _client, bool _is_provided);
bool has_ref();
@@ -96,6 +103,8 @@ public:
bool has_ref(client_t _client, bool _is_provided);
+ std::set<client_t> get_subscribers(eventgroup_t _eventgroup);
+
private:
void update_cbk(boost::system::error_code const &_error);
void notify(bool _flush);
@@ -120,11 +129,10 @@ private:
std::chrono::milliseconds cycle_;
std::atomic<bool> change_resets_cycle_;
-
std::atomic<bool> is_updating_on_change_;
- std::mutex eventgroups_mutex_;
- std::set<eventgroup_t> eventgroups_;
+ mutable std::mutex eventgroups_mutex_;
+ std::map<eventgroup_t, std::set<client_t>> eventgroups_;
std::atomic<bool> is_set_;
std::atomic<bool> is_provided_;
@@ -133,7 +141,6 @@ private:
std::map<client_t, std::map<bool, uint32_t>> refs_;
std::atomic<bool> is_shadow_;
-
std::atomic<bool> is_cache_placeholder_;
epsilon_change_func_t epsilon_change_func_;
diff --git a/implementation/routing/include/routing_manager.hpp b/implementation/routing/include/routing_manager.hpp
index 4911701..8c408c3 100644
--- a/implementation/routing/include/routing_manager.hpp
+++ b/implementation/routing/include/routing_manager.hpp
@@ -52,11 +52,11 @@ public:
virtual void subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
- major_version_t _major,
+ major_version_t _major, event_t _event,
subscription_type_e _subscription_type) = 0;
virtual void unsubscribe(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) = 0;
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0;
virtual bool send(client_t _client, std::shared_ptr<message> _message,
bool _flush) = 0;
@@ -81,7 +81,7 @@ public:
virtual void unregister_event(client_t _client, service_t _service,
instance_t _instance, event_t _event, bool _is_provided) = 0;
- virtual std::shared_ptr<event> get_event(service_t _service,
+ virtual std::shared_ptr<event> find_event(service_t _service,
instance_t _instance, event_t _event) const = 0;
virtual std::set<std::shared_ptr<event>> find_events(service_t _service,
@@ -99,6 +99,8 @@ public:
instance_t _instance, bool _reliable) = 0;
virtual void set_routing_state(routing_state_e _routing_state) = 0;
+
+
};
} // namespace vsomeip
diff --git a/implementation/routing/include/routing_manager_base.hpp b/implementation/routing/include/routing_manager_base.hpp
index d3c0e04..625ed6a 100644
--- a/implementation/routing/include/routing_manager_base.hpp
+++ b/implementation/routing/include/routing_manager_base.hpp
@@ -73,19 +73,16 @@ public:
virtual void unregister_event(client_t _client, service_t _service, instance_t _instance,
event_t _event, bool _is_provided);
- virtual std::shared_ptr<event> get_event(service_t _service,
- instance_t _instance, event_t _event) const;
-
virtual std::set<std::shared_ptr<event>> find_events(service_t _service,
instance_t _instance, eventgroup_t _eventgroup) const;
virtual void subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
- major_version_t _major,
+ major_version_t _major, event_t _event,
subscription_type_e _subscription_type);
virtual void unsubscribe(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup);
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event);
virtual void notify(service_t _service, instance_t _instance,
event_t _event, std::shared_ptr<payload> _payload,
@@ -119,6 +116,9 @@ public:
virtual void set_routing_state(routing_state_e _routing_state) = 0;
+ virtual std::shared_ptr<event> find_event(service_t _service, instance_t _instance,
+ event_t _event) const;
+
protected:
std::shared_ptr<serviceinfo> find_service(service_t _service, instance_t _instance) const;
std::shared_ptr<serviceinfo> create_service_info(service_t _service,
@@ -139,14 +139,9 @@ protected:
std::unordered_set<client_t> get_connected_clients();
- std::shared_ptr<event> find_event(service_t _service, instance_t _instance,
- event_t _event) const;
std::shared_ptr<eventgroupinfo> find_eventgroup(service_t _service,
instance_t _instance, eventgroup_t _eventgroup) const;
- std::set<client_t> find_local_clients(service_t _service,
- instance_t _instance, eventgroup_t _eventgroup);
-
void remove_eventgroup_info(service_t _service, instance_t _instance,
eventgroup_t _eventgroup);
@@ -160,7 +155,7 @@ protected:
bool _flush, bool _reliable, uint8_t _command) const;
bool insert_subscription(service_t _service, instance_t _instance,
- eventgroup_t _eventgroup, client_t _client);
+ eventgroup_t _eventgroup, event_t _event, client_t _client);
std::shared_ptr<deserializer> get_deserializer();
void put_deserializer(std::shared_ptr<deserializer>);
@@ -170,9 +165,11 @@ protected:
virtual void send_subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
- major_version_t _major, subscription_type_e _subscription_type) = 0;
+ major_version_t _major, event_t _event,
+ subscription_type_e _subscription_type) = 0;
- void remove_pending_subscription(service_t _service, instance_t _instance);
+ void remove_pending_subscription(service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup, event_t _event);
void send_pending_notify_ones(service_t _service, instance_t _instance,
eventgroup_t _eventgroup, client_t _client);
@@ -181,6 +178,10 @@ protected:
void unset_all_eventpayloads(service_t _service, instance_t _instance,
eventgroup_t _eventgroup);
+ void notify_one_current_value(client_t _client, service_t _service,
+ instance_t _instance,
+ eventgroup_t _eventgroup, event_t _event);
+
private:
std::shared_ptr<endpoint> create_local_unlocked(client_t _client);
std::shared_ptr<endpoint> find_local_unlocked(client_t _client);
@@ -188,6 +189,9 @@ private:
std::set<std::tuple<service_t, instance_t, eventgroup_t>>
get_subscriptions(const client_t _client);
+ virtual bool create_placeholder_event_and_subscribe(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ event_t _event, client_t _client) = 0;
protected:
routing_manager_host *host_;
boost::asio::io_service &io_;
@@ -209,34 +213,37 @@ protected:
std::map<service_t,
std::map<instance_t,
std::map<eventgroup_t, std::shared_ptr<eventgroupinfo> > > > eventgroups_;
+ // Events (part of one or more eventgroups)
mutable std::mutex events_mutex_;
std::map<service_t,
std::map<instance_t, std::map<event_t, std::shared_ptr<event> > > > events_;
- std::mutex eventgroup_clients_mutex_;
- std::map<service_t,
- std::map<instance_t, std::map<eventgroup_t, std::set<client_t> > > > eventgroup_clients_;
#ifdef USE_DLT
std::shared_ptr<tc::trace_connector> tc_;
#endif
- struct eventgroup_data_t {
+ struct subscription_data_t {
service_t service_;
instance_t instance_;
eventgroup_t eventgroup_;
major_version_t major_;
+ event_t event_;
subscription_type_e subscription_type_;
- bool operator<(const eventgroup_data_t &_other) const {
+ bool operator<(const subscription_data_t &_other) const {
return (service_ < _other.service_
|| (service_ == _other.service_
&& instance_ < _other.instance_)
|| (service_ == _other.service_
&& instance_ == _other.instance_
- && eventgroup_ < _other.eventgroup_));
+ && eventgroup_ < _other.eventgroup_)
+ || (service_ == _other.service_
+ && instance_ == _other.instance_
+ && eventgroup_ == _other.eventgroup_
+ && event_ < _other.event_));
}
};
- std::set<eventgroup_data_t> pending_subscriptions_;
+ std::set<subscription_data_t> pending_subscriptions_;
private:
services_t services_;
diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp
index d29a3d7..2992c9e 100644
--- a/implementation/routing/include/routing_manager_impl.hpp
+++ b/implementation/routing/include/routing_manager_impl.hpp
@@ -74,11 +74,11 @@ public:
instance_t _instance);
void subscribe(client_t _client, service_t _service, instance_t _instance,
- eventgroup_t _eventgroup, major_version_t _major,
+ eventgroup_t _eventgroup, major_version_t _major, event_t _event,
subscription_type_e _subscription_type);
void unsubscribe(client_t _client, service_t _service, instance_t _instance,
- eventgroup_t _eventgroup);
+ eventgroup_t _eventgroup, event_t _event);
bool send(client_t _client, std::shared_ptr<message> _message, bool _flush);
@@ -110,18 +110,15 @@ public:
instance_t _instance, event_t _event,
bool _is_provided);
- void notify(service_t _service, instance_t _instance, event_t _event,
- std::shared_ptr<payload> _payload, bool _force, bool _flush);
-
void notify_one(service_t _service, instance_t _instance,
event_t _event, std::shared_ptr<payload> _payload,
client_t _client, bool _force, bool _flush);
void on_subscribe_nack(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup);
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event);
void on_subscribe_ack(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup);
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event);
void on_identify_response(client_t _client, service_t _service, instance_t _instance,
bool _reliable);
@@ -134,6 +131,7 @@ public:
client_t _client) {
return routing_manager_base::find_or_create_local(_client);
}
+
void remove_local(client_t _client);
void on_stop_offer_service(client_t _client, service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor);
@@ -253,6 +251,8 @@ private:
bool is_identifying(client_t _client, service_t _service,
instance_t _instance, bool _reliable);
+ std::set<eventgroup_t> get_subscribed_eventgroups(service_t _service,
+ instance_t _instance);
private:
return_code_e check_error(const byte_t *_data, length_t _size,
instance_t _instance);
@@ -305,7 +305,8 @@ private:
void send_subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
- major_version_t _major, subscription_type_e _subscription_type);
+ major_version_t _major, event_t _event,
+ subscription_type_e _subscription_type);
void on_net_if_state_changed(std::string _if, bool _available);
@@ -317,6 +318,15 @@ private:
void requested_service_remove(client_t _client, service_t _service,
instance_t _instance);
+ void call_sd_reliable_endpoint_connected(service_t _service, instance_t _instance,
+ std::shared_ptr<endpoint> _endpoint);
+
+ bool create_placeholder_event_and_subscribe(service_t _service,
+ instance_t _instance,
+ eventgroup_t _eventgroup,
+ event_t _event,
+ client_t _client);
+
std::shared_ptr<routing_manager_stub> stub_;
std::shared_ptr<sd::service_discovery> discovery_;
diff --git a/implementation/routing/include/routing_manager_proxy.hpp b/implementation/routing/include/routing_manager_proxy.hpp
index 27c071d..e725913 100644
--- a/implementation/routing/include/routing_manager_proxy.hpp
+++ b/implementation/routing/include/routing_manager_proxy.hpp
@@ -9,6 +9,7 @@
#include <map>
#include <mutex>
#include <atomic>
+#include <tuple>
#include <boost/asio/io_service.hpp>
#include <boost/asio/steady_timer.hpp>
@@ -51,11 +52,11 @@ public:
instance_t _instance);
void subscribe(client_t _client, service_t _service, instance_t _instance,
- eventgroup_t _eventgroup, major_version_t _major,
+ eventgroup_t _eventgroup, major_version_t _major, event_t _event,
subscription_type_e _subscription_type);
void unsubscribe(client_t _client, service_t _service, instance_t _instance,
- eventgroup_t _eventgroup);
+ eventgroup_t _eventgroup, event_t _event);
bool send(client_t _client, const byte_t *_data, uint32_t _size,
instance_t _instance, bool _flush = true, bool _reliable = false);
@@ -113,13 +114,14 @@ private:
bool _is_field, bool _is_provided);
void send_subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
- major_version_t _major, subscription_type_e _subscription_type);
+ major_version_t _major, event_t _event,
+ subscription_type_e _subscription_type);
void send_subscribe_nack(client_t _subscriber, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup);
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event);
void send_subscribe_ack(client_t _subscriber, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup);
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event);
bool is_field(service_t _service, instance_t _instance,
event_t _event) const;
@@ -155,6 +157,12 @@ private:
bool is_client_known(client_t _client);
+ bool create_placeholder_event_and_subscribe(service_t _service,
+ instance_t _instance,
+ eventgroup_t _eventgroup,
+ event_t _event,
+ client_t _client);
+
private:
enum class inner_state_type_e : std::uint8_t {
ST_REGISTERED = 0x0,
@@ -197,42 +205,16 @@ private:
std::set<eventgroup_t> eventgroups_;
bool operator<(const event_data_t &_other) const {
- if (service_ < _other.service_) {
- return true;
- }
- if (service_ == _other.service_ && instance_ < _other.instance_) {
- return true;
- }
- if (service_ == _other.service_ && instance_ == _other.instance_
- && event_ < _other.event_) {
- return true;
- }
- if (service_ == _other.service_ && instance_ == _other.instance_
- && event_ == _other.event_
- && is_provided_ != _other.is_provided_) {
- return true;
- }
- if (service_ == _other.service_
- && instance_ == _other.instance_
- && event_ == _other.event_
- && is_provided_ == _other.is_provided_
- && is_field_ != _other.is_field_) {
- return true;
- }
- if (service_ == _other.service_
- && instance_ == _other.instance_
- && event_ == _other.event_
- && is_provided_ == _other.is_provided_
- && is_field_ == _other.is_field_
- && eventgroups_ < _other.eventgroups_) {
- return true;
- }
- return false;
- }
+ return std::tie(service_, instance_, event_, is_field_,
+ is_provided_, eventgroups_)
+ < std::tie(_other.service_, _other.instance_, _other.event_,
+ _other.is_field_, _other.is_provided_,
+ _other.eventgroups_);
+ }
};
std::set<event_data_t> pending_event_registrations_;
- std::map<client_t, std::set<eventgroup_data_t>> pending_ingoing_subscripitons_;
+ std::map<client_t, std::set<subscription_data_t>> pending_ingoing_subscripitons_;
std::mutex pending_ingoing_subscripitons_mutex_;
std::mutex deserialize_mutex_;
diff --git a/implementation/routing/include/routing_manager_stub.hpp b/implementation/routing/include/routing_manager_stub.hpp
index 3cf89a7..819352f 100644
--- a/implementation/routing/include/routing_manager_stub.hpp
+++ b/implementation/routing/include/routing_manager_stub.hpp
@@ -58,20 +58,20 @@ public:
instance_t _instance, major_version_t _major, minor_version_t _minor);
void send_subscribe(std::shared_ptr<vsomeip::endpoint> _target,
- client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup,
- major_version_t _major, bool _is_remote_subscriber);
+ client_t _client, service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup, major_version_t _major,
+ event_t _event, bool _is_remote_subscriber);
void send_unsubscribe(std::shared_ptr<vsomeip::endpoint> _target,
client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
- bool _is_remote_subscriber);
+ event_t _event, bool _is_remote_subscriber);
void send_subscribe_nack(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup);
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event);
void send_subscribe_ack(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup);
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event);
bool contained_in_routing_info(client_t _client, service_t _service,
instance_t _instance, major_version_t _major,
@@ -82,6 +82,9 @@ public:
bool is_registered(client_t _client) const;
void deregister_erroneous_client(client_t _client);
client_t get_client() const;
+ void on_request_service(client_t _client, service_t _service,
+ instance_t _instance, major_version_t _major,
+ minor_version_t _minor);
#ifndef _WIN32
virtual bool check_credentials(client_t _client, uid_t _uid, gid_t _gid);
#endif
@@ -91,9 +94,18 @@ private:
void on_register_application(client_t _client);
void on_deregister_application(client_t _client);
- void broadcast_routing_info(bool _empty = false,
- client_t _ignore = VSOMEIP_ROUTING_CLIENT);
- void send_routing_info(client_t _client, bool _empty = false);
+ void broadcast_routing_stop();
+
+ void send_routing_info_delta(client_t _target, routing_info_entry_e _entry,
+ client_t _client, service_t _service = ANY_SERVICE,
+ instance_t _instance = ANY_INSTANCE,
+ major_version_t _major = ANY_MAJOR,
+ minor_version_t _minor = ANY_MINOR);
+
+ void inform_requesters(client_t _hoster, service_t _service,
+ instance_t _instance, major_version_t _major,
+ minor_version_t _minor, routing_info_entry_e _entry,
+ bool _inform_service);
void broadcast_ping() const;
void on_pong(client_t _client);
@@ -109,6 +121,8 @@ private:
(void)_routing_state;
};
+ bool is_already_connected(client_t _source, client_t _sink);
+
private:
routing_manager_stub_host *host_;
boost::asio::io_service &io_;
@@ -145,6 +159,9 @@ private:
boost::asio::steady_timer pinged_clients_timer_;
std::mutex pinged_clients_mutex_;
std::map<client_t, boost::asio::steady_timer::time_point> pinged_clients_;
+
+ std::map<client_t, std::map<service_t, std::map<instance_t, std::pair<major_version_t, minor_version_t> > > > service_requests_;
+ std::map<client_t, std::set<client_t>> connection_matrix_;
};
} // namespace vsomeip
diff --git a/implementation/routing/include/routing_manager_stub_host.hpp b/implementation/routing/include/routing_manager_stub_host.hpp
index cb8e83c..1606439 100644
--- a/implementation/routing/include/routing_manager_stub_host.hpp
+++ b/implementation/routing/include/routing_manager_stub_host.hpp
@@ -40,17 +40,17 @@ public:
virtual void subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
- major_version_t _major,
+ major_version_t _major, event_t _event,
subscription_type_e _subscription_type) = 0;
virtual void on_subscribe_nack(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) = 0;
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0;
virtual void on_subscribe_ack(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) = 0;
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0;
virtual void unsubscribe(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) = 0;
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0;
virtual void on_message(service_t _service, instance_t _instance,
const byte_t *_data, length_t _size, bool _reliable) = 0;
diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp
index 5ede55e..023987f 100644
--- a/implementation/routing/src/event.cpp
+++ b/implementation/routing/src/event.cpp
@@ -3,6 +3,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#include <vsomeip/constants.hpp>
#include <vsomeip/defines.hpp>
#include <vsomeip/message.hpp>
#include <vsomeip/payload.hpp>
@@ -197,18 +198,38 @@ void event::set_epsilon_change_function(const epsilon_change_func_t &_epsilon_ch
}
}
-const std::set<eventgroup_t> & event::get_eventgroups() const {
- return (eventgroups_);
+const std::set<eventgroup_t> event::get_eventgroups() const {
+ std::set<eventgroup_t> its_eventgroups;
+ {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ for (const auto e : eventgroups_) {
+ its_eventgroups.insert(e.first);
+ }
+ }
+ return its_eventgroups;
+}
+
+std::set<eventgroup_t> event::get_eventgroups(client_t _client) const {
+ std::set<eventgroup_t> its_eventgroups;
+
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ for (auto e : eventgroups_) {
+ if (e.second.find(_client) != e.second.end())
+ its_eventgroups.insert(e.first);
+ }
+ return its_eventgroups;
}
void event::add_eventgroup(eventgroup_t _eventgroup) {
std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- eventgroups_.insert(_eventgroup);
+ if (eventgroups_.find(_eventgroup) == eventgroups_.end())
+ eventgroups_[_eventgroup] = std::set<client_t>();
}
void event::set_eventgroups(const std::set<eventgroup_t> &_eventgroups) {
std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- eventgroups_ = _eventgroups;
+ for (auto e : _eventgroups)
+ eventgroups_[e] = std::set<client_t>();
}
void event::update_cbk(boost::system::error_code const &_error) {
@@ -308,6 +329,46 @@ bool event::has_ref() {
return refs_.size() != 0;
}
+bool event::add_subscriber(eventgroup_t _eventgroup, client_t _client) {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ return eventgroups_[_eventgroup].insert(_client).second;
+}
+
+void event::remove_subscriber(eventgroup_t _eventgroup, client_t _client) {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ auto find_eventgroup = eventgroups_.find(_eventgroup);
+ if (find_eventgroup != eventgroups_.end())
+ find_eventgroup->second.erase(_client);
+}
+
+bool event::has_subscriber(eventgroup_t _eventgroup, client_t _client) {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ auto find_eventgroup = eventgroups_.find(_eventgroup);
+ if (find_eventgroup != eventgroups_.end()) {
+ if (_client == ANY_CLIENT) {
+ return (find_eventgroup->second.size() > 0);
+ } else {
+ return (find_eventgroup->second.find(_client)
+ != find_eventgroup->second.end());
+ }
+ }
+ return false;
+}
+
+std::set<client_t> event::get_subscribers() {
+ std::set<client_t> its_subscribers;
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ for (auto e : eventgroups_)
+ its_subscribers.insert(e.second.begin(), e.second.end());
+ return its_subscribers;
+}
+
+void event::clear_subscribers() {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ for (auto e : eventgroups_)
+ e.second.clear();
+}
+
bool event::has_ref(client_t _client, bool _is_provided) {
std::lock_guard<std::mutex> its_lock(refs_mutex_);
auto its_client = refs_.find(_client);
@@ -370,4 +431,14 @@ bool event::compare(const std::shared_ptr<payload> &_lhs,
return is_change;
}
+std::set<client_t> event::get_subscribers(eventgroup_t _eventgroup) {
+ std::set<client_t> its_subscribers;
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ auto found_eventgroup = eventgroups_.find(_eventgroup);
+ if (found_eventgroup != eventgroups_.end()) {
+ its_subscribers = found_eventgroup->second;
+ }
+ return its_subscribers;
+}
+
} // namespace vsomeip
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp
index ac5da9c..8f10057 100644
--- a/implementation/routing/src/routing_manager_base.cpp
+++ b/implementation/routing/src/routing_manager_base.cpp
@@ -95,16 +95,7 @@ void routing_manager_base::stop_offer_service(client_t _client, service_t _servi
}
for (auto &e : events) {
e.second->unset_payload();
- }
- {
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto its_service = eventgroup_clients_.find(_service);
- if (its_service != eventgroup_clients_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- its_instance->second.clear();
- }
- }
+ e.second->clear_subscribers();
}
}
@@ -142,6 +133,7 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
epsilon_change_func_t _epsilon_change_func,
bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) {
std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
+ bool transfer_subscriptions_from_any_event(false);
if (its_event) {
if(!its_event->is_cache_placeholder()) {
if (its_event->is_field() == _is_field) {
@@ -151,9 +143,13 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
if (_is_shadow && _is_provided) {
its_event->set_shadow(_is_shadow);
}
+ if (_client == host_->get_client() && _is_provided) {
+ its_event->set_shadow(false);
+ }
for (auto eg : _eventgroups) {
its_event->add_eventgroup(eg);
}
+ transfer_subscriptions_from_any_event = true;
} else {
VSOMEIP_ERROR << "Event registration update failed. "
"Specified arguments do not match existing registration.";
@@ -168,6 +164,9 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
if (_is_shadow && _is_provided) {
its_event->set_shadow(_is_shadow);
}
+ if (_client == host_->get_client() && _is_provided) {
+ its_event->set_shadow(false);
+ }
its_event->set_field(_is_field);
its_event->set_provided(_is_provided);
its_event->set_cache_placeholder(false);
@@ -180,7 +179,9 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
its_eventgroups.insert(_event);
its_event->set_eventgroups(its_eventgroups);
} else {
- its_event->set_eventgroups(_eventgroups);
+ for (auto eg : _eventgroups) {
+ its_event->add_eventgroup(eg);
+ }
}
its_event->set_epsilon_change_function(_epsilon_change_func);
@@ -211,8 +212,33 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
its_event->set_epsilon_change_function(_epsilon_change_func);
its_event->set_change_resets_cycle(_change_resets_cycle);
its_event->set_update_cycle(_cycle);
+
+ if (_is_provided) {
+ transfer_subscriptions_from_any_event = true;
+ }
}
+ if (transfer_subscriptions_from_any_event) {
+ // check if someone subscribed to ANY_EVENT and the subscription
+ // was stored in the cache placeholder. Move the subscribers
+ // into new event
+ std::shared_ptr<event> its_any_event =
+ find_event(_service, _instance, ANY_EVENT);
+ if (its_any_event) {
+ std::set<eventgroup_t> any_events_eventgroups =
+ its_any_event->get_eventgroups();
+ for (eventgroup_t eventgroup : _eventgroups) {
+ auto found_eg = any_events_eventgroups.find(eventgroup);
+ if (found_eg != any_events_eventgroups.end()) {
+ std::set<client_t> its_any_event_subscribers =
+ its_any_event->get_subscribers(eventgroup);
+ for (const client_t subscriber : its_any_event_subscribers) {
+ its_event->add_subscriber(eventgroup, subscriber);
+ }
+ }
+ }
+ }
+ }
if(!_is_cache_placeholder) {
its_event->add_ref(_client, _is_provided);
}
@@ -265,24 +291,6 @@ void routing_manager_base::unregister_event(client_t _client, service_t _service
}
}
-std::shared_ptr<event> routing_manager_base::get_event(
- service_t _service, instance_t _instance, event_t _event) const {
- std::shared_ptr<event> its_event;
- std::lock_guard<std::mutex> its_lock(events_mutex_);
- auto found_service = events_.find(_service);
- if (found_service != events_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_event = found_instance->second.find(_event);
- if (found_event != found_instance->second.end()) {
- return found_event->second;
- }
- }
- }
- return its_event;
-}
-
-
std::set<std::shared_ptr<event>> routing_manager_base::find_events(
service_t _service, instance_t _instance,
eventgroup_t _eventgroup) const {
@@ -303,45 +311,30 @@ std::set<std::shared_ptr<event>> routing_manager_base::find_events(
void routing_manager_base::subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
- major_version_t _major,
+ major_version_t _major, event_t _event,
subscription_type_e _subscription_type) {
(void) _major;
(void) _subscription_type;
- bool inserted = insert_subscription(_service, _instance, _eventgroup, _client);
+ bool inserted = insert_subscription(_service, _instance, _eventgroup, _event, _client);
if (inserted) {
- auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
- if (its_eventgroup) {
- std::set<std::shared_ptr<event> > its_events
- = its_eventgroup->get_events();
- for (auto e : its_events) {
- if (e->is_field())
- e->notify_one(_client, true); // TODO: use _flush to send all events together!
- }
- }
+ notify_one_current_value(_client, _service, _instance, _eventgroup, _event);
}
}
void routing_manager_base::unsubscribe(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) {
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto its_service = eventgroup_clients_.find(_service);
- if (its_service != eventgroup_clients_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- auto its_group = its_instance->second.find(_eventgroup);
- if (its_group != its_instance->second.end()) {
- its_group->second.erase(_client);
- if (!its_group->second.size()) {
- its_instance->second.erase(_eventgroup);
- if (!its_instance->second.size()) {
- its_service->second.erase(_instance);
- if (!its_service->second.size()) {
- eventgroup_clients_.erase(_service);
- }
- }
- }
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
+ if (_event != ANY_EVENT) {
+ auto its_event = find_event(_service, _instance, _event);
+ if (its_event) {
+ its_event->remove_subscriber(_eventgroup, _client);
+ }
+ } else {
+ auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
+ if (its_eventgroup) {
+ for (auto e : its_eventgroup->get_events()) {
+ e->remove_subscriber(_eventgroup, _client);
}
}
}
@@ -378,20 +371,7 @@ void routing_manager_base::notify_one(service_t _service, instance_t _instance,
found_eventgroup = true;
valid_group = its_group;
if (find_local(_client)) {
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto its_service = eventgroup_clients_.find(_service);
- if (its_service != eventgroup_clients_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- auto found_eventgroup = its_instance->second.find(its_group);
- if (found_eventgroup != its_instance->second.end()) {
- auto its_client = found_eventgroup->second.find(_client);
- if (its_client != found_eventgroup->second.end()) {
- already_subscribed = true;
- }
- }
- }
- }
+ already_subscribed = its_event->has_subscriber(its_group, _client);
} else {
// Remotes always needs to be marked as subscribed here
already_subscribed = true;
@@ -445,36 +425,69 @@ void routing_manager_base::send_pending_notify_ones(service_t _service, instance
void routing_manager_base::unset_all_eventpayloads(service_t _service,
instance_t _instance) {
- std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- const auto found_service = eventgroups_.find(_service);
- if (found_service != eventgroups_.end()) {
- const auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- for (const auto &eventgroupinfo : found_instance->second) {
- for (const auto &event : eventgroupinfo.second->get_events()) {
- event->unset_payload(true);
+ std::set<std::shared_ptr<event>> its_events;
+ {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ const auto found_service = eventgroups_.find(_service);
+ if (found_service != eventgroups_.end()) {
+ const auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ for (const auto &eventgroupinfo : found_instance->second) {
+ for (const auto &event : eventgroupinfo.second->get_events()) {
+ its_events.insert(event);
+ }
}
}
}
}
+ for (const auto &e : its_events) {
+ e->unset_payload(true);
+ }
}
void routing_manager_base::unset_all_eventpayloads(service_t _service,
instance_t _instance,
eventgroup_t _eventgroup) {
- std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- const auto found_service = eventgroups_.find(_service);
- if (found_service != eventgroups_.end()) {
- const auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- const auto found_eventgroup = found_instance->second.find(_eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- for (const auto &event : found_eventgroup->second->get_events()) {
- event->unset_payload(true);
+ std::set<std::shared_ptr<event>> its_events;
+ {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ const auto found_service = eventgroups_.find(_service);
+ if (found_service != eventgroups_.end()) {
+ const auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ const auto found_eventgroup = found_instance->second.find(_eventgroup);
+ if (found_eventgroup != found_instance->second.end()) {
+ for (const auto &event : found_eventgroup->second->get_events()) {
+ its_events.insert(event);
+ }
}
}
}
}
+ for (const auto &e : its_events) {
+ e->unset_payload(true);
+ }
+}
+
+void routing_manager_base::notify_one_current_value(client_t _client,
+ service_t _service,
+ instance_t _instance,
+ eventgroup_t _eventgroup,
+ event_t _event) {
+ if (_event != ANY_EVENT) {
+ std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
+ if (its_event && its_event->is_field())
+ its_event->notify_one(_client, true);
+ } else {
+ auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
+ if (its_eventgroup) {
+ std::set<std::shared_ptr<event> > its_events = its_eventgroup->get_events();
+ for (auto e : its_events) {
+ if (e->is_field())
+ e->notify_one(_client, true); // TODO: use _flush to send all events together!
+ }
+ }
+ }
}
bool routing_manager_base::send(client_t its_client,
@@ -560,8 +573,14 @@ bool routing_manager_base::is_available(service_t _service, instance_t _instance
std::lock_guard<std::mutex> its_lock(local_services_mutex_);
auto its_service = local_services_.find(_service);
if (its_service != local_services_.end()) {
+ if (_instance == ANY_INSTANCE) {
+ return true;
+ }
auto its_instance = its_service->second.find(_instance);
if (its_instance != its_service->second.end()) {
+ if (_major == ANY_MAJOR) {
+ return true;
+ }
if (std::get<0>(its_instance->second) == _major) {
available = true;
}
@@ -658,7 +677,7 @@ void routing_manager_base::remove_local(client_t _client) {
host_->on_subscription(std::get<0>(its_subscription), std::get<1>(its_subscription),
std::get<2>(its_subscription), _client, false);
routing_manager_base::unsubscribe(_client, std::get<0>(its_subscription),
- std::get<1>(its_subscription), std::get<2>(its_subscription));
+ std::get<1>(its_subscription), std::get<2>(its_subscription), ANY_EVENT);
}
std::shared_ptr<endpoint> its_endpoint(find_local(_client));
@@ -675,8 +694,11 @@ void routing_manager_base::remove_local(client_t _client) {
std::set<std::pair<service_t, instance_t>> its_services;
for (auto& s : local_services_) {
for (auto& i : s.second) {
- if (std::get<2>(i.second) == _client)
+ if (std::get<2>(i.second) == _client) {
its_services.insert({ s.first, i.first });
+ host_->on_availability(s.first, i.first, false,
+ std::get<0>(i.second), std::get<1>(i.second));
+ }
}
}
@@ -777,23 +799,6 @@ void routing_manager_base::remove_eventgroup_info(service_t _service,
}
}
-std::set<client_t> routing_manager_base::find_local_clients(service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) {
- std::set<client_t> its_clients;
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto found_service = eventgroup_clients_.find(_service);
- if (found_service != eventgroup_clients_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_eventgroup = found_instance->second.find(_eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- its_clients = found_eventgroup->second;
- }
- }
- }
- return (its_clients);
-}
-
bool routing_manager_base::send_local_notification(client_t _client,
const byte_t *_data, uint32_t _size, instance_t _instance,
bool _flush, bool _reliable) {
@@ -809,24 +814,21 @@ bool routing_manager_base::send_local_notification(client_t _client,
if (its_event && !its_event->is_shadow()) {
std::vector< byte_t > its_data;
- for (auto its_group : its_event->get_eventgroups()) {
+ for (auto its_client : its_event->get_subscribers()) {
// local
- auto its_local_clients = find_local_clients(its_service, _instance, its_group);
- for (auto its_local_client : its_local_clients) {
- if (its_local_client == VSOMEIP_ROUTING_CLIENT) {
- has_remote = true;
- continue;
- }
+ if (its_client == VSOMEIP_ROUTING_CLIENT) {
+ has_remote = true;
+ continue;
+ }
#ifdef USE_DLT
- else {
- has_local = true;
- }
+ else {
+ has_local = true;
+ }
#endif
- std::shared_ptr<endpoint> its_local_target = find_local(its_local_client);
- if (its_local_target) {
- send_local(its_local_target, _client, _data, _size,
- _instance, _flush, _reliable, VSOMEIP_SEND);
- }
+ std::shared_ptr<endpoint> its_local_target = find_local(its_client);
+ if (its_local_target) {
+ send_local(its_local_target, _client, _data, _size,
+ _instance, _flush, _reliable, VSOMEIP_SEND);
}
}
}
@@ -881,23 +883,56 @@ bool routing_manager_base::send_local(
bool routing_manager_base::insert_subscription(
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- client_t _client) {
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto found_service = eventgroup_clients_.find(_service);
- if (found_service != eventgroup_clients_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_eventgroup = found_instance->second.find(_eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- if (found_eventgroup->second.find(_client)
- != found_eventgroup->second.end())
- return false;
+ event_t _event, client_t _client) {
+ bool is_inserted(false);
+ if (_event != ANY_EVENT) { // subscribe to specific event
+ std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
+ if (its_event) {
+ is_inserted = its_event->add_subscriber(_eventgroup, _client);
+ } else {
+ VSOMEIP_WARNING << "routing_manager_base::insert_subscription("
+ << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _event << "]"
+ << " received subscription for unknown (unrequested / "
+ << "unoffered) event. Creating placeholder event holding "
+ << "subscription until event is requested/offered.";
+ is_inserted = create_placeholder_event_and_subscribe(_service,
+ _instance, _eventgroup, _event, _client);
+ }
+ } else { // subscribe to all events of the eventgroup
+ std::shared_ptr<eventgroupinfo> its_eventgroup
+ = find_eventgroup(_service, _instance, _eventgroup);
+ bool create_place_holder(false);
+ if (its_eventgroup) {
+ std::set<std::shared_ptr<event>> its_events = its_eventgroup->get_events();
+ if (!its_events.size()) {
+ create_place_holder = true;
+ } else {
+ for (auto e : its_events) {
+ is_inserted = e->add_subscriber(_eventgroup, _client) || is_inserted;
+ }
}
+ } else {
+ create_place_holder = true;
+ }
+ if (create_place_holder) {
+ VSOMEIP_WARNING << "routing_manager_base::insert_subscription("
+ << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _event << "]"
+ << " received subscription for unknown (unrequested / "
+ << "unoffered) eventgroup. Creating placeholder event holding "
+ << "subscription until event is requested/offered.";
+ is_inserted = create_placeholder_event_and_subscribe(_service,
+ _instance, _eventgroup, _event, _client);
}
}
-
- eventgroup_clients_[_service][_instance][_eventgroup].insert(_client);
- return true;
+ return is_inserted;
}
std::shared_ptr<deserializer> routing_manager_base::get_deserializer() {
@@ -934,33 +969,63 @@ void routing_manager_base::send_pending_subscriptions(service_t _service,
if (ps.service_ == _service &&
ps.instance_ == _instance && ps.major_ == _major) {
send_subscribe(client_, ps.service_, ps.instance_,
- ps.eventgroup_, ps.major_, ps.subscription_type_);
+ ps.eventgroup_, ps.major_, ps.event_, ps.subscription_type_);
}
}
}
void routing_manager_base::remove_pending_subscription(service_t _service,
- instance_t _instance) {
- auto it = pending_subscriptions_.begin();
- while (it != pending_subscriptions_.end()) {
- if (it->service_ == _service
- && it->instance_ == _instance) {
- break;
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
+ if (_eventgroup == 0xFFFF) {
+ for (auto it = pending_subscriptions_.begin();
+ it != pending_subscriptions_.end();) {
+ if (it->service_ == _service
+ && it->instance_ == _instance) {
+ it = pending_subscriptions_.erase(it);
+ } else {
+ it++;
+ }
+ }
+ } else if (_event == ANY_EVENT) {
+ for (auto it = pending_subscriptions_.begin();
+ it != pending_subscriptions_.end();) {
+ if (it->service_ == _service
+ && it->instance_ == _instance
+ && it->eventgroup_ == _eventgroup) {
+ it = pending_subscriptions_.erase(it);
+ } else {
+ it++;
+ }
+ }
+ } else {
+ for (auto it = pending_subscriptions_.begin();
+ it != pending_subscriptions_.end();) {
+ if (it->service_ == _service
+ && it->instance_ == _instance
+ && it->eventgroup_ == _eventgroup
+ && it->event_ == _event) {
+ it = pending_subscriptions_.erase(it);
+ break;
+ } else {
+ it++;
+ }
}
- it++;
}
- if (it != pending_subscriptions_.end()) pending_subscriptions_.erase(it);
}
-std::set<std::tuple<service_t, instance_t, eventgroup_t>> routing_manager_base::get_subscriptions(const client_t _client) {
+std::set<std::tuple<service_t, instance_t, eventgroup_t>>
+routing_manager_base::get_subscriptions(const client_t _client) {
std::set<std::tuple<service_t, instance_t, eventgroup_t>> result;
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- for (auto its_service : eventgroup_clients_) {
+ std::lock_guard<std::mutex> its_lock(events_mutex_);
+ for (auto its_service : events_) {
for (auto its_instance : its_service.second) {
- for (auto its_eventgroup : its_instance.second) {
- auto its_client = its_eventgroup.second.find(_client);
- if (its_client != its_eventgroup.second.end()) {
- result.insert(std::make_tuple(its_service.first, its_instance.first, its_eventgroup.first));
+ for (auto its_event : its_instance.second) {
+ auto its_eventgroups = its_event.second->get_eventgroups(_client);
+ for (auto e : its_eventgroups) {
+ result.insert(std::make_tuple(
+ its_service.first,
+ its_instance.first,
+ e));
}
}
}
@@ -968,5 +1033,4 @@ std::set<std::tuple<service_t, instance_t, eventgroup_t>> routing_manager_base::
return result;
}
-
} // namespace vsomeip
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index e78f400..145ff35 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -141,6 +141,7 @@ void routing_manager_impl::stop() {
watchdog_timer_.cancel();
}
sd_notify(0, "STOPPING=1");
+ VSOMEIP_INFO << "Sent STOPPING to systemd watchdog";
#endif
host_->on_state(state_type_e::ST_DEREGISTERED);
@@ -204,7 +205,7 @@ bool routing_manager_impl::offer_service(client_t _client, service_t _service,
if (ps.service_ == _service &&
ps.instance_ == _instance && ps.major_ == _major) {
insert_subscription(ps.service_, ps.instance_,
- ps.eventgroup_, client_);
+ ps.eventgroup_, ps.event_, client_);
}
}
send_pending_subscriptions(_service, _instance, _major);
@@ -300,6 +301,8 @@ void routing_manager_impl::request_service(client_t _client, service_t _service,
if (_client == get_client()) {
stub_->create_local_receiver();
}
+
+ stub_->on_request_service(_client, _service, _instance, _major, _minor);
}
void routing_manager_impl::release_service(client_t _client, service_t _service,
@@ -310,6 +313,10 @@ void routing_manager_impl::release_service(client_t _client, service_t _service,
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
+ if (host_->get_client() == _client) {
+ std::lock_guard<std::mutex> its_lock(pending_subscription_mutex_);
+ remove_pending_subscription(_service, _instance, 0xFFFF, ANY_EVENT);
+ }
routing_manager_base::release_service(_client, _service, _instance);
requested_service_remove(_client, _service, _instance);
@@ -323,8 +330,8 @@ void routing_manager_impl::release_service(client_t _client, service_t _service,
}
clear_client_endpoints(_service, _instance, true);
clear_client_endpoints(_service, _instance, false);
- clear_service_info(_service, _instance, true);
- clear_service_info(_service, _instance, false);
+ its_info->set_endpoint(nullptr, true);
+ its_info->set_endpoint(nullptr, false);
clear_identified_clients(_service, _instance);
clear_identifying_clients( _service, _instance);
unset_all_eventpayloads(_service, _instance);
@@ -343,28 +350,29 @@ void routing_manager_impl::release_service(client_t _client, service_t _service,
void routing_manager_impl::subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, major_version_t _major,
- subscription_type_e _subscription_type) {
+ event_t _event, subscription_type_e _subscription_type) {
VSOMEIP_INFO << "SUBSCRIBE("
<< std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
<< std::hex << std::setw(4) << std::setfill('0') << _eventgroup << ":"
+ << std::hex << std::setw(4) << std::setfill('0') << _event << ":"
<< std::dec << (uint16_t)_major << "]";
if (get_client() == find_local_client(_service, _instance)) {
bool subscription_accepted = host_->on_subscription(_service, _instance, _eventgroup, _client, true);
(void) find_or_create_local(_client);
if (!subscription_accepted) {
- stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup);
+ stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup, _event);
VSOMEIP_INFO << "Subscription request from client: 0x" << std::hex
<< _client << std::dec << " for eventgroup: 0x" << _eventgroup
<< " rejected from application handler.";
return;
} else {
- stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup);
+ stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event);
}
- routing_manager_base::subscribe(_client, _service, _instance, _eventgroup, _major, _subscription_type);
+ routing_manager_base::subscribe(_client, _service, _instance, _eventgroup, _major, _event, _subscription_type);
send_pending_notify_ones(_service, _instance, _eventgroup, _client);
} else {
if (discovery_) {
@@ -395,33 +403,24 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
identify_for_subscribe(_client, _service, _instance, _major, _subscription_type);
}
}
- bool inserted = insert_subscription(_service, _instance, _eventgroup, _client);
+ bool inserted = insert_subscription(_service, _instance, _eventgroup, _event, _client);
if (inserted) {
if (0 == find_local_client(_service, _instance)) {
static const ttl_t configured_ttl(configuration_->get_sd_ttl());
+ notify_one_current_value(_client, _service, _instance, _eventgroup, _event);
discovery_->subscribe(_service, _instance, _eventgroup,
_major, configured_ttl, subscriber, _subscription_type);
- auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
- if (its_eventgroup) {
- std::set<std::shared_ptr<event> > its_events
- = its_eventgroup->get_events();
- for (auto e : its_events) {
- if (e->is_field())
- e->notify_one(_client, true); // TODO: use _flush to send all initial events together
- }
- }
} else {
- std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
if (is_available(_service, _instance, _major)) {
stub_->send_subscribe(find_local(_service, _instance),
- _client, _service, _instance, _eventgroup, _major, false);
+ _client, _service, _instance, _eventgroup, _major, _event, false);
}
}
}
- {
+ if (get_client() == _client) {
std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
- eventgroup_data_t subscription = { _service, _instance, _eventgroup, _major,
- _subscription_type};
+ subscription_data_t subscription = { _service, _instance, _eventgroup, _major,
+ _event, _subscription_type};
pending_subscriptions_.insert(subscription);
}
} else {
@@ -431,40 +430,32 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
}
void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) {
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
VSOMEIP_INFO << "UNSUBSCRIBE("
<< std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _event << "]";
- bool last_subscriber_removed(false);
- if (discovery_) {
- {
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto found_service = eventgroup_clients_.find(_service);
- if (found_service != eventgroup_clients_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_eventgroup = found_instance->second.find(
- _eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- found_eventgroup->second.erase(_client);
- if (!found_eventgroup->second.size()) {
- last_subscriber_removed = true;
- found_instance->second.erase(_eventgroup);
- if (!found_service->second.size()) {
- found_service->second.erase(_instance);
- if (!found_service->second.size()) {
- eventgroup_clients_.erase(_service);
- }
- }
- }
- }
- }
+ bool last_subscriber_removed(true);
+ std::shared_ptr<eventgroupinfo> its_info
+ = find_eventgroup(_service, _instance, _eventgroup);
+ if (its_info) {
+ for (auto e : its_info->get_events()) {
+ if (e->get_event() == _event || ANY_EVENT == _event)
+ e->remove_subscriber(_eventgroup, _client);
+ }
+ for (auto e : its_info->get_events()) {
+ if (e->has_subscriber(_eventgroup, ANY_CLIENT)) {
+ last_subscriber_removed = false;
+ break;
}
}
+ }
+
+ if (discovery_) {
host_->on_subscription(_service, _instance, _eventgroup, _client, false);
if (0 == find_local_client(_service, _instance)) {
client_t subscriber = VSOMEIP_ROUTING_CLIENT;
@@ -494,13 +485,15 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
discovery_->unsubscribe(_service, _instance, _eventgroup, subscriber);
}
} else {
- stub_->send_unsubscribe(find_local(_service, _instance),
- _client, _service, _instance, _eventgroup, false);
+ if (get_client() == _client) {
+ std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
+ remove_pending_subscription(_service, _instance, _eventgroup, _event);
+ stub_->send_unsubscribe(find_local(_service, _instance),
+ _client, _service, _instance, _eventgroup, _event, false);
+ }
}
clear_multicast_endpoints(_service, _instance);
- std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
- remove_pending_subscription(_service, _instance);
} else {
VSOMEIP_ERROR<< "SOME/IP eventgroups require SD to be enabled!";
}
@@ -515,21 +508,21 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
length_t _size, instance_t _instance,
bool _flush, bool _reliable) {
bool is_sent(false);
- std::shared_ptr<endpoint> its_target;
- bool is_request = utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS]);
- bool is_notification = utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS]);
+ if (_size > VSOMEIP_MESSAGE_TYPE_POS) {
+ std::shared_ptr<endpoint> its_target;
+ bool is_request = utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS]);
+ bool is_notification = utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS]);
- client_t its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
- _data[VSOMEIP_CLIENT_POS_MAX]);
- service_t its_service = VSOMEIP_BYTES_TO_WORD(
- _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
- method_t its_method = VSOMEIP_BYTES_TO_WORD(
- _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
+ client_t its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
+ _data[VSOMEIP_CLIENT_POS_MAX]);
+ service_t its_service = VSOMEIP_BYTES_TO_WORD(
+ _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
+ method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
- bool is_service_discovery = (its_service == vsomeip::sd::service
- && its_method == vsomeip::sd::method);
+ bool is_service_discovery = (its_service == vsomeip::sd::service
+ && its_method == vsomeip::sd::method);
- if (_size > VSOMEIP_MESSAGE_TYPE_POS) {
if (is_request) {
its_target = find_local(its_service, _instance);
} else if (!is_notification) {
@@ -768,6 +761,12 @@ void routing_manager_impl::register_event(
_epsilon_change_func, _is_provided, _is_shadow,
_is_cache_placeholder);
}
+ VSOMEIP_INFO << "REGISTER EVENT("
+ << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _event
+ << ":is_provider=" << _is_provided << "]";
}
void routing_manager_impl::register_shadow_event(client_t _client,
@@ -788,19 +787,6 @@ void routing_manager_impl::unregister_shadow_event(client_t _client,
_event, _is_provided);
}
-void routing_manager_impl::notify(
- service_t _service, instance_t _instance, event_t _event,
- std::shared_ptr<payload> _payload, bool _force, bool _flush) {
- std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
- if (its_event) {
- its_event->set_payload(_payload, _force, _flush);
- } else {
- VSOMEIP_WARNING << "Attempt to update the undefined event/field ["
- << std::hex << _service << "." << _instance << "." << _event
- << "]";
- }
-}
-
void routing_manager_impl::notify_one(service_t _service, instance_t _instance,
event_t _event, std::shared_ptr<payload> _payload, client_t _client,
bool _force, bool _flush) {
@@ -1117,9 +1103,15 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) {
if (s.reliable_) {
stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, s.service_id_,
s.instance_id_, s.major_, s.minor_);
- if (discovery_) {
- discovery_->on_reliable_endpoint_connected(s.service_id_,
- s.instance_id_, s.endpoint_);
+ try {
+ io_.post(
+ std::bind(
+ &routing_manager_impl::call_sd_reliable_endpoint_connected,
+ std::dynamic_pointer_cast<routing_manager_impl>(
+ shared_from_this()), s.service_id_,
+ s.instance_id_, s.endpoint_));
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR<< "routing_manager_impl::on_connect: " << e.what();
}
}
}
@@ -1215,16 +1207,7 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se
}
for (auto &e : events) {
e.second->unset_payload();
- }
- {
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto its_service = eventgroup_clients_.find(_service);
- if (its_service != eventgroup_clients_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- its_instance->second.clear();
- }
- }
+ e.second->clear_subscribers();
}
/**
@@ -1343,7 +1326,7 @@ bool routing_manager_impl::deliver_notification(
std::shared_ptr<event> its_event = find_event(_service, _instance, its_method);
if (its_event) {
std::vector< byte_t > its_data;
- std::set<client_t> its_local_client_set;
+
if(its_event->is_field() && !its_event->is_provided()) {
// store the current value of the remote field
const uint32_t its_length(utility::get_payload_size(_data, _length));
@@ -1353,14 +1336,8 @@ bool routing_manager_impl::deliver_notification(
its_length);
its_event->set_payload_dont_notify(its_payload);
}
- for (auto its_group : its_event->get_eventgroups()) {
- auto its_local_clients
- = find_local_clients(_service, _instance, its_group);
- its_local_client_set.insert(
- its_local_clients.begin(), its_local_clients.end());
- }
- for (auto its_local_client : its_local_client_set) {
+ for (const auto its_local_client : its_event->get_subscribers()) {
if (its_local_client == host_->get_client()) {
deliver_message(_data, _length, _instance, _reliable);
} else {
@@ -2207,7 +2184,7 @@ void routing_manager_impl::expire_subscriptions(const boost::asio::ip::address &
auto target = find_local(its_service.first, its_instance.first);
if (target) {
stub_->send_unsubscribe(target, VSOMEIP_ROUTING_CLIENT, its_service.first,
- its_instance.first, its_eventgroup.first, true);
+ its_instance.first, its_eventgroup.first, ANY_EVENT, true);
}
}
if(its_eventgroup.second->is_multicast() && its_invalid_endpoints.size() &&
@@ -2367,7 +2344,7 @@ void routing_manager_impl::on_subscribe(
}
}
stub_->send_subscribe(find_local(_service, _instance),
- client, _service, _instance, _eventgroup, its_eventgroup->get_major(), true);
+ client, _service, _instance, _eventgroup, its_eventgroup->get_major(), ANY_EVENT, true);
}
}
}
@@ -2395,7 +2372,7 @@ void routing_manager_impl::on_unsubscribe(service_t _service,
clear_remote_subscriber(_service, _instance, its_client, _target);
stub_->send_unsubscribe(find_local(_service, _instance),
- its_client, _service, _instance, _eventgroup, true);
+ its_client, _service, _instance, _eventgroup, ANY_EVENT, true);
host_->on_subscription(_service, _instance, _eventgroup, its_client, false);
@@ -2456,7 +2433,8 @@ void routing_manager_impl::on_subscribe_ack(service_t _service,
}
void routing_manager_impl::on_subscribe_ack(client_t _client,
- service_t _service, instance_t _instance, eventgroup_t _eventgroup) {
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ event_t _event) {
{
std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_);
auto found_service = specific_endpoint_clients_.find(_service);
@@ -2474,12 +2452,14 @@ void routing_manager_impl::on_subscribe_ack(client_t _client,
if (_client == get_client()) {
host_->on_subscription_error(_service, _instance, _eventgroup, 0x0 /*OK*/);
} else {
- stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup);
+ stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup,
+ _event);
}
}
void routing_manager_impl::on_subscribe_nack(client_t _client,
- service_t _service, instance_t _instance, eventgroup_t _eventgroup) {
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ event_t _event) {
{
std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_);
auto found_service = specific_endpoint_clients_.find(_service);
@@ -2497,7 +2477,8 @@ void routing_manager_impl::on_subscribe_nack(client_t _client,
if (_client == get_client()) {
host_->on_subscription_error(_service, _instance, _eventgroup, 0x7 /*Rejected*/);
} else {
- stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup);
+ stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup,
+ _event);
}
}
@@ -3003,7 +2984,7 @@ routing_manager_impl::expire_subscriptions() {
auto target = find_local(its_service.first, its_instance.first);
if (target) {
stub_->send_unsubscribe(target, VSOMEIP_ROUTING_CLIENT, its_service.first,
- its_instance.first, its_eventgroup.first, true);
+ its_instance.first, its_eventgroup.first, ANY_EVENT, true);
}
VSOMEIP_INFO << "Expired subscription ("
@@ -3053,11 +3034,14 @@ void routing_manager_impl::watchdog_cbk(boost::system::error_code const &_error)
if (is_ready) {
sd_notify(0, "WATCHDOG=1");
+ VSOMEIP_INFO << "Triggered systemd watchdog";
} else {
is_ready = true;
sd_notify(0, "READY=1");
+ VSOMEIP_INFO << "Sent READY to systemd watchdog";
if (0 < sd_watchdog_enabled(0, &its_interval)) {
has_interval = true;
+ VSOMEIP_INFO << "systemd watchdog is enabled";
}
}
@@ -3459,12 +3443,12 @@ void routing_manager_impl::unsubscribe_specific_client_at_sd(
void routing_manager_impl::send_subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, major_version_t _major,
- subscription_type_e _subscription_type) {
+ event_t _event, subscription_type_e _subscription_type) {
(void)_subscription_type;
auto endpoint = find_local(_service, _instance);
if (endpoint) {
stub_->send_subscribe(endpoint, _client,
- _service, _instance, _eventgroup, _major, false);
+ _service, _instance, _eventgroup, _major, _event, false);
}
}
@@ -3491,23 +3475,15 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
if (find_local_client(s.first, i.first) != VSOMEIP_ROUTING_CLIENT) {
continue; //don't expire local services
}
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
-
- auto found_service = eventgroup_clients_.find(s.first);
- if (found_service != eventgroup_clients_.end()) {
- auto found_instance = found_service->second.find(i.first);
- if (found_instance != found_service->second.end()) {
- for (auto its_eventgroup : found_instance->second) {
- discovery_->unsubscribe(s.first, i.first, its_eventgroup.first, VSOMEIP_ROUTING_CLIENT);
- std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_);
- auto its_specific_service = specific_endpoint_clients_.find(s.first);
- if (its_specific_service != specific_endpoint_clients_.end()) {
- auto its_specific_instance = its_specific_service->second.find(i.first);
- if (its_specific_instance != its_specific_service->second.end()) {
- for (auto its_client : its_specific_instance->second) {
- discovery_->unsubscribe(s.first, i.first, its_eventgroup.first, its_client);
- }
- }
+ for (auto its_eventgroup : get_subscribed_eventgroups(s.first, i.first)) {
+ discovery_->unsubscribe(s.first, i.first, its_eventgroup, VSOMEIP_ROUTING_CLIENT);
+ std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_);
+ auto its_specific_service = specific_endpoint_clients_.find(s.first);
+ if (its_specific_service != specific_endpoint_clients_.end()) {
+ auto its_specific_instance = its_specific_service->second.find(i.first);
+ if (its_specific_instance != its_specific_service->second.end()) {
+ for (auto its_client : its_specific_instance->second) {
+ discovery_->unsubscribe(s.first, i.first, its_eventgroup, its_client);
}
}
}
@@ -3555,6 +3531,8 @@ void routing_manager_impl::start_ip_routing() {
init_service_info(its_service.first, its_service.second, true);
}
pending_sd_offers_.clear();
+
+ VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE;
}
void routing_manager_impl::requested_service_add(client_t _client,
@@ -3589,4 +3567,88 @@ void routing_manager_impl::requested_service_remove(client_t _client,
}
}
+// Returns the set of eventgroups of the given service instance that
+// currently have at least one subscriber. An eventgroup counts as
+// subscribed if any of its events reports a subscriber for ANY_CLIENT.
+// Holds eventgroups_mutex_ for the whole lookup.
+std::set<eventgroup_t>
+routing_manager_impl::get_subscribed_eventgroups(
+        service_t _service, instance_t _instance) {
+    std::set<eventgroup_t> its_eventgroups;
+
+    std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+    auto found_service = eventgroups_.find(_service);
+    if (found_service != eventgroups_.end()) {
+        auto found_instance = found_service->second.find(_instance);
+        if (found_instance != found_service->second.end()) {
+            // its_group: pair<eventgroup_t, shared_ptr<eventgroupinfo>>
+            for (auto its_group : found_instance->second) {
+                for (auto its_event : its_group.second->get_events()) {
+                    // ANY_CLIENT acts as a wildcard: true if any client
+                    // subscribed this event within the eventgroup.
+                    if (its_event->has_subscriber(its_group.first, ANY_CLIENT)) {
+                        its_eventgroups.insert(its_group.first);
+                    }
+                }
+            }
+        }
+    }
+
+    return its_eventgroups;
+}
+
+// Forwards a "reliable (TCP) endpoint connected" notification to service
+// discovery. Posted onto io_ from on_connect() instead of being called
+// inline there (see the io_.post in on_connect).
+// NOTE(review): the fixed 3 ms sleep before notifying SD is not explained
+// here — presumably it lets the endpoint/connection state settle before
+// SD reacts; confirm intent before changing.
+void routing_manager_impl::call_sd_reliable_endpoint_connected(
+        service_t _service, instance_t _instance,
+        std::shared_ptr<endpoint> _endpoint) {
+    if (discovery_) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(3));
+        discovery_->on_reliable_endpoint_connected(_service, _instance,
+                _endpoint);
+    }
+}
+
+// Registers a placeholder event for a subscription that arrived before the
+// event was requested/offered, then adds _client as its subscriber.
+// The registration variant (own client vs. shadow, and under which client
+// id) depends on where the service instance is hosted — see the three
+// branches below. Returns true iff the subscriber was newly added to the
+// (found or created) event; returns false if no event could be registered
+// (unknown service instance) or the subscriber was already present.
+bool routing_manager_impl::create_placeholder_event_and_subscribe(
+        service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+        event_t _event, client_t _client) {
+    bool is_inserted(false);
+    // we received a event which was not yet requested/offered
+    // create a placeholder field until someone requests/offers this event with
+    // full information like eventgroup, field or not etc.
+    std::set<eventgroup_t> its_eventgroups({_eventgroup});
+
+    const client_t its_local_client(find_local_client(_service, _instance));
+    if (its_local_client == host_->get_client()) {
+        // received subscription for event of a service instance hosted by
+        // application acting as rm_impl register with own client id and shadow = false
+        register_event(host_->get_client(), _service, _instance, _event,
+                its_eventgroups, true, std::chrono::milliseconds::zero(), false,
+                nullptr, false, false, true);
+    } else if (its_local_client != VSOMEIP_ROUTING_CLIENT) {
+        // received subscription for event of a service instance hosted on
+        // this node register with client id of local_client and set shadow to true
+        register_event(its_local_client, _service, _instance, _event,
+                its_eventgroups, true, std::chrono::milliseconds::zero(), false,
+                nullptr, false, true, true);
+    } else {
+        // received subscription for event of a unknown or remote service instance
+        std::shared_ptr<serviceinfo> its_info = find_service(_service,
+                _instance);
+        if (its_info && !its_info->is_local()) {
+            // remote service, register shadow event with client ID of subscriber
+            // which should have called register_event
+            register_event(_client, _service, _instance, _event,
+                    its_eventgroups, true, std::chrono::milliseconds::zero(),
+                    false, nullptr, false, true, true);
+        } else {
+            // neither local nor known-remote: nothing registered; the
+            // find_event below will fail and false is returned.
+            VSOMEIP_WARNING
+                << "routing_manager_impl::create_placeholder_event_and_subscribe("
+                << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
+                << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+                << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+                << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
+                << std::hex << std::setw(4) << std::setfill('0') << _event << "]"
+                << " received subscription for unknown service instance.";
+        }
+    }
+
+    std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
+    if (its_event) {
+        is_inserted = its_event->add_subscriber(_eventgroup, _client);
+    }
+    return is_inserted;
+}
+
} // namespace vsomeip
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp
index e12e962..79e85f2 100644
--- a/implementation/routing/src/routing_manager_proxy.cpp
+++ b/implementation/routing/src/routing_manager_proxy.cpp
@@ -265,6 +265,7 @@ void routing_manager_proxy::release_service(client_t _client,
routing_manager_base::release_service(_client, _service, _instance);
{
std::lock_guard<std::mutex> its_lock(state_mutex_);
+ remove_pending_subscription(_service, _instance, 0xFFFF, ANY_EVENT);
if (state_ == inner_state_type_e::ST_REGISTERED) {
send_release_service(_client, _service, _instance);
}
@@ -321,6 +322,12 @@ void routing_manager_proxy::register_event(client_t _client,
_event, _eventgroups, _is_field, _is_provided);
}
}
+ VSOMEIP_INFO << "REGISTER EVENT("
+ << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _event
+ << ":is_provider=" << _is_provided << "]";
}
void routing_manager_proxy::unregister_event(client_t _client,
@@ -383,22 +390,22 @@ bool routing_manager_proxy::is_field(service_t _service, instance_t _instance,
void routing_manager_proxy::subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, major_version_t _major,
- subscription_type_e _subscription_type) {
+ event_t _event, subscription_type_e _subscription_type) {
{
std::lock_guard<std::mutex> its_lock(state_mutex_);
if (state_ == inner_state_type_e::ST_REGISTERED && is_available(_service, _instance, _major)) {
send_subscribe(_client, _service, _instance, _eventgroup, _major,
- _subscription_type);
+ _event, _subscription_type);
}
- eventgroup_data_t subscription = { _service, _instance, _eventgroup, _major,
- _subscription_type};
+ subscription_data_t subscription = { _service, _instance, _eventgroup, _major,
+ _event, _subscription_type};
pending_subscriptions_.insert(subscription);
}
}
void routing_manager_proxy::send_subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, major_version_t _major,
- subscription_type_e _subscription_type) {
+ event_t _event, subscription_type_e _subscription_type) {
(void)_client;
byte_t its_command[VSOMEIP_SUBSCRIBE_COMMAND_SIZE];
@@ -417,8 +424,10 @@ void routing_manager_proxy::send_subscribe(client_t _client, service_t _service,
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6] = _major;
- its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 7] = 0; // local subscriber
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_subscription_type,
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 7], &_event,
+ sizeof(_event));
+ its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9] = 0; // local subscriber
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 10], &_subscription_type,
sizeof(_subscription_type));
client_t target_client = find_local_client(_service, _instance);
@@ -434,7 +443,8 @@ void routing_manager_proxy::send_subscribe(client_t _client, service_t _service,
}
void routing_manager_proxy::send_subscribe_nack(client_t _subscriber,
- service_t _service, instance_t _instance, eventgroup_t _eventgroup) {
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ event_t _event) {
byte_t its_command[VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE];
uint32_t its_size = VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE
- VSOMEIP_COMMAND_HEADER_SIZE;
@@ -453,6 +463,8 @@ void routing_manager_proxy::send_subscribe_nack(client_t _subscriber,
sizeof(_eventgroup));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_subscriber,
sizeof(_subscriber));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_event,
+ sizeof(_event));
auto its_target = find_local(_subscriber);
if (its_target) {
@@ -466,7 +478,8 @@ void routing_manager_proxy::send_subscribe_nack(client_t _subscriber,
}
void routing_manager_proxy::send_subscribe_ack(client_t _subscriber,
- service_t _service, instance_t _instance, eventgroup_t _eventgroup) {
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ event_t _event) {
byte_t its_command[VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE];
uint32_t its_size = VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE
- VSOMEIP_COMMAND_HEADER_SIZE;
@@ -485,6 +498,8 @@ void routing_manager_proxy::send_subscribe_ack(client_t _subscriber,
sizeof(_eventgroup));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_subscriber,
sizeof(_subscriber));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_event,
+ sizeof(_event));
auto its_target = find_local(_subscriber);
if (its_target) {
@@ -498,10 +513,12 @@ void routing_manager_proxy::send_subscribe_ack(client_t _subscriber,
}
void routing_manager_proxy::unsubscribe(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) {
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
(void)_client;
{
std::lock_guard<std::mutex> its_lock(state_mutex_);
+ remove_pending_subscription(_service, _instance, _eventgroup, _event);
+
if (state_ == inner_state_type_e::ST_REGISTERED) {
byte_t its_command[VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE];
uint32_t its_size = VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE
@@ -518,7 +535,9 @@ void routing_manager_proxy::unsubscribe(client_t _client, service_t _service,
sizeof(_instance));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
- its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6] = 0; // is_local
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_event,
+ sizeof(_event));
+ its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8] = 0; // is_local
auto its_target = find_local(_service, _instance);
if (its_target) {
@@ -530,7 +549,6 @@ void routing_manager_proxy::unsubscribe(client_t _client, service_t _service,
}
}
}
- remove_pending_subscription(_service, _instance);
}
}
@@ -721,7 +739,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
#if 0
std::stringstream msg;
msg << "rmp::on_message: ";
- for (int i = 0; i < _size; ++i)
+ for (length_t i = 0; i < _size; ++i)
msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
@@ -731,6 +749,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
service_t its_service;
instance_t its_instance;
eventgroup_t its_eventgroup;
+ event_t its_event;
major_version_t its_major;
uint8_t is_remote_subscriber;
client_t routing_host_id = configuration_->get_id(configuration_->get_routing_host());
@@ -846,7 +865,9 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
sizeof(its_eventgroup));
std::memcpy(&its_major, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
sizeof(its_major));
- std::memcpy(&is_remote_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 7],
+ std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 7],
+ sizeof(its_event));
+ std::memcpy(&is_remote_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 9],
sizeof(is_remote_subscriber));
if (is_remote_subscriber) {
@@ -854,7 +875,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
(void)host_->on_subscription(its_service, its_instance,
its_eventgroup, its_client, true);
bool inserted = insert_subscription(its_service, its_instance, its_eventgroup,
- VSOMEIP_ROUTING_CLIENT);
+ its_event, VSOMEIP_ROUTING_CLIENT);
if (inserted) {
notify_remote_initally(its_service, its_instance, its_eventgroup);
}
@@ -874,17 +895,20 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
bool subscription_accepted = host_->on_subscription(its_service, its_instance,
its_eventgroup, its_client, true);
if (!subscription_accepted) {
- send_subscribe_nack(its_client, its_service, its_instance, its_eventgroup);
+ send_subscribe_nack(its_client, its_service,
+ its_instance, its_eventgroup, its_event);
} else {
+ send_subscribe_ack(its_client, its_service, its_instance,
+ its_eventgroup, its_event);
routing_manager_base::subscribe(its_client, its_service, its_instance,
- its_eventgroup, its_major, subscription_type_e::SU_RELIABLE_AND_UNRELIABLE);
- send_subscribe_ack(its_client, its_service, its_instance, its_eventgroup);
+ its_eventgroup, its_major, its_event,
+ subscription_type_e::SU_RELIABLE_AND_UNRELIABLE);
send_pending_notify_ones(its_service, its_instance, its_eventgroup, its_client);
}
} else {
// Local & not yet known subscriber ~> set pending until subscriber gets known!
- eventgroup_data_t subscription = { its_service, its_instance,
- its_eventgroup, its_major,
+ subscription_data_t subscription = { its_service, its_instance,
+ its_eventgroup, its_major, its_event,
subscription_type_e::SU_RELIABLE_AND_UNRELIABLE};
std::lock_guard<std::mutex> its_lock(pending_ingoing_subscripitons_mutex_);
pending_ingoing_subscripitons_[its_client].insert(subscription);
@@ -894,6 +918,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << ":"
+ << std::hex << std::setw(4) << std::setfill('0') << its_event << ":"
<< std::dec << (uint16_t)its_major << "]";
break;
@@ -908,24 +933,27 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
sizeof(its_instance));
std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4],
sizeof(its_eventgroup));
- std::memcpy(&is_remote_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
+ std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
+ sizeof(its_event));
+ std::memcpy(&is_remote_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8],
sizeof(is_remote_subscriber));
host_->on_subscription(its_service, its_instance, its_eventgroup, its_client, false);
if (!is_remote_subscriber) {
// Local subscriber: withdraw subscription
- routing_manager_base::unsubscribe(its_client, its_service, its_instance, its_eventgroup);
+ routing_manager_base::unsubscribe(its_client, its_service, its_instance, its_eventgroup, its_event);
} else {
// Remote subscriber: withdraw subscription only if no more remote subscriber exists
if (!get_remote_subscriber_count(its_service, its_instance, its_eventgroup, false)) {
routing_manager_base::unsubscribe(VSOMEIP_ROUTING_CLIENT, its_service,
- its_instance, its_eventgroup);
+ its_instance, its_eventgroup, its_event);
}
}
VSOMEIP_INFO << "UNSUBSCRIBE("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]";
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_event << "]";
break;
case VSOMEIP_SUBSCRIBE_NACK:
@@ -941,13 +969,16 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
sizeof(its_eventgroup));
std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
sizeof(its_subscriber));
+ std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8],
+ sizeof(its_event));
on_subscribe_nack(its_subscriber, its_service, its_instance, its_eventgroup);
VSOMEIP_INFO << "SUBSCRIBE NACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]";
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_event << "]";
break;
case VSOMEIP_SUBSCRIBE_ACK:
@@ -963,13 +994,16 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
sizeof(its_eventgroup));
std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
sizeof(its_subscriber));
+ std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8],
+ sizeof(its_event));
on_subscribe_ack(its_subscriber, its_service, its_instance, its_eventgroup);
VSOMEIP_INFO << "SUBSCRIBE ACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]";
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_event << "]";
break;
default:
@@ -987,167 +1021,177 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data,
msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
- inner_state_type_e its_state(inner_state_type_e::ST_DEREGISTERED);
- bool restart_sender(_size == 1); // 1 indicates a routing master stop
- std::map<service_t, std::map<instance_t, std::tuple< major_version_t, minor_version_t, client_t> > > old_local_services;
- std::unordered_set<client_t> clients_to_delete;
- struct service_info {
- service_t service_id_;
- instance_t instance_id_;
- major_version_t major_;
- minor_version_t minor_;
- };
- std::forward_list<struct service_info> services_to_remove;
- std::forward_list<struct service_info> services_to_add;
- {
- std::lock_guard<std::mutex> its_lock(local_services_mutex_);
- old_local_services = local_services_;
- local_services_.clear();
- std::unordered_set<client_t> known_clients;
-
- uint32_t i = 0;
- while (i + sizeof(uint32_t) <= _size) {
- uint32_t its_client_size;
- std::memcpy(&its_client_size, &_data[i], sizeof(uint32_t));
- i += uint32_t(sizeof(uint32_t));
-
- if (i + sizeof(client_t) <= _size) {
- client_t its_client;
- std::memcpy(&its_client, &_data[i], sizeof(client_t));
- i += uint32_t(sizeof(client_t));
-
- if (its_client == client_) {
- its_state = inner_state_type_e::ST_REGISTERED;
- }
- known_clients.insert(its_client);
- uint32_t j = 0;
- while (j + sizeof(uint32_t) <= its_client_size) {
- uint32_t its_services_size;
- std::memcpy(&its_services_size, &_data[i + j], sizeof(uint32_t));
- j += uint32_t(sizeof(uint32_t));
+ bool restart_sender(_size == 1);
+ if (restart_sender && is_started_) {
+ // Handle restart to routing manager!
+ std::unordered_set<client_t> clients_to_delete;
+ {
+ std::lock_guard<std::mutex> its_lock(known_clients_mutex_);
+ for (auto its_client : known_clients_) {
+ if (its_client != get_client()) {
+ clients_to_delete.insert(its_client);
+ }
+ }
+ }
+ VSOMEIP_INFO << std::hex << "Application/Client " << get_client()
+ << " is deregistered.";
- if (its_services_size >= sizeof(service_t) + sizeof(instance_t) + sizeof(major_version_t) + sizeof(minor_version_t)) {
- its_services_size -= uint32_t(sizeof(service_t));
+ // inform host about its own registration state changes
+ host_->on_state(static_cast<state_type_e>(inner_state_type_e::ST_DEREGISTERED));
- service_t its_service;
- std::memcpy(&its_service, &_data[i + j], sizeof(service_t));
- j += uint32_t(sizeof(service_t));
+ {
+ std::lock_guard<std::mutex> its_lock(state_mutex_);
+ state_ = inner_state_type_e::ST_DEREGISTERED;
+ }
- while (its_services_size >= sizeof(instance_t) + sizeof(major_version_t) + sizeof(minor_version_t)) {
- instance_t its_instance;
- std::memcpy(&its_instance, &_data[i + j], sizeof(instance_t));
- j += uint32_t(sizeof(instance_t));
+ // Notify stop() call about clean deregistration
+ state_condition_.notify_one();
- major_version_t its_major;
- std::memcpy(&its_major, &_data[i + j], sizeof(major_version_t));
- j += uint32_t(sizeof(major_version_t));
+ // Remove all local connections/endpoints
+ for (const auto client : clients_to_delete) {
+ if (client != VSOMEIP_ROUTING_CLIENT) {
+ remove_local(client);
+ }
+ }
- minor_version_t its_minor;
- std::memcpy(&its_minor, &_data[i + j], sizeof(minor_version_t));
- j += uint32_t(sizeof(minor_version_t));
+ VSOMEIP_INFO << std::hex << "Application/Client " << get_client()
+ <<": Reconnecting to routing manager.";
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->restart();
+ }
- local_services_[its_service][its_instance] = std::make_tuple(its_major, its_minor, its_client);
+ // Abort due to routing manager has stopped
+ return;
+ }
- its_services_size -= uint32_t(sizeof(instance_t) + sizeof(major_version_t) + sizeof(minor_version_t) );
- }
- }
+ uint32_t i = 0;
+ while (i + sizeof(uint32_t) + sizeof(routing_info_entry_e) <= _size) {
+ routing_info_entry_e routing_info_entry;
+ std::memcpy(&routing_info_entry, &_data[i], sizeof(routing_info_entry_e));
+ i += uint32_t(sizeof(routing_info_entry_e));
+
+ uint32_t its_client_size;
+ std::memcpy(&its_client_size, &_data[i], sizeof(uint32_t));
+ i += uint32_t(sizeof(uint32_t));
+
+ if (i + sizeof(client_t) <= _size) {
+ client_t its_client;
+ std::memcpy(&its_client, &_data[i], sizeof(client_t));
+ i += uint32_t(sizeof(client_t));
+
+ if (routing_info_entry == routing_info_entry_e::RIE_ADD_CLIENT) {
+ {
+ std::lock_guard<std::mutex> its_lock(known_clients_mutex_);
+ known_clients_.insert(its_client);
}
+ if (its_client == get_client()) {
+ VSOMEIP_INFO << std::hex << "Application/Client " << get_client()
+ << " is registered.";
- i += j;
- }
- }
- // Which clients are no longer needed?!
- for (const auto client : get_connected_clients()) {
- if (known_clients.find(client) == known_clients.end()) {
- clients_to_delete.insert(client);
- }
- }
- {
- std::lock_guard<std::mutex> its_lock(known_clients_mutex_);
- known_clients_ = known_clients;
- }
+ // inform host about its own registration state changes
+ host_->on_state(static_cast<state_type_e>(inner_state_type_e::ST_REGISTERED));
- // Check for services that are no longer available
- for (const auto &i : old_local_services) {
- auto found_service = local_services_.find(i.first);
- if (found_service != local_services_.end()) {
- for (const auto &j : i.second) {
- auto found_instance = found_service->second.find(j.first);
- if (found_instance == found_service->second.end()) {
- services_to_remove.push_front(
- { i.first, j.first, std::get<0>(j.second),
- std::get<1>(j.second) });
+ {
+ std::lock_guard<std::mutex> its_lock(state_mutex_);
+ boost::system::error_code ec;
+ register_application_timer_.cancel(ec);
+ send_registered_ack();
+ send_pending_commands();
+ state_ = inner_state_type_e::ST_REGISTERED;
}
+
+ // Notify stop() call about clean deregistration
+ state_condition_.notify_one();
}
- } else {
- for (const auto &j : i.second) {
- services_to_remove.push_front(
- { i.first, j.first, std::get<0>(j.second),
- std::get<1>(j.second) });
+ } else if (routing_info_entry == routing_info_entry_e::RIE_DEL_CLIENT) {
+ {
+ std::lock_guard<std::mutex> its_lock(known_clients_mutex_);
+ known_clients_.erase(its_client);
}
- }
- }
+ if (its_client == get_client()) {
+ VSOMEIP_INFO << std::hex << "Application/Client " << get_client()
+ << " is deregistered.";
+
+ // inform host about its own registration state changes
+ host_->on_state(static_cast<state_type_e>(inner_state_type_e::ST_DEREGISTERED));
- // Check for services that are newly available
- for (const auto &i : local_services_) {
- auto found_service = old_local_services.find(i.first);
- if (found_service != old_local_services.end()) {
- for (const auto &j : i.second) {
- auto found_instance = found_service->second.find(j.first);
- if (found_instance == found_service->second.end()) {
- services_to_add.push_front(
- { i.first, j.first, std::get<0>(j.second),
- std::get<1>(j.second) });
+ {
+ std::lock_guard<std::mutex> its_lock(state_mutex_);
+ state_ = inner_state_type_e::ST_DEREGISTERED;
}
- }
- } else {
- for (const auto &j : i.second) {
- services_to_add.push_front(
- { i.first, j.first, std::get<0>(j.second),
- std::get<1>(j.second) });
+
+ // Notify stop() call about clean deregistration
+ state_condition_.notify_one();
+ } else if (its_client != VSOMEIP_ROUTING_CLIENT) {
+ remove_local(its_client);
}
}
- }
- }
-
- if (state_ != its_state) {
- VSOMEIP_INFO << std::hex << "Application/Client " << get_client()
- << (its_state == inner_state_type_e::ST_REGISTERED ?
- " is registered." : " is deregistered.");
- // inform host about its own registration state changes
- host_->on_state(static_cast<state_type_e>(its_state));
+ uint32_t j = 0;
+ while (j + sizeof(uint32_t) <= its_client_size) {
+ uint32_t its_services_size;
+ std::memcpy(&its_services_size, &_data[i + j], sizeof(uint32_t));
+ j += uint32_t(sizeof(uint32_t));
+
+ if (its_services_size >= sizeof(service_t) + sizeof(instance_t) + sizeof(major_version_t) + sizeof(minor_version_t)) {
+ its_services_size -= uint32_t(sizeof(service_t));
+
+ service_t its_service;
+ std::memcpy(&its_service, &_data[i + j], sizeof(service_t));
+ j += uint32_t(sizeof(service_t));
+
+ while (its_services_size >= sizeof(instance_t) + sizeof(major_version_t) + sizeof(minor_version_t)) {
+ instance_t its_instance;
+ std::memcpy(&its_instance, &_data[i + j], sizeof(instance_t));
+ j += uint32_t(sizeof(instance_t));
+
+ major_version_t its_major;
+ std::memcpy(&its_major, &_data[i + j], sizeof(major_version_t));
+ j += uint32_t(sizeof(major_version_t));
+
+ minor_version_t its_minor;
+ std::memcpy(&its_minor, &_data[i + j], sizeof(minor_version_t));
+ j += uint32_t(sizeof(minor_version_t));
+
+ if (routing_info_entry == routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE) {
+ {
+ std::lock_guard<std::mutex> its_lock(known_clients_mutex_);
+ known_clients_.insert(its_client);
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(local_services_mutex_);
+ local_services_[its_service][its_instance] = std::make_tuple(its_major, its_minor, its_client);
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(state_mutex_);
+ send_pending_subscriptions(its_service, its_instance, its_major);
+ }
+ host_->on_availability(its_service, its_instance, true, its_major, its_minor);
+ } else if (routing_info_entry == routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE) {
+ {
+ std::lock_guard<std::mutex> its_lock(local_services_mutex_);
+ auto found_service = local_services_.find(its_service);
+ if (found_service != local_services_.end()) {
+ found_service->second.erase(its_instance);
+ if (found_service->second.size() == 0) {
+ local_services_.erase(its_service);
+ }
+ }
+ }
+ on_stop_offer_service(its_service, its_instance, its_major, its_minor);
+ host_->on_availability(its_service, its_instance, false, its_major, its_minor);
+ }
- {
- std::lock_guard<std::mutex> its_lock(state_mutex_);
- if (its_state == inner_state_type_e::ST_REGISTERED) {
- boost::system::error_code ec;
- register_application_timer_.cancel(ec);
- send_registered_ack();
- send_pending_commands();
+ its_services_size -= uint32_t(sizeof(instance_t) + sizeof(major_version_t) + sizeof(minor_version_t) );
+ }
+ }
}
- state_ = its_state;
- }
- // Notify stop() call about clean deregistration
- state_condition_.notify_one();
- }
-
- // Report services that are no longer available
- for (const service_info &sr : services_to_remove) {
- on_stop_offer_service(sr.service_id_, sr.instance_id_, sr.major_, sr.minor_);
- host_->on_availability(sr.service_id_, sr.instance_id_, false, sr.major_, sr.minor_);
- }
- // Report services that are newly available
- for (const service_info &sa : services_to_add) {
- {
- std::lock_guard<std::mutex> its_lock(state_mutex_);
- send_pending_subscriptions(sa.service_id_, sa.instance_id_, sa.major_);
+ i += j;
}
- host_->on_availability(sa.service_id_, sa.instance_id_, true, sa.major_, sa.minor_);
}
-
{
struct subscription_info {
service_t service_id_;
@@ -1155,6 +1199,7 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data,
eventgroup_t eventgroup_id_;
client_t client_id_;
major_version_t major_;
+ event_t event_;
};
std::lock_guard<std::mutex> its_lock(pending_ingoing_subscripitons_mutex_);
std::forward_list<struct subscription_info> subscription_actions;
@@ -1168,26 +1213,26 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data,
subscription_actions.push_front(
{ subscription.service_, subscription.instance_,
subscription.eventgroup_, client,
- subscription.major_ });
+ subscription.major_, subscription.event_ });
}
}
}
}
for (const subscription_info &si : subscription_actions) {
+ (void) find_or_create_local(si.client_id_);
bool subscription_accepted = host_->on_subscription(
si.service_id_, si.instance_id_, si.eventgroup_id_,
si.client_id_, true);
- (void) find_or_create_local(si.client_id_);
if (!subscription_accepted) {
send_subscribe_nack(si.client_id_, si.service_id_,
- si.instance_id_, si.eventgroup_id_);
+ si.instance_id_, si.eventgroup_id_, si.event_);
} else {
routing_manager_base::subscribe(si.client_id_,
si.service_id_, si.instance_id_, si.eventgroup_id_,
- si.major_,
+ si.major_, si.event_,
subscription_type_e::SU_RELIABLE_AND_UNRELIABLE);
send_subscribe_ack(si.client_id_, si.service_id_,
- si.instance_id_, si.eventgroup_id_);
+ si.instance_id_, si.eventgroup_id_, si.event_);
send_pending_notify_ones(si.service_id_,
si.instance_id_, si.eventgroup_id_, si.client_id_);
}
@@ -1195,21 +1240,6 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data,
}
}
}
-
- for (const auto client : clients_to_delete) {
- if (client != VSOMEIP_ROUTING_CLIENT) {
- remove_local(client);
- }
- }
-
- if (restart_sender && is_started_) {
- VSOMEIP_INFO << std::hex << "Application/Client " << get_client()
- <<": Reconnecting to routing manager.";
- std::lock_guard<std::mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->start();
- }
- }
}
void routing_manager_proxy::register_application() {
@@ -1606,7 +1636,7 @@ void routing_manager_proxy::register_application_timeout_cbk(
VSOMEIP_WARNING << std::hex << "Client 0x" << get_client() << " register timeout!"
<< " : Restart route to stub!";
if (sender_) {
- sender_->start();
+ sender_->restart();
}
}
}
@@ -1632,4 +1662,23 @@ bool routing_manager_proxy::is_client_known(client_t _client) {
std::lock_guard<std::mutex> its_lock(known_clients_mutex_);
return (known_clients_.find(_client) != known_clients_.end());
}
+
+bool routing_manager_proxy::create_placeholder_event_and_subscribe(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ event_t _event, client_t _client) {
+ bool is_inserted(false);
+ // we received a event which was not yet requested/offered
+ // create a placeholder field until someone requests/offers this event with
+ // full information like eventgroup, field or not etc.
+ std::set<eventgroup_t> its_eventgroups({ _eventgroup });
+ // routing_manager_proxy: Always register with own client id and shadow = false
+ register_event(host_->get_client(), _service, _instance, _event,
+ its_eventgroups, true, std::chrono::milliseconds::zero(), false,
+ nullptr, false, false, true);
+ std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
+ if (its_event) {
+ is_inserted = its_event->add_subscriber(_eventgroup, _client);
+ }
+ return is_inserted;
+}
} // namespace vsomeip
diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp
index c048fe6..4146f59 100644
--- a/implementation/routing/src/routing_manager_stub.cpp
+++ b/implementation/routing/src/routing_manager_stub.cpp
@@ -95,8 +95,11 @@ void routing_manager_stub::start() {
}
void routing_manager_stub::stop() {
- client_registration_running_ = false;
- client_registration_condition_.notify_one();
+ {
+ std::lock_guard<std::mutex> its_lock(client_registration_mutex_);
+ client_registration_running_ = false;
+ client_registration_condition_.notify_one();
+ }
if (client_registration_thread_->joinable()) {
client_registration_thread_->join();
}
@@ -134,7 +137,7 @@ void routing_manager_stub::stop() {
{
std::lock_guard<std::mutex> its_lock(routing_info_mutex_);
- broadcast_routing_info(true);
+ broadcast_routing_stop();
}
}
@@ -308,14 +311,16 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
sizeof(its_eventgroup));
std::memcpy(&its_major, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
sizeof(its_major));
- std::memcpy(&is_remote_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 7],
+ std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 7],
+ sizeof(its_event));
+ std::memcpy(&is_remote_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 9],
sizeof(is_remote_subscriber));
- std::memcpy(&its_subscription_type, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8],
+ std::memcpy(&its_subscription_type, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 10],
sizeof(its_subscription_type));
if (configuration_->is_client_allowed(its_client,
its_service, its_instance)) {
- host_->subscribe(its_client, its_service,
- its_instance, its_eventgroup, its_major, its_subscription_type);
+ host_->subscribe(its_client, its_service, its_instance,
+ its_eventgroup, its_major, its_event, its_subscription_type);
} else {
VSOMEIP_WARNING << "Security: Client " << std::hex
<< its_client << " subscribes to service/instance "
@@ -335,8 +340,11 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
sizeof(its_instance));
std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4],
sizeof(its_eventgroup));
+ std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
+ sizeof(its_event));
+
host_->unsubscribe(its_client, its_service,
- its_instance, its_eventgroup);
+ its_instance, its_eventgroup, its_event);
break;
case VSOMEIP_SUBSCRIBE_ACK:
@@ -352,12 +360,16 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
sizeof(its_eventgroup));
std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
sizeof(its_subscriber));
- host_->on_subscribe_ack(its_subscriber, its_service, its_instance, its_eventgroup);
+ std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8],
+ sizeof(its_event));
+ host_->on_subscribe_ack(its_subscriber, its_service,
+ its_instance, its_eventgroup, its_event);
VSOMEIP_INFO << "SUBSCRIBE ACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]";
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_event << "]";
break;
case VSOMEIP_SUBSCRIBE_NACK:
@@ -373,12 +385,16 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
sizeof(its_eventgroup));
std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
sizeof(its_subscriber));
- host_->on_subscribe_nack(its_subscriber, its_service, its_instance, its_eventgroup);
+ std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8],
+ sizeof(its_event));
+ host_->on_subscribe_nack(its_subscriber, its_service,
+ its_instance, its_eventgroup, its_event);
VSOMEIP_INFO << "SUBSCRIBE NACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]";
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_event << "]";
break;
case VSOMEIP_SEND: {
@@ -488,6 +504,7 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
std::memcpy(&its_instance,
&_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
sizeof(its_instance));
+
host_->release_service(its_client, its_service, its_instance);
break;
@@ -587,8 +604,6 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
case VSOMEIP_REGISTERED_ACK:
VSOMEIP_INFO << "REGISTERED_ACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << ")";
- std::lock_guard<std::mutex> its_guard(routing_info_mutex_);
- broadcast_routing_info(false, its_client);
break;
}
}
@@ -660,12 +675,30 @@ void routing_manager_stub::client_registration_func(void) {
// endpoint error to avoid writing in an already closed socket
if (b != registration_type_e::DEREGISTER_ERROR_CASE) {
std::lock_guard<std::mutex> its_guard(routing_info_mutex_);
- send_routing_info(r.first);
+ send_routing_info_delta(r.first,
+ b == registration_type_e::REGISTER ?
+ routing_info_entry_e::RIE_ADD_CLIENT :
+ routing_info_entry_e::RIE_DEL_CLIENT,
+ r.first);
}
if (b != registration_type_e::REGISTER) {
{
std::lock_guard<std::mutex> its_guard(routing_info_mutex_);
- broadcast_routing_info(false, r.first);
+ auto its_connection = connection_matrix_.find(r.first);
+ if (its_connection != connection_matrix_.end()) {
+ for (auto its_client : its_connection->second) {
+ if (its_client != r.first &&
+ its_client != VSOMEIP_ROUTING_CLIENT) {
+ send_routing_info_delta(its_client,
+ routing_info_entry_e::RIE_DEL_CLIENT, r.first);
+ }
+ }
+ connection_matrix_.erase(r.first);
+ }
+ for (auto its_client : connection_matrix_) {
+ connection_matrix_[its_client.first].erase(r.first);
+ }
+ service_requests_.erase(r.first);
}
host_->remove_local(r.first);
}
@@ -764,7 +797,8 @@ void routing_manager_stub::on_offer_service(client_t _client,
configuration_->is_offer_allowed(_client, _service, _instance)) {
std::lock_guard<std::mutex> its_guard(routing_info_mutex_);
routing_info_[_client].second[_service][_instance] = std::make_pair(_major, _minor);
- broadcast_routing_info();
+ inform_requesters(_client, _service, _instance, _major, _minor,
+ routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE, true);
} else {
VSOMEIP_WARNING << std::hex << "Security: Client 0x" << _client
<< " isn't allow to offer the following service/instance "
@@ -788,26 +822,30 @@ void routing_manager_stub::on_stop_offer_service(client_t _client,
if (0 == found_service->second.size()) {
found_client->second.second.erase(_service);
}
- broadcast_routing_info();
+ inform_requesters(_client, _service, _instance, _major, _minor,
+ routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE, false);
} else if( _major == DEFAULT_MAJOR && _minor == DEFAULT_MINOR) {
found_service->second.erase(_instance);
if (0 == found_service->second.size()) {
found_client->second.second.erase(_service);
}
- broadcast_routing_info();
+ inform_requesters(_client, _service, _instance, _major, _minor,
+ routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE, false);
}
}
}
}
}
-void routing_manager_stub::send_routing_info(client_t _client, bool _empty) {
- std::shared_ptr<endpoint> its_endpoint = host_->find_local(_client);
+void routing_manager_stub::send_routing_info_delta(client_t _target,
+ routing_info_entry_e _entry,
+ client_t _client, service_t _service, instance_t _instance,
+ major_version_t _major, minor_version_t _minor) {
+ std::shared_ptr<endpoint> its_endpoint = host_->find_local(_target);
if (its_endpoint) {
- // Create the command vector & reserve some bytes initially..
- // ..to avoid reallocation for smaller messages!
+ connection_matrix_[_target].insert(_client);
+
std::vector<byte_t> its_command;
- its_command.reserve(routingCommandSize_);
// Routing command
its_command.push_back(VSOMEIP_ROUTING_INFO);
@@ -825,77 +863,65 @@ void routing_manager_stub::send_routing_info(client_t _client, bool _empty) {
its_command.push_back(size_placeholder);
}
- // Routing info loop
- for (auto &info : routing_info_) {
- if (_empty) {
- its_command.push_back(0x0);
- break;
+ // Routing Info State Change
+ for (uint32_t i = 0; i < sizeof(routing_info_entry_e); ++i) {
+ its_command.push_back(
+ reinterpret_cast<const byte_t*>(&_entry)[i]);
+ }
+
+ std::size_t its_size_pos = its_command.size();
+ std::size_t its_entry_size = its_command.size();
+
+ // Client size placeholder
+ byte_t placeholder = 0x0;
+ for (uint32_t i = 0; i < sizeof(uint32_t); ++i) {
+ its_command.push_back(placeholder);
+ }
+ // Client
+ for (uint32_t i = 0; i < sizeof(client_t); ++i) {
+ its_command.push_back(
+ reinterpret_cast<const byte_t*>(&_client)[i]);
+ }
+
+ if (_entry == routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE ||
+ _entry == routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE) {
+ //Service
+ uint32_t its_service_entry_size = uint32_t(sizeof(service_t)
+ + sizeof(instance_t) + sizeof(major_version_t) + sizeof(minor_version_t));
+ for (uint32_t i = 0; i < sizeof(its_service_entry_size); ++i) {
+ its_command.push_back(
+ reinterpret_cast<const byte_t*>(&its_service_entry_size)[i]);
}
- std::size_t its_size_pos = its_command.size();
- std::size_t its_entry_size = its_command.size();
- // Client size placeholder
- byte_t placeholder = 0x0;
- for (uint32_t i = 0; i < sizeof(uint32_t); ++i) {
- its_command.push_back(placeholder);
+ for (uint32_t i = 0; i < sizeof(service_t); ++i) {
+ its_command.push_back(
+ reinterpret_cast<const byte_t*>(&_service)[i]);
}
- // Client
- for (uint32_t i = 0; i < sizeof(client_t); ++i) {
- its_command.push_back(
- reinterpret_cast<const byte_t*>(&info.first)[i]);
+ // Instance
+ for (uint32_t i = 0; i < sizeof(instance_t); ++i) {
+ its_command.push_back(
+ reinterpret_cast<const byte_t*>(&_instance)[i]);
}
- // Iterate over all services
- for (auto &service : info.second.second) {
- // Service entry size
- uint32_t its_service_entry_size = uint32_t(sizeof(service_t)
- + service.second.size() * (sizeof(instance_t)
- + sizeof(major_version_t) + sizeof(minor_version_t)));
- for (uint32_t i = 0; i < sizeof(its_service_entry_size); ++i) {
- its_command.push_back(
- reinterpret_cast<const byte_t*>(&its_service_entry_size)[i]);
- }
- // Service
- for (uint32_t i = 0; i < sizeof(service_t); ++i) {
- its_command.push_back(
- reinterpret_cast<const byte_t*>(&service.first)[i]);
- }
- // Iterate over all instances
- for (auto &instance : service.second) {
- // Instance
- for (uint32_t i = 0; i < sizeof(instance_t); ++i) {
- its_command.push_back(
- reinterpret_cast<const byte_t*>(&instance)[i]);
- }
- // Major version
- for (uint32_t i = 0; i < sizeof(major_version_t); ++i) {
- its_command.push_back(
- reinterpret_cast<const byte_t*>(&instance.second.first)[i]);
- }
- // Minor version
- for (uint32_t i = 0; i < sizeof(minor_version_t); ++i) {
- its_command.push_back(
- reinterpret_cast<const byte_t*>(&instance.second.second)[i]);
- }
- }
+ // Major version
+ for (uint32_t i = 0; i < sizeof(major_version_t); ++i) {
+ its_command.push_back(
+ reinterpret_cast<const byte_t*>(&_major)[i]);
+ }
+ // Minor version
+ for (uint32_t i = 0; i < sizeof(minor_version_t); ++i) {
+ its_command.push_back(
+ reinterpret_cast<const byte_t*>(&_minor)[i]);
}
- // File client size
- its_entry_size = its_command.size() - its_entry_size - uint32_t(sizeof(uint32_t));
- std::memcpy(&its_command[its_size_pos], &its_entry_size, sizeof(uint32_t));
}
+ // File client size
+ its_entry_size = its_command.size() - its_entry_size - uint32_t(sizeof(uint32_t));
+ std::memcpy(&its_command[its_size_pos], &its_entry_size, sizeof(uint32_t));
+
// File overall size
std::size_t its_size = its_command.size() - VSOMEIP_COMMAND_PAYLOAD_POS;
- if (_empty) {
- its_size = 1; // Indicates stopping routing!
- }
std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, sizeof(uint32_t));
its_size += VSOMEIP_COMMAND_PAYLOAD_POS;
- // Double init size until it fits into the actual size for next run
- size_t newInitSize;
- for (newInitSize = VSOMEIP_ROUTING_INFO_SIZE_INIT;
- newInitSize < its_size; newInitSize *= 2);
- routingCommandSize_ = newInitSize;
-
#if 0
std::stringstream msg;
msg << "rms::send_routing_info ";
@@ -914,12 +940,70 @@ void routing_manager_stub::send_routing_info(client_t _client, bool _empty) {
}
}
-void routing_manager_stub::broadcast_routing_info(bool _empty, client_t _ignore) {
+void routing_manager_stub::inform_requesters(client_t _hoster, service_t _service,
+ instance_t _instance, major_version_t _major, minor_version_t _minor,
+ routing_info_entry_e _entry, bool _inform_service) {
+ for (auto its_client : service_requests_) {
+ auto its_service = its_client.second.find(_service);
+ if (its_service != its_client.second.end()) {
+ bool send(false);
+ for (auto its_instance : its_service->second) {
+ if (its_instance.first == ANY_INSTANCE ||
+ its_instance.first == _instance) {
+ send = true;
+ }
+ }
+ if (send) {
+ if (_inform_service) {
+ if (_hoster != VSOMEIP_ROUTING_CLIENT &&
+ _hoster != host_->get_client()) {
+ if (!is_already_connected(_hoster, its_client.first)) {
+ send_routing_info_delta(_hoster,
+ routing_info_entry_e::RIE_ADD_CLIENT,
+ its_client.first);
+ }
+ }
+ }
+ send_routing_info_delta(its_client.first, _entry, _hoster,
+ _service, _instance, _major, _minor);
+ }
+ }
+ }
+}
+
+bool routing_manager_stub::is_already_connected(client_t _source, client_t _sink) {
+ return connection_matrix_[_source].find(_sink) != connection_matrix_[_source].end();
+}
+
+void routing_manager_stub::broadcast_routing_stop() {
+ std::vector<byte_t> its_command;
+
+ // Routing command
+ its_command.push_back(VSOMEIP_ROUTING_INFO);
+
+ // Sender client
+ client_t client = get_client();
+ for (uint32_t i = 0; i < sizeof(client_t); ++i) {
+ its_command.push_back(
+ reinterpret_cast<const byte_t*>(&client)[i]);
+ }
+
+ // Overall size ~> 1 indicates routing stop
+ uint32_t size = 0x1;
+ for (uint32_t i = 0; i < sizeof(uint32_t); ++i) {
+ its_command.push_back(
+ reinterpret_cast<const byte_t*>(&size)[i]);
+ }
+
+ // Stop Placeholder
+ its_command.push_back(0x0);
+
for (auto& info : routing_info_) {
- if (info.first != VSOMEIP_ROUTING_CLIENT &&
- info.first != host_->get_client() &&
- info.first != _ignore) {
- send_routing_info(info.first, _empty);
+ if (info.first != VSOMEIP_ROUTING_CLIENT && info.first != host_->get_client()) {
+ std::shared_ptr<endpoint> its_endpoint = host_->find_local(info.first);
+ if (its_endpoint) {
+ its_endpoint->send(&its_command[0], uint32_t(its_command.size()), true);
+ }
}
}
}
@@ -939,7 +1023,8 @@ void routing_manager_stub::broadcast(const std::vector<byte_t> &_command) const
void routing_manager_stub::send_subscribe(std::shared_ptr<vsomeip::endpoint> _target,
client_t _client, service_t _service, instance_t _instance,
- eventgroup_t _eventgroup, major_version_t _major, bool _is_remote_subscriber) {
+ eventgroup_t _eventgroup, major_version_t _major,
+ event_t _event, bool _is_remote_subscriber) {
if (_target) {
byte_t its_command[VSOMEIP_SUBSCRIBE_COMMAND_SIZE];
uint32_t its_size = VSOMEIP_SUBSCRIBE_COMMAND_SIZE
@@ -956,10 +1041,12 @@ void routing_manager_stub::send_subscribe(std::shared_ptr<vsomeip::endpoint> _ta
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6] = _major;
- its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 7] = _is_remote_subscriber;
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 7], &_event,
+ sizeof(_event));
+ its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9] = _is_remote_subscriber;
// set byte for subscription_type to zero. It's only used
// in subscribe messages sent from rm_proxies to rm_stub.
- its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8] = 0x0;
+ its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 10] = 0x0;
_target->send(its_command, sizeof(its_command));
}
@@ -967,7 +1054,7 @@ void routing_manager_stub::send_subscribe(std::shared_ptr<vsomeip::endpoint> _ta
void routing_manager_stub::send_unsubscribe(std::shared_ptr<vsomeip::endpoint> _target,
client_t _client, service_t _service, instance_t _instance,
- eventgroup_t _eventgroup, bool _is_remote_subscriber) {
+ eventgroup_t _eventgroup, event_t _event, bool _is_remote_subscriber) {
if (_target) {
byte_t its_command[VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE];
uint32_t its_size = VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE
@@ -983,7 +1070,9 @@ void routing_manager_stub::send_unsubscribe(std::shared_ptr<vsomeip::endpoint> _
sizeof(_instance));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_is_remote_subscriber,
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_event,
+ sizeof(_event));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_is_remote_subscriber,
sizeof(_is_remote_subscriber));
_target->send(its_command, sizeof(its_command));
@@ -991,7 +1080,7 @@ void routing_manager_stub::send_unsubscribe(std::shared_ptr<vsomeip::endpoint> _
}
void routing_manager_stub::send_subscribe_ack(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) {
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
std::shared_ptr<endpoint> its_endpoint = host_->find_local(_client);
if (its_endpoint) {
@@ -1013,13 +1102,15 @@ void routing_manager_stub::send_subscribe_ack(client_t _client, service_t _servi
sizeof(_eventgroup));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_client,
sizeof(_client));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_event,
+ sizeof(_event));
its_endpoint->send(&its_command[0], sizeof(its_command), true);
}
}
void routing_manager_stub::send_subscribe_nack(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) {
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
std::shared_ptr<endpoint> its_endpoint = host_->find_local(_client);
if (its_endpoint) {
@@ -1041,6 +1132,8 @@ void routing_manager_stub::send_subscribe_nack(client_t _client, service_t _serv
sizeof(_eventgroup));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_client,
sizeof(_client));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_event,
+ sizeof(_event));
its_endpoint->send(&its_command[0], sizeof(its_command), true);
}
@@ -1355,6 +1448,61 @@ client_t routing_manager_stub::get_client() const {
return host_->get_client();
}
+void routing_manager_stub::on_request_service(client_t _client, service_t _service,
+ instance_t _instance, major_version_t _major,
+ minor_version_t _minor) {
+
+ std::lock_guard<std::mutex> its_guard(routing_info_mutex_);
+ service_requests_[_client][_service][_instance] = std::make_pair(_major, _minor);
+
+ for (auto found_client : routing_info_) {
+ auto found_service = found_client.second.second.find(_service);
+ if (found_service != found_client.second.second.end()) {
+ if (_instance == ANY_INSTANCE) {
+ if (found_client.first != VSOMEIP_ROUTING_CLIENT &&
+ found_client.first != host_->get_client()) {
+ if (!is_already_connected(found_client.first, _client)) {
+ send_routing_info_delta(found_client.first,
+ routing_info_entry_e::RIE_ADD_CLIENT, _client);
+ }
+ }
+ if (_client != VSOMEIP_ROUTING_CLIENT &&
+ _client != host_->get_client()) {
+ for (auto instance : found_service->second) {
+ send_routing_info_delta(_client,
+ routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE,
+ found_client.first, _service, instance.first,
+ instance.second.first, instance.second.second);
+ }
+ }
+
+ break;
+ } else {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ if (found_client.first != VSOMEIP_ROUTING_CLIENT &&
+ found_client.first != host_->get_client()) {
+ if (!is_already_connected(found_client.first, _client)) {
+ send_routing_info_delta(found_client.first,
+ routing_info_entry_e::RIE_ADD_CLIENT, _client);
+ }
+ }
+ if (_client != VSOMEIP_ROUTING_CLIENT &&
+ _client != host_->get_client()) {
+ send_routing_info_delta(_client,
+ routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE,
+ found_client.first, _service, _instance,
+ found_instance->second.first,
+ found_instance->second.second);
+ }
+
+ break;
+ }
+ }
+ }
+ }
+}
+
#ifndef _WIN32
bool routing_manager_stub::check_credentials(client_t _client, uid_t _uid, gid_t _gid) {
return configuration_->check_credentials(_client, _uid, _gid);
diff --git a/implementation/runtime/include/application_impl.hpp b/implementation/runtime/include/application_impl.hpp
index a651a9e..2dcc95b 100644
--- a/implementation/runtime/include/application_impl.hpp
+++ b/implementation/runtime/include/application_impl.hpp
@@ -85,6 +85,8 @@ public:
VSOMEIP_EXPORT void unsubscribe(service_t _service, instance_t _instance,
eventgroup_t _eventgroup);
+ VSOMEIP_EXPORT void unsubscribe(service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup, event_t _event);
VSOMEIP_EXPORT bool is_available(service_t _service, instance_t _instance,
major_version_t _major = DEFAULT_MAJOR, minor_version_t _minor = DEFAULT_MINOR) const;
@@ -213,7 +215,7 @@ private:
void dispatch();
void invoke_handler(std::shared_ptr<sync_handler> &_handler);
bool has_active_dispatcher();
- bool is_active_dispatcher(std::thread::id &_id);
+ bool is_active_dispatcher(const std::thread::id &_id);
void remove_elapsed_dispatchers();
void shutdown();
@@ -225,7 +227,9 @@ private:
bool *_send_back_cached_event,
bool *_send_back_cached_eventgroup);
void remove_subscription(service_t _service, instance_t _instance,
- eventgroup_t _eventgroup);
+ eventgroup_t _eventgroup, event_t _event);
+ bool check_for_active_subscription(service_t _service, instance_t _instance,
+ event_t _event);
//
// Attributes
//
@@ -288,7 +292,7 @@ private:
mutable std::mutex handlers_mutex_;
// Dispatching
- bool is_dispatching_;
+ std::atomic<bool> is_dispatching_;
// Dispatcher threads
std::map<std::thread::id, std::shared_ptr<std::thread>> dispatchers_;
// Dispatcher threads that elapsed and can be removed
@@ -320,9 +324,9 @@ private:
bool is_routing_manager_host_;
// Event subscriptions
- std::mutex event_subscriptions_mutex_;
- std::map<service_t, std::map<instance_t, std::map<event_t, bool>>> event_subscriptions_;
- std::map<service_t, std::map<instance_t, std::set<eventgroup_t>>> eventgroup_subscriptions_;
+ std::mutex subscriptions_mutex_;
+ std::map<service_t, std::map<instance_t,
+ std::map<event_t, std::map<eventgroup_t, bool>>>> subscriptions_;
std::thread::id stop_caller_id_;
std::thread::id start_caller_id_;
diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp
index 777c99a..d4d4ed7 100644
--- a/implementation/runtime/src/application_impl.cpp
+++ b/implementation/runtime/src/application_impl.cpp
@@ -242,17 +242,16 @@ void application_impl::start() {
stopped_called_ = false;
VSOMEIP_INFO << "Starting vsomeip application \"" << name_ << "\" using "
<< std::dec << io_thread_count << " threads";
+
+ start_caller_id_ = std::this_thread::get_id();
{
- std::lock_guard<std::mutex> its_lock(handlers_mutex_);
+ std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
is_dispatching_ = true;
+ auto its_main_dispatcher = std::make_shared<std::thread>(
+ std::bind(&application_impl::main_dispatch, shared_from_this()));
+ dispatchers_[its_main_dispatcher->get_id()] = its_main_dispatcher;
}
- start_caller_id_ = std::this_thread::get_id();
-
- auto its_main_dispatcher = std::make_shared<std::thread>(
- std::bind(&application_impl::main_dispatch, this));
- dispatchers_[its_main_dispatcher->get_id()] = its_main_dispatcher;
-
if (stop_thread_.joinable()) {
stop_thread_.join();
}
@@ -264,7 +263,13 @@ void application_impl::start() {
for (size_t i = 0; i < io_thread_count - 1; i++) {
std::shared_ptr<std::thread> its_thread
= std::make_shared<std::thread>([this, i] {
+ try {
io_.run();
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "application_impl::start() "
+ "catched exception:" << e.what();
+ throw;
+ }
});
io_threads_.insert(its_thread);
}
@@ -273,8 +278,12 @@ void application_impl::start() {
app_counter_mutex__.lock();
app_counter__++;
app_counter_mutex__.unlock();
-
- io_.run();
+ try {
+ io_.run();
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "application_impl::start() catched exception:" << e.what();
+ throw;
+ }
if (stop_thread_.joinable()) {
stop_thread_.join();
@@ -384,22 +393,30 @@ void application_impl::subscribe(service_t _service, instance_t _instance,
bool send_back_cached_group(false);
check_send_back_cached_event(_service, _instance, _event, _eventgroup,
&send_back_cached, &send_back_cached_group);
+
if (send_back_cached) {
send_back_cached_event(_service, _instance, _event);
} else if(send_back_cached_group) {
- send_back_cached_eventgroup(_service, _instance, _event);
+ send_back_cached_eventgroup(_service, _instance, _eventgroup);
}
routing_->subscribe(client_, _service, _instance, _eventgroup, _major,
- _subscription_type);
+ _event, _subscription_type);
}
}
void application_impl::unsubscribe(service_t _service, instance_t _instance,
eventgroup_t _eventgroup) {
- remove_subscription(_service, _instance, _eventgroup);
+ remove_subscription(_service, _instance, _eventgroup, ANY_EVENT);
+ if (routing_)
+ routing_->unsubscribe(client_, _service, _instance, _eventgroup, ANY_EVENT);
+}
+
+void application_impl::unsubscribe(service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup, event_t _event) {
+ remove_subscription(_service, _instance, _eventgroup, _event);
if (routing_)
- routing_->unsubscribe(client_, _service, _instance, _eventgroup);
+ routing_->unsubscribe(client_, _service, _instance, _eventgroup, _event);
}
bool application_impl::is_available(
@@ -1047,13 +1064,15 @@ void application_impl::on_availability(service_t _service, instance_t _instance,
}
}
if (!_is_available) {
- std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_);
- auto found_service = event_subscriptions_.find(_service);
- if (found_service != event_subscriptions_.end()) {
+ std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+ auto found_service = subscriptions_.find(_service);
+ if (found_service != subscriptions_.end()) {
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
- for (auto &e : found_instance->second) {
- e.second = false;
+ for (auto &event : found_instance->second) {
+ for (auto &eventgroup : event.second) {
+ eventgroup.second = false;
+ }
}
}
}
@@ -1065,34 +1084,13 @@ void application_impl::on_availability(service_t _service, instance_t _instance,
}
void application_impl::on_message(const std::shared_ptr<message> &&_message) {
- service_t its_service = _message->get_service();
- instance_t its_instance = _message->get_instance();
- method_t its_method = _message->get_method();
+ const service_t its_service = _message->get_service();
+ const instance_t its_instance = _message->get_instance();
+ const method_t its_method = _message->get_method();
if (_message->get_message_type() == message_type_e::MT_NOTIFICATION) {
- std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_);
- auto found_service = event_subscriptions_.find(its_service);
- if(found_service != event_subscriptions_.end()) {
- auto found_instance = found_service->second.find(its_instance);
- if (found_instance != found_service->second.end()) {
- auto its_event = found_instance->second.find(its_method);
- if (its_event != found_instance->second.end()) {
- its_event->second = true;
- } else {
- // received a event which nobody yet subscribed to
- event_subscriptions_[its_service][its_instance][its_method] = true;
- // check if someone subscribed to ANY_EVENT
- auto its_any_event = found_instance->second.find(ANY_EVENT);
- if(its_any_event == found_instance->second.end()) {
- return;
- }
- }
- } else {
- // received a event from a service instance which nobody yet subscribed to
- return;
- }
- } else {
- // received a event from a service which nobody yet subscribed to
+ if (!check_for_active_subscription(its_service, its_instance,
+ static_cast<event_t>(its_method))) {
return;
}
}
@@ -1152,17 +1150,19 @@ void application_impl::on_message(const std::shared_ptr<message> &&_message) {
}
if (its_handlers.size()) {
- std::lock_guard<std::mutex> its_lock(handlers_mutex_);
- for (const auto &its_handler : its_handlers) {
- auto handler = its_handler.handler_;
- std::shared_ptr<sync_handler> its_sync_handler =
- std::make_shared<sync_handler>([handler, _message]() {
- handler(std::move(_message));
- });
- handlers_.push_back(its_sync_handler);
+ {
+ std::lock_guard<std::mutex> its_lock(handlers_mutex_);
+ for (const auto &its_handler : its_handlers) {
+ auto handler = its_handler.handler_;
+ std::shared_ptr<sync_handler> its_sync_handler =
+ std::make_shared<sync_handler>([handler, _message]() {
+ handler(std::move(_message));
+ });
+ handlers_.push_back(its_sync_handler);
+ }
}
+ dispatcher_condition_.notify_one();
}
- dispatcher_condition_.notify_one();
}
}
@@ -1196,19 +1196,20 @@ void application_impl::main_dispatch() {
remove_elapsed_dispatchers();
+#ifdef _WIN32
if(!is_dispatching_) {
its_lock.unlock();
-#ifdef _WIN32
return;
-#endif
}
+#endif
}
}
}
+ its_lock.unlock();
}
void application_impl::dispatch() {
- std::thread::id its_id = std::this_thread::get_id();
+ const std::thread::id its_id = std::this_thread::get_id();
while (is_active_dispatcher(its_id)) {
std::unique_lock<std::mutex> its_lock(handlers_mutex_);
if (is_dispatching_ && handlers_.empty()) {
@@ -1231,35 +1232,44 @@ void application_impl::dispatch() {
}
}
}
-
- std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
- elapsed_dispatchers_.insert(std::this_thread::get_id());
+ {
+ std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
+ elapsed_dispatchers_.insert(its_id);
+ }
}
void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) {
- std::thread::id its_id = std::this_thread::get_id();
+ const std::thread::id its_id = std::this_thread::get_id();
boost::asio::steady_timer its_dispatcher_timer(io_);
its_dispatcher_timer.expires_from_now(std::chrono::milliseconds(max_dispatch_time_));
its_dispatcher_timer.async_wait([this, its_id](const boost::system::error_code &_error) {
if (!_error) {
VSOMEIP_INFO << "Blocking call detected. Client=" << std::hex << get_client();
- std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
- blocked_dispatchers_.insert(its_id);
-
- if (has_active_dispatcher()) {
+ bool active_dispatcher_available(false);
+ {
+ std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
+ blocked_dispatchers_.insert(its_id);
+ active_dispatcher_available = has_active_dispatcher();
+ }
+ if (active_dispatcher_available) {
+ std::lock_guard<std::mutex> its_lock(handlers_mutex_);
dispatcher_condition_.notify_all();
- } else {
+ } else if (is_dispatching_) {
// If possible, create a new dispatcher thread to unblock.
- // If this is _not_ possible, dispatching is blocked until at least
- // one of the active handler calls returns.
+ // If this is _not_ possible, dispatching is blocked until
+ // at least one of the active handler calls returns.
+ std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
if (dispatchers_.size() < max_dispatchers_) {
auto its_dispatcher = std::make_shared<std::thread>(
- std::bind(&application_impl::dispatch, this));
+ std::bind(&application_impl::dispatch, shared_from_this()));
dispatchers_[its_dispatcher->get_id()] = its_dispatcher;
} else {
VSOMEIP_ERROR << "Maximum number of dispatchers exceeded.";
}
+ } else {
+ VSOMEIP_INFO << "Won't start new dispatcher thread as Client="
+ << std::hex << get_client() << " is shutting down";
}
}
});
@@ -1273,7 +1283,7 @@ void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) {
}
bool application_impl::has_active_dispatcher() {
- for (auto d : dispatchers_) {
+ for (const auto &d : dispatchers_) {
if (blocked_dispatchers_.find(d.first) == blocked_dispatchers_.end() &&
elapsed_dispatchers_.find(d.first) == elapsed_dispatchers_.end()) {
return true;
@@ -1282,9 +1292,9 @@ bool application_impl::has_active_dispatcher() {
return false;
}
-bool application_impl::is_active_dispatcher(std::thread::id &_id) {
+bool application_impl::is_active_dispatcher(const std::thread::id &_id) {
std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
- for (auto d : dispatchers_) {
+ for (const auto &d : dispatchers_) {
if (d.first != _id &&
blocked_dispatchers_.find(d.first) == blocked_dispatchers_.end() &&
elapsed_dispatchers_.find(d.first) == elapsed_dispatchers_.end()) {
@@ -1328,7 +1338,7 @@ void application_impl::clear_all_handler() {
members_.clear();
}
{
- std::unique_lock<std::mutex> its_lock(handlers_mutex_);
+ std::lock_guard<std::mutex> its_lock(handlers_mutex_);
handlers_.clear();
}
}
@@ -1344,12 +1354,17 @@ void application_impl::shutdown() {
stop_cv_.wait(its_lock);
}
}
+ std::map<std::thread::id, std::shared_ptr<std::thread>> its_dispatchers;
{
- std::lock_guard<std::mutex> its_lock(handlers_mutex_);
+ std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
+ its_dispatchers = dispatchers_;
+ }
+ {
+ std::lock_guard<std::mutex> its_handler_lock(handlers_mutex_);
is_dispatching_ = false;
+ dispatcher_condition_.notify_all();
}
- dispatcher_condition_.notify_all();
- for (auto its_dispatcher : dispatchers_) {
+ for (auto its_dispatcher : its_dispatchers) {
if (its_dispatcher.second->get_id() != stop_caller_id_) {
if (its_dispatcher.second->joinable()) {
its_dispatcher.second->join();
@@ -1392,7 +1407,7 @@ bool application_impl::is_routing() const {
void application_impl::send_back_cached_event(service_t _service,
instance_t _instance,
event_t _event) {
- std::shared_ptr<event> its_event = routing_->get_event(_service,
+ std::shared_ptr<event> its_event = routing_->find_event(_service,
_instance, _event);
if (its_event && its_event->is_field() && its_event->is_set()) {
std::shared_ptr<message> its_message = runtime_->create_notification();
@@ -1402,6 +1417,11 @@ void application_impl::send_back_cached_event(service_t _service,
its_message->set_payload(its_event->get_payload());
its_message->set_initial(true);
on_message(std::move(its_message));
+ VSOMEIP_INFO << "Sending back cached event ("
+ << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _event << "]";
}
}
@@ -1413,12 +1433,20 @@ void application_impl::send_back_cached_eventgroup(service_t _service,
for(const auto &its_event : its_events) {
if (its_event && its_event->is_field() && its_event->is_set()) {
std::shared_ptr<message> its_message = runtime_->create_notification();
+ const event_t its_event_id(its_event->get_event());
its_message->set_service(_service);
- its_message->set_method(its_event->get_event());
+ its_message->set_method(its_event_id);
its_message->set_instance(_instance);
its_message->set_payload(its_event->get_payload());
its_message->set_initial(true);
on_message(std::move(its_message));
+ VSOMEIP_INFO << "Sending back cached event ("
+ << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_event_id
+ << "] from eventgroup "
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup;
}
}
}
@@ -1432,91 +1460,114 @@ void application_impl::check_send_back_cached_event(
service_t _service, instance_t _instance, event_t _event,
eventgroup_t _eventgroup, bool *_send_back_cached_event,
bool *_send_back_cached_eventgroup) {
- std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_);
-
- bool already_subscibed(false);
- auto found_service = eventgroup_subscriptions_.find(_service);
- if(found_service != eventgroup_subscriptions_.end()) {
+ std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+ *_send_back_cached_event = false;
+ *_send_back_cached_eventgroup = false;
+ bool already_subscribed(false);
+ auto found_service = subscriptions_.find(_service);
+ if(found_service != subscriptions_.end()) {
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
- auto found_eventgroup = found_instance->second.find(_eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- already_subscibed = true;
- }
- }
- }
-
- if (already_subscibed) {
- auto found_service = event_subscriptions_.find(_service);
- if(found_service != event_subscriptions_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_event = found_instance->second.find(_event);
- if (found_event != found_instance->second.end()) {
- if(found_event->second) {
- // initial values for this event have already been sent,
- // send back cached value
+ auto found_event = found_instance->second.find(_event);
+ if (found_event != found_instance->second.end()) {
+ auto found_eventgroup = found_event->second.find(_eventgroup);
+ if (found_eventgroup != found_event->second.end()) {
+ already_subscribed = true;
+ if (found_eventgroup->second) {
+ // initial values for this event have already been
+ // received, send back cached value
if(_event == ANY_EVENT) {
*_send_back_cached_eventgroup = true;
} else {
*_send_back_cached_event = true;
}
}
- return;
}
}
}
}
- event_subscriptions_[_service][_instance][_event] = false;
- eventgroup_subscriptions_[_service][_instance].insert(_eventgroup);
+
+ if (!already_subscribed) {
+ subscriptions_[_service][_instance][_event][_eventgroup] = false;
+ }
}
void application_impl::remove_subscription(service_t _service,
instance_t _instance,
- eventgroup_t _eventgroup) {
- std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_);
+ eventgroup_t _eventgroup,
+ event_t _event) {
+ std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
- auto found_service1 = eventgroup_subscriptions_.find(_service);
- if(found_service1 != eventgroup_subscriptions_.end()) {
- auto found_instance = found_service1->second.find(_instance);
- if (found_instance != found_service1->second.end()) {
- auto found_eventgroup = found_instance->second.find(_eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- found_instance->second.erase(_eventgroup);
- if (!found_instance->second.size()) {
- found_service1->second.erase(_instance);
- if (!found_service1->second.size()) {
- eventgroup_subscriptions_.erase(_service);
+ auto found_service = subscriptions_.find(_service);
+ if(found_service != subscriptions_.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ auto found_event = found_instance->second.find(_event);
+ if (found_event != found_instance->second.end()) {
+ if (found_event->second.erase(_eventgroup)) {
+ if (!found_event->second.size()) {
+ found_instance->second.erase(_event);
+ if (!found_instance->second.size()) {
+ found_service->second.erase(_instance);
+ if (!found_service->second.size()) {
+ subscriptions_.erase(_service);
+ }
+ }
}
}
}
}
}
+}
- auto found_service = event_subscriptions_.find(_service);
- if (found_service != event_subscriptions_.end()) {
+bool application_impl::check_for_active_subscription(service_t _service,
+ instance_t _instance,
+ event_t _event) {
+ std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+ auto found_service = subscriptions_.find(_service);
+ if(found_service != subscriptions_.end()) {
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
- if (routing_) {
- std::set<std::shared_ptr<event>> its_events =
- routing_->find_events(_service, _instance,
- _eventgroup);
- for (const auto &e : its_events) {
- const event_t its_event(e->get_event());
- auto found_event = found_instance->second.find(its_event);
- if (found_event != found_instance->second.end()) {
- found_instance->second.erase(its_event);
+ auto found_event = found_instance->second.find(_event);
+ if (found_event != found_instance->second.end()) {
+ if (found_event->second.size()) {
+ for (auto &eventgroup : found_event->second) {
+ eventgroup.second = true;
}
+ return true;
}
- if (!found_instance->second.size()) {
- found_service->second.erase(_instance);
- if (!found_service->second.size()) {
- event_subscriptions_.erase(_service);
+ } else {
+ // Received a event which nobody yet explicitly subscribed to.
+ // Check if someone subscribed to ANY_EVENT for one of
+ // the received event's eventgroups
+ auto found_any_event = found_instance->second.find(ANY_EVENT);
+ if (found_any_event != found_instance->second.end()) {
+ if (routing_) {
+ std::shared_ptr<event> its_event = routing_->find_event(
+ _service, _instance, _event);
+ if (its_event) {
+ for (const auto eg : its_event->get_eventgroups()) {
+ auto found_eventgroup = found_any_event->second.find(eg);
+ if (found_eventgroup != found_any_event->second.end()) {
+ // set the flag for initial event received to true
+ // even if we might not already received all of the
+ // eventgroups events.
+ found_eventgroup->second = true;
+ return true;
+ }
+ }
+ }
}
}
}
}
}
+ // Return false if an event was received from:
+ // - a service which nobody yet subscribed to
+ // - a service instance which nobody yet subscribed to
+ // - a service instance and nobody yet subscribed to one of the event's
+ // eventgroups
+ return false;
}
} // namespace vsomeip
diff --git a/implementation/service_discovery/include/defines.hpp b/implementation/service_discovery/include/defines.hpp
index 425580a..1f884d2 100644
--- a/implementation/service_discovery/include/defines.hpp
+++ b/implementation/service_discovery/include/defines.hpp
@@ -43,5 +43,7 @@
#define VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY 1000
#define VSOMEIP_SD_DEFAULT_REQUEST_RESPONSE_DELAY 2000
#define VSOMEIP_SD_DEFAULT_OFFER_DEBOUNCE_TIME 500
+#define VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME 500
+
#endif // VSOMEIP_SD_DEFINES_HPP
diff --git a/implementation/service_discovery/include/service_discovery.hpp b/implementation/service_discovery/include/service_discovery.hpp
index 43aa9cb..19f3c93 100644
--- a/implementation/service_discovery/include/service_discovery.hpp
+++ b/implementation/service_discovery/include/service_discovery.hpp
@@ -46,7 +46,7 @@ public:
virtual void unsubscribe_client(service_t _service, instance_t _instance,
client_t _client) = 0;
- virtual bool send(bool _is_announcing, bool _is_find = false) = 0;
+ virtual bool send(bool _is_announcing) = 0;
virtual void on_message(const byte_t *_data, length_t _length,
const boost::asio::ip::address &_sender,
diff --git a/implementation/service_discovery/include/service_discovery_host.hpp b/implementation/service_discovery/include/service_discovery_host.hpp
index f0ca34b..710e4e7 100644
--- a/implementation/service_discovery/include/service_discovery_host.hpp
+++ b/implementation/service_discovery/include/service_discovery_host.hpp
@@ -71,7 +71,8 @@ public:
const boost::asio::ip::address &_address, uint16_t _port) = 0;
virtual void on_subscribe_ack(client_t _client,
- service_t _service, instance_t _instance, eventgroup_t _eventgroup) = 0;
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ event_t _event) = 0;
virtual std::shared_ptr<endpoint> find_or_create_remote_client(
service_t _service, instance_t _instance,
@@ -85,7 +86,8 @@ public:
const std::chrono::steady_clock::time_point &_expiration) = 0;
virtual void on_subscribe_nack(client_t _client,
- service_t _service, instance_t _instance, eventgroup_t _eventgroup) = 0;
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ event_t _event) = 0;
virtual bool has_identified(client_t _client, service_t _service,
instance_t _instance, bool _reliable) = 0;
diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp
index 13540d2..09d4bd7 100644
--- a/implementation/service_discovery/include/service_discovery_impl.hpp
+++ b/implementation/service_discovery/include/service_discovery_impl.hpp
@@ -76,7 +76,7 @@ public:
void unsubscribe_client(service_t _service, instance_t _instance,
client_t _client);
- bool send(bool _is_announcing, bool _is_find);
+ bool send(bool _is_announcing);
void on_message(const byte_t *_data, length_t _length,
const boost::asio::ip::address &_sender,
@@ -104,7 +104,8 @@ private:
const boost::asio::ip::address &_address, uint16_t _port,
bool _is_reliable);
void insert_find_entries(std::shared_ptr<message_impl> &_message,
- uint32_t _start, uint32_t &_size, bool &_done);
+ const requests_t &_requests, uint32_t _start,
+ uint32_t &_size, bool &_done);
void insert_offer_entries(std::shared_ptr<message_impl> &_message,
const services_t &_services, uint32_t &_start,
uint32_t _size, bool &_done, bool _ignore_phase);
@@ -149,7 +150,8 @@ private:
std::shared_ptr<eventgroupentry_impl> &_entry,
const std::vector<std::shared_ptr<option_impl> > &_options,
std::shared_ptr < message_impl > &its_message_response,
- std::vector <accepted_subscriber_t> &accepted_subscribers);
+ std::vector <accepted_subscriber_t> &accepted_subscribers,
+ const boost::asio::ip::address &_destination);
void handle_eventgroup_subscription(service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
major_version_t _major, ttl_t _ttl, uint8_t _counter, uint16_t _reserved,
@@ -220,10 +222,19 @@ private:
void start_offer_debounce_timer(bool _first_start);
void on_offer_debounce_timer_expired(const boost::system::error_code &_error);
+
+ void start_find_debounce_timer(bool _first_start);
+ void on_find_debounce_timer_expired(const boost::system::error_code &_error);
+
+
void on_repetition_phase_timer_expired(
const boost::system::error_code &_error,
std::shared_ptr<boost::asio::steady_timer> _timer,
std::uint8_t _repetition, std::uint32_t _last_delay);
+ void on_find_repetition_phase_timer_expired(
+ const boost::system::error_code &_error,
+ std::shared_ptr<boost::asio::steady_timer> _timer,
+ std::uint8_t _repetition, std::uint32_t _last_delay);
void move_offers_into_main_phase(
const std::shared_ptr<boost::asio::steady_timer> &_timer);
@@ -233,6 +244,12 @@ private:
std::vector<std::shared_ptr<message_impl>> &_messages,
const services_t &_offers, bool _ignore_phase);
+ void fill_message_with_find_entries(
+ std::shared_ptr<runtime> _runtime,
+ std::shared_ptr<message_impl> _message,
+ std::vector<std::shared_ptr<message_impl>> &_messages,
+ const requests_t &_requests);
+
bool serialize_and_send_messages(
const std::vector<std::shared_ptr<message_impl>> &_messages);
@@ -313,11 +330,21 @@ private:
std::mutex collected_offers_mutex_;
services_t collected_offers_;
+ std::chrono::milliseconds find_debounce_time_;
+ std::mutex find_debounce_timer_mutex_;
+ boost::asio::steady_timer find_debounce_timer_;
+ requests_t collected_finds_;
+
// this map contains the offers and their timers currently in repetition phase
std::mutex repetition_phase_timers_mutex_;
std::map<std::shared_ptr<boost::asio::steady_timer>,
services_t> repetition_phase_timers_;
+ // this map contains the finds and their timers currently in repetition phase
+ std::mutex find_repetition_phase_timers_mutex_;
+ std::map<std::shared_ptr<boost::asio::steady_timer>,
+ requests_t> find_repetition_phase_timers_;
+
std::mutex main_phase_timer_mutex_;
boost::asio::steady_timer main_phase_timer_;
diff --git a/implementation/service_discovery/include/subscription.hpp b/implementation/service_discovery/include/subscription.hpp
index 2d23d4e..dc8222c 100644
--- a/implementation/service_discovery/include/subscription.hpp
+++ b/implementation/service_discovery/include/subscription.hpp
@@ -6,7 +6,6 @@
#ifndef VSOMEIP_SD_SUBSCRIPTION_HPP
#define VSOMEIP_SD_SUBSCRIPTION_HPP
-#include <chrono>
#include <memory>
#include <vsomeip/primitive_types.hpp>
@@ -24,8 +23,7 @@ public:
std::shared_ptr<endpoint> _reliable,
std::shared_ptr<endpoint> _unreliable,
subscription_type_e _subscription_type,
- uint8_t _counter,
- std::chrono::steady_clock::time_point _expiration);
+ uint8_t _counter);
~subscription();
major_version_t get_major() const;
@@ -44,9 +42,6 @@ public:
uint8_t get_counter() const;
- std::chrono::steady_clock::time_point get_expiration() const;
- void set_expiration(std::chrono::steady_clock::time_point _expiration);
-
private:
major_version_t major_;
ttl_t ttl_;
@@ -60,8 +55,6 @@ private:
subscription_type_e subscription_type_;
uint8_t counter_;
-
- std::chrono::steady_clock::time_point expiration_;
};
} // namespace sd
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index 0cc2d6f..9fc955d 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -59,6 +59,8 @@ service_discovery_impl::service_discovery_impl(service_discovery_host *_host)
repetitions_max_(VSOMEIP_SD_DEFAULT_REPETITIONS_MAX),
cyclic_offer_delay_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY),
offer_debounce_timer_(_host->get_io()),
+ find_debounce_time_(VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME),
+ find_debounce_timer_(_host->get_io()),
main_phase_timer_(_host->get_io()),
is_suspended_(true) {
std::chrono::seconds smallest_ttl(DEFAULT_TTL);
@@ -140,22 +142,12 @@ void service_discovery_impl::start() {
VSOMEIP_ERROR << "Couldn't start service discovery";
return;
}
- // Send out pending find services messages if have any
- bool send_find(false);
- {
- std::lock_guard<std::mutex> its_lock(requested_mutex_);
- if (requested_.size()) {
- send_find = true;
- }
- }
- if (send_find) {
- send(false, true);
- }
}
is_suspended_ = false;
start_main_phase_timer();
start_offer_debounce_timer(true);
+ start_find_debounce_timer(true);
}
void service_discovery_impl::stop() {
@@ -170,36 +162,38 @@ void service_discovery_impl::stop() {
offer_debounce_timer_.cancel(ec);
}
{
+ std::lock_guard<std::mutex> its_lock(find_debounce_timer_mutex_);
+ find_debounce_timer_.cancel(ec);
+ }
+ {
std::lock_guard<std::mutex> its_lock(repetition_phase_timers_mutex_);
for(const auto &t : repetition_phase_timers_) {
t.first->cancel(ec);
}
}
+ {
+ std::lock_guard<std::mutex> its_lock(find_repetition_phase_timers_mutex_);
+ for(const auto &t : find_repetition_phase_timers_) {
+ t.first->cancel(ec);
+ }
+ }
+
}
void service_discovery_impl::request_service(service_t _service,
instance_t _instance, major_version_t _major, minor_version_t _minor,
ttl_t _ttl) {
- bool is_new_request(true);
- {
- std::lock_guard<std::mutex> its_lock(requested_mutex_);
- auto find_service = requested_.find(_service);
- if (find_service != requested_.end()) {
- auto find_instance = find_service->second.find(_instance);
- if (find_instance != find_service->second.end()) {
- is_new_request = false;
- // TODO: check version and report errors
- } else {
- find_service->second[_instance] = std::make_shared < request
- > (_major, _minor, _ttl);
- }
- } else {
- requested_[_service][_instance] = std::make_shared < request
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
+ auto find_service = requested_.find(_service);
+ if (find_service != requested_.end()) {
+ auto find_instance = find_service->second.find(_instance);
+ if (find_instance == find_service->second.end()) {
+ find_service->second[_instance] = std::make_shared < request
> (_major, _minor, _ttl);
}
- }
- if (is_new_request && !is_suspended_) {
- send(false, true);
+ } else {
+ requested_[_service][_instance] = std::make_shared < request
+ > (_major, _minor, _ttl);
}
}
@@ -239,11 +233,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
if (found_eventgroup != found_instance->second.end()) {
auto found_client = found_eventgroup->second.find(_client);
if (found_client != found_eventgroup->second.end()) {
- if (found_client->second->get_major() == _major) {
- found_client->second->set_ttl(_ttl);
- found_client->second->set_expiration(std::chrono::steady_clock::now()
- + std::chrono::seconds(_ttl));
- } else {
+ if (found_client->second->get_major() != _major) {
VSOMEIP_ERROR
<< "Subscriptions to different versions of the same "
"service instance are not supported!";
@@ -281,8 +271,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
// New subscription
std::shared_ptr < subscription > its_subscription = std::make_shared
< subscription > (_major, _ttl, its_reliable, its_unreliable,
- _subscription_type, subscribe_count,
- std::chrono::steady_clock::time_point() + std::chrono::seconds(_ttl));
+ _subscription_type, subscribe_count);
subscribed_[_service][_instance][_eventgroup][_client] = its_subscription;
if (has_address) {
@@ -774,44 +763,53 @@ void service_discovery_impl::insert_option(
}
void service_discovery_impl::insert_find_entries(
- std::shared_ptr<message_impl> &_message,
+ std::shared_ptr<message_impl> &_message, const requests_t &_requests,
uint32_t _start, uint32_t &_size, bool &_done) {
std::lock_guard<std::mutex> its_lock(requested_mutex_);
uint32_t its_size(0);
uint32_t i = 0;
_done = true;
- for (auto its_service : requested_) {
+ for (auto its_service : _requests) {
for (auto its_instance : its_service.second) {
auto its_request = its_instance.second;
- uint8_t its_sent_counter = its_request->get_sent_counter();
- if (its_sent_counter != repetitions_max_ + 1) {
- if (i >= _start) {
- if (its_size + VSOMEIP_SOMEIP_SD_ENTRY_SIZE <= max_message_size_) {
- std::shared_ptr < serviceentry_impl > its_entry =
- _message->create_service_entry();
- if (its_entry) {
- its_entry->set_type(entry_type_e::FIND_SERVICE);
- its_entry->set_service(its_service.first);
- its_entry->set_instance(its_instance.first);
- its_entry->set_major_version(its_request->get_major());
- its_entry->set_minor_version(its_request->get_minor());
- its_entry->set_ttl(its_request->get_ttl());
- its_size += VSOMEIP_SOMEIP_SD_ENTRY_SIZE;
- its_sent_counter++;
-
- its_request->set_sent_counter(its_sent_counter);
- } else {
- VSOMEIP_ERROR << "Failed to create service entry!";
+
+ // check if release_service was called
+ auto the_service = requested_.find(its_service.first);
+ if ( the_service != requested_.end() ) {
+ auto the_instance = the_service->second.find(its_instance.first);
+ if(the_instance != the_service->second.end() ) {
+ uint8_t its_sent_counter = its_request->get_sent_counter();
+ if (its_sent_counter != repetitions_max_ + 1) {
+ if (i >= _start) {
+ if (its_size + VSOMEIP_SOMEIP_SD_ENTRY_SIZE <= max_message_size_) {
+ std::shared_ptr < serviceentry_impl > its_entry =
+ _message->create_service_entry();
+ if (its_entry) {
+ its_entry->set_type(entry_type_e::FIND_SERVICE);
+ its_entry->set_service(its_service.first);
+ its_entry->set_instance(its_instance.first);
+ its_entry->set_major_version(its_request->get_major());
+ its_entry->set_minor_version(its_request->get_minor());
+ its_entry->set_ttl(its_request->get_ttl());
+ its_size += VSOMEIP_SOMEIP_SD_ENTRY_SIZE;
+ its_sent_counter++;
+
+ its_request->set_sent_counter(its_sent_counter);
+ } else {
+ VSOMEIP_ERROR << "Failed to create service entry!";
+ }
+ } else {
+ _done = false;
+ _size = its_size;
+ return;
+ }
}
- } else {
- _done = false;
- _size = its_size;
- return;
}
+ i++;
}
}
- i++;
+
}
}
_size = its_size;
@@ -912,7 +910,7 @@ void service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared
std::shared_ptr < endpoint > its_endpoint;
its_endpoint = _subscription->get_endpoint(true);
- if (its_endpoint) {
+ if (its_endpoint && its_endpoint->is_connected()) {
insert_option(_message, its_stop_entry, unicast_,
its_endpoint->get_local_port(), true);
insert_option(_message, its_entry, unicast_,
@@ -931,6 +929,25 @@ void service_discovery_impl::insert_subscription_ack(
std::shared_ptr<message_impl> &_message, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, uint8_t _counter, major_version_t _major, uint16_t _reserved) {
+
+ for (auto its_entry : _message->get_entries()) {
+ if (its_entry->is_eventgroup_entry()) {
+ std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry =
+ std::dynamic_pointer_cast < eventgroupentry_impl
+ > (its_entry);
+ if(its_eventgroup_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK
+ && its_eventgroup_entry->get_service() == _service
+ && its_eventgroup_entry->get_instance() == _instance
+ && its_eventgroup_entry->get_eventgroup() == _eventgroup
+ && its_eventgroup_entry->get_major_version() == _major
+ && its_eventgroup_entry->get_reserved() == _reserved
+ && its_eventgroup_entry->get_counter() == _counter
+ && its_eventgroup_entry->get_ttl() == _ttl) {
+ return;
+ }
+ }
+ }
+
std::shared_ptr < eventgroupentry_impl > its_entry =
_message->create_eventgroup_entry();
its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP_ACK);
@@ -968,36 +985,23 @@ void service_discovery_impl::insert_subscription_nack(
its_entry->set_ttl(0x0);
}
-bool service_discovery_impl::send(bool _is_announcing, bool _is_find) {
+bool service_discovery_impl::send(bool _is_announcing) {
std::shared_ptr < runtime > its_runtime = runtime_.lock();
if (its_runtime) {
std::vector< std::shared_ptr< message_impl > > its_messages;
std::shared_ptr < message_impl > its_message;
- if (_is_find || !_is_announcing) {
- uint32_t its_start(0);
- uint32_t its_size(0);
- bool is_done(false);
- while (!is_done) {
- its_message = its_runtime->create_message();
- its_messages.push_back(its_message);
-
- insert_find_entries(its_message, its_start, its_size, is_done);
- its_start += its_size / VSOMEIP_SOMEIP_SD_ENTRY_SIZE;
- };
- } else {
+ if(_is_announcing) {
its_message = its_runtime->create_message();
its_messages.push_back(its_message);
- }
- if (!_is_find) {
services_t its_offers = host_->get_offered_services();
fill_message_with_offer_entries(its_runtime, its_message,
its_messages, its_offers, false);
- }
- // Serialize and send
- return serialize_and_send_messages(its_messages);
+ // Serialize and send
+ return serialize_and_send_messages(its_messages);
+ }
}
return false;
}
@@ -1063,7 +1067,7 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry =
std::dynamic_pointer_cast < eventgroupentry_impl
> (its_entry);
- process_eventgroupentry( its_eventgroup_entry, its_options, its_message_response, accepted_subscribers);
+ process_eventgroupentry( its_eventgroup_entry, its_options, its_message_response, accepted_subscribers, _destination);
}
}
@@ -1182,6 +1186,8 @@ void service_discovery_impl::process_serviceentry(
} else {
std::shared_ptr<request> its_request = find_request(its_service, its_instance);
if (its_request) {
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
+ // ID: SIP_SD_830
its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1));
}
unsubscribe_all(its_service, its_instance);
@@ -1203,8 +1209,10 @@ void service_discovery_impl::process_offerservice_serviceentry(
return;
std::shared_ptr<request> its_request = find_request(_service, _instance);
- if (its_request)
+ if (its_request) {
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1));
+ }
smallest_ttl_ = host_->add_routing_info(_service, _instance,
_major, _minor, _ttl,
@@ -1547,7 +1555,8 @@ void service_discovery_impl::process_eventgroupentry(
std::shared_ptr<eventgroupentry_impl> &_entry,
const std::vector<std::shared_ptr<option_impl> > &_options,
std::shared_ptr < message_impl > &its_message_response,
- std::vector <accepted_subscriber_t> &accepted_subscribers) {
+ std::vector <accepted_subscriber_t> &accepted_subscribers,
+ const boost::asio::ip::address &_destination) {
service_t its_service = _entry->get_service();
instance_t its_instance = _entry->get_instance();
eventgroup_t its_eventgroup = _entry->get_eventgroup();
@@ -1567,6 +1576,14 @@ void service_discovery_impl::process_eventgroupentry(
}
if(its_type == entry_type_e::SUBSCRIBE_EVENTGROUP) {
+ if( _destination.is_multicast() ) {
+ VSOMEIP_ERROR << "Received a SubscribeEventGroup entry on multicast address";
+ if(its_ttl > 0) {
+ insert_subscription_nack(its_message_response, its_service, its_instance,
+ its_eventgroup, its_counter, its_major, its_reserved);
+ }
+ return;
+ }
if (_entry->get_num_options(1) == 0
&& _entry->get_num_options(2) == 0) {
VSOMEIP_ERROR << "Invalid number of options in SubscribeEventGroup entry";
@@ -1785,6 +1802,7 @@ void service_discovery_impl::process_eventgroupentry(
if(its_ttl > 0) {
insert_subscription_nack(its_message_response, its_service, its_instance,
its_eventgroup, its_counter, its_major, its_reserved);
+ return;
}
break;
}
@@ -1982,12 +2000,27 @@ void service_discovery_impl::handle_eventgroup_subscription_nack(service_t _serv
if (client.second->get_counter() == _counter) {
// Deliver nack
nackedClient = client.first;
- host_->on_subscribe_nack(client.first, _service, _instance, _eventgroup);
+ host_->on_subscribe_nack(client.first, _service,
+ _instance, _eventgroup, ANY_EVENT);
break;
}
}
- // Remove nacked subscription
- found_eventgroup->second.erase(nackedClient);
+
+ // Restart TCP connection only for non selective subscriptions
+ for (auto client : found_eventgroup->second) {
+ if( !client.second->is_acknowledged()
+ && client.first == VSOMEIP_ROUTING_CLIENT ) {
+ auto endpoint = client.second->get_endpoint(true);
+ if(endpoint) {
+ endpoint->restart();
+ }
+ }
+ }
+
+ // Remove nacked subscription only for selective events
+ if(nackedClient != VSOMEIP_ROUTING_CLIENT) {
+ found_eventgroup->second.erase(nackedClient);
+ }
}
}
}
@@ -2010,7 +2043,7 @@ void service_discovery_impl::handle_eventgroup_subscription_ack(
if (its_client.second->get_counter() == _counter) {
its_client.second->set_acknowledged(true);
host_->on_subscribe_ack(its_client.first, _service,
- _instance, _eventgroup);
+ _instance, _eventgroup, ANY_EVENT);
}
if (_address.is_multicast()) {
host_->on_subscribe_ack(_service, _instance, _address,
@@ -2294,6 +2327,92 @@ void service_discovery_impl::start_offer_debounce_timer(bool _first_start) {
this, std::placeholders::_1));
}
+
+
+void service_discovery_impl::start_find_debounce_timer(bool _first_start) {
+ std::lock_guard<std::mutex> its_lock(find_debounce_timer_mutex_);
+ boost::system::error_code ec;
+ if (_first_start) {
+ find_debounce_timer_.expires_from_now(initial_delay_, ec);
+ } else {
+ find_debounce_timer_.expires_from_now(find_debounce_time_, ec);
+ }
+ if (ec) {
+ VSOMEIP_ERROR<< "service_discovery_impl::start_find_debounce_timer "
+ "setting expiry time of timer failed: " << ec.message();
+ }
+ find_debounce_timer_.async_wait(
+ std::bind(
+ &service_discovery_impl::on_find_debounce_timer_expired,
+ this, std::placeholders::_1));
+}
+
+// Handler for expiry of the find debounce timer (initial delay on first start, debounce interval afterwards)
+void service_discovery_impl::on_find_debounce_timer_expired(
+ const boost::system::error_code &_error) {
+ if(_error) { // timer was canceled
+ return;
+ }
+ // only copy the accumulated requests of the initial wait phase
+ // if the sent counter for the request is zero.
+ requests_t repetition_phase_finds;
+ bool new_finds(false);
+ {
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
+ for (const auto its_service : requested_) {
+ for (const auto its_instance : its_service.second) {
+ if( its_instance.second->get_sent_counter() == 0) {
+ repetition_phase_finds[its_service.first][its_instance.first] = its_instance.second;
+ }
+ }
+ }
+ if (repetition_phase_finds.size()) {
+ new_finds = true;
+ }
+ }
+
+ if (!new_finds) {
+ start_find_debounce_timer(false);
+ return;
+ }
+
+    // Send out finds for the first time as the initial wait phase has ended
+ std::shared_ptr<runtime> its_runtime = runtime_.lock();
+ if (its_runtime) {
+ std::vector<std::shared_ptr<message_impl>> its_messages;
+ std::shared_ptr<message_impl> its_message =
+ its_runtime->create_message();
+ its_messages.push_back(its_message);
+ // Serialize and send FindService (increments sent counter in requested_ map)
+ fill_message_with_find_entries(its_runtime, its_message, its_messages,
+ repetition_phase_finds);
+ serialize_and_send_messages(its_messages);
+ }
+
+ std::chrono::milliseconds its_delay(repetitions_base_delay_);
+ std::uint8_t its_repetitions(1);
+
+ std::shared_ptr<boost::asio::steady_timer> its_timer = std::make_shared<
+ boost::asio::steady_timer>(host_->get_io());
+ {
+ std::lock_guard<std::mutex> its_lock(find_repetition_phase_timers_mutex_);
+ find_repetition_phase_timers_[its_timer] = repetition_phase_finds;
+ }
+
+ boost::system::error_code ec;
+ its_timer->expires_from_now(its_delay, ec);
+ if (ec) {
+ VSOMEIP_ERROR<< "service_discovery_impl::on_find_debounce_timer_expired "
+ "setting expiry time of timer failed: " << ec.message();
+ }
+ its_timer->async_wait(
+ std::bind(
+ &service_discovery_impl::on_find_repetition_phase_timer_expired,
+ this, std::placeholders::_1, its_timer, its_repetitions,
+ its_delay.count()));
+ start_find_debounce_timer(false);
+}
+
void service_discovery_impl::on_offer_debounce_timer_expired(
const boost::system::error_code &_error) {
if(_error) { // timer was canceled
@@ -2385,20 +2504,10 @@ void service_discovery_impl::on_repetition_phase_timer_expired(
if (its_timer_pair != repetition_phase_timers_.end()) {
std::chrono::milliseconds new_delay(0);
std::uint8_t repetition(0);
+ bool move_to_main(false);
if (_repetition <= repetitions_max_) {
// sent offers, double time to wait and start timer again.
- std::shared_ptr<runtime> its_runtime = runtime_.lock();
- if (its_runtime) {
- std::vector<std::shared_ptr<message_impl>> its_messages;
- std::shared_ptr<message_impl> its_message =
- its_runtime->create_message();
- its_messages.push_back(its_message);
- fill_message_with_offer_entries(its_runtime, its_message,
- its_messages, its_timer_pair->second, true);
-
- // Serialize and send
- serialize_and_send_messages(its_messages);
- }
+
new_delay = std::chrono::milliseconds(_last_delay * 2);
repetition = ++_repetition;
} else {
@@ -2410,13 +2519,28 @@ void service_discovery_impl::on_repetition_phase_timer_expired(
// the cyclic offer delay before moving the offers in to main
// phase
if (last_offer_shorter_half_offer_delay_ago()) {
- move_offers_into_main_phase(_timer);
- return;
+ move_to_main = true;
} else {
new_delay = cyclic_offer_delay_;
repetition = 0;
}
}
+ std::shared_ptr<runtime> its_runtime = runtime_.lock();
+ if (its_runtime) {
+ std::vector<std::shared_ptr<message_impl>> its_messages;
+ std::shared_ptr<message_impl> its_message =
+ its_runtime->create_message();
+ its_messages.push_back(its_message);
+ fill_message_with_offer_entries(its_runtime, its_message,
+ its_messages, its_timer_pair->second, true);
+
+ // Serialize and send
+ serialize_and_send_messages(its_messages);
+ }
+ if (move_to_main) {
+ move_offers_into_main_phase(_timer);
+ return;
+ }
boost::system::error_code ec;
its_timer_pair->first->expires_from_now(new_delay, ec);
if (ec) {
@@ -2433,6 +2557,55 @@ void service_discovery_impl::on_repetition_phase_timer_expired(
}
}
+
+void service_discovery_impl::on_find_repetition_phase_timer_expired(
+ const boost::system::error_code &_error,
+ std::shared_ptr<boost::asio::steady_timer> _timer,
+ std::uint8_t _repetition, std::uint32_t _last_delay) {
+ if (_error) {
+ return;
+ }
+
+ std::lock_guard<std::mutex> its_lock(find_repetition_phase_timers_mutex_);
+ auto its_timer_pair = find_repetition_phase_timers_.find(_timer);
+ if (its_timer_pair != find_repetition_phase_timers_.end()) {
+ std::chrono::milliseconds new_delay(0);
+ std::uint8_t repetition(0);
+ if (_repetition <= repetitions_max_) {
+            // send FindService entries in one message, double the time to wait and start the timer again.
+ std::shared_ptr<runtime> its_runtime = runtime_.lock();
+ if (its_runtime) {
+ std::vector<std::shared_ptr<message_impl>> its_messages;
+ std::shared_ptr<message_impl> its_message =
+ its_runtime->create_message();
+ its_messages.push_back(its_message);
+ fill_message_with_find_entries(its_runtime, its_message,
+ its_messages, its_timer_pair->second);
+ serialize_and_send_messages(its_messages);
+ }
+ new_delay = std::chrono::milliseconds(_last_delay * 2);
+ repetition = ++_repetition;
+ } else {
+            // repetition phase is now over; erase the timer entry and stop rearming it
+ find_repetition_phase_timers_.erase(its_timer_pair);
+ return;
+ }
+ boost::system::error_code ec;
+ its_timer_pair->first->expires_from_now(new_delay, ec);
+ if (ec) {
+ VSOMEIP_ERROR <<
+ "service_discovery_impl::on_find_repetition_phase_timer_expired "
+ "setting expiry time of timer failed: " << ec.message();
+ }
+ its_timer_pair->first->async_wait(
+ std::bind(
+ &service_discovery_impl::on_find_repetition_phase_timer_expired,
+ this, std::placeholders::_1, its_timer_pair->first,
+ repetition, new_delay.count()));
+ }
+}
+
+
void service_discovery_impl::move_offers_into_main_phase(
const std::shared_ptr<boost::asio::steady_timer> &_timer) {
// HINT: make sure to lock the repetition_phase_timers_mutex_ before calling
@@ -2469,6 +2642,27 @@ void service_discovery_impl::fill_message_with_offer_entries(
}
}
+void service_discovery_impl::fill_message_with_find_entries(
+ std::shared_ptr<runtime> _runtime,
+ std::shared_ptr<message_impl> _message,
+ std::vector<std::shared_ptr<message_impl>> &_messages,
+ const requests_t &_requests) {
+ uint32_t its_start(0);
+ uint32_t its_size(0);
+ bool is_done(false);
+ while (!is_done) {
+ insert_find_entries(_message, _requests, its_start, its_size,
+ is_done);
+ its_start += its_size / VSOMEIP_SOMEIP_SD_ENTRY_SIZE;
+ if (!is_done) {
+ its_start = 0;
+ _message = _runtime->create_message();
+ _messages.push_back(_message);
+ }
+ };
+}
+
+
bool service_discovery_impl::serialize_and_send_messages(
const std::vector<std::shared_ptr<message_impl>> &_messages) {
bool has_sent(false);
@@ -2585,7 +2779,7 @@ void service_discovery_impl::on_main_phase_timer_expired(
if (_error) {
return;
}
- send(true, false);
+ send(true);
start_main_phase_timer();
}
diff --git a/implementation/service_discovery/src/subscription.cpp b/implementation/service_discovery/src/subscription.cpp
index 10e4a1c..c56b0d2 100644
--- a/implementation/service_discovery/src/subscription.cpp
+++ b/implementation/service_discovery/src/subscription.cpp
@@ -12,15 +12,13 @@ subscription::subscription(major_version_t _major, ttl_t _ttl,
std::shared_ptr<endpoint> _reliable,
std::shared_ptr<endpoint> _unreliable,
subscription_type_e _subscription_type,
- uint8_t _counter,
- std::chrono::steady_clock::time_point _expiration)
+ uint8_t _counter)
: major_(_major), ttl_(_ttl),
reliable_(_reliable), unreliable_(_unreliable),
is_acknowledged_(true),
tcp_connection_established_(false),
subscription_type_(_subscription_type),
- counter_(_counter),
- expiration_(_expiration) {
+ counter_(_counter) {
}
subscription::~subscription() {
@@ -73,13 +71,5 @@ uint8_t subscription::get_counter() const {
return counter_;
}
-std::chrono::steady_clock::time_point subscription::get_expiration() const {
- return expiration_;
-}
-
-void subscription::set_expiration(std::chrono::steady_clock::time_point _expiration) {
- expiration_ = _expiration;
-}
-
} // namespace sd
} // namespace vsomeip
diff --git a/implementation/utility/include/utility.hpp b/implementation/utility/include/utility.hpp
index f725517..a638860 100644
--- a/implementation/utility/include/utility.hpp
+++ b/implementation/utility/include/utility.hpp
@@ -108,6 +108,7 @@ public:
private:
static bool is_bigger_last_assigned_client_id(client_t _client);
static void set_client_id_lowbyte(client_t _client);
+ static void check_client_id_consistency();
};
} // namespace vsomeip
diff --git a/implementation/utility/src/utility.cpp b/implementation/utility/src/utility.cpp
index 93b9eac..8fc48db 100644
--- a/implementation/utility/src/utility.cpp
+++ b/implementation/utility/src/utility.cpp
@@ -279,6 +279,11 @@ bool utility::auto_configuration_init(const std::shared_ptr<configuration> &_con
if (0 != ret) {
VSOMEIP_ERROR << "pthread_mutexattr_setpshared() failed " << ret;
}
+ ret = pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
+ if (0 != ret) {
+ VSOMEIP_ERROR << "pthread_mutexattr_setrobust() failed " << ret;
+ }
+
} else {
VSOMEIP_ERROR << "pthread_mutexattr_init() failed " << ret;
}
@@ -357,7 +362,13 @@ bool utility::auto_configuration_init(const std::shared_ptr<configuration> &_con
} else {
the_configuration_data__ = configuration_data;
- pthread_mutex_lock(&the_configuration_data__->mutex_);
+ if (EOWNERDEAD == pthread_mutex_lock(&the_configuration_data__->mutex_)) {
+ VSOMEIP_WARNING << "utility::auto_configuration_init EOWNERDEAD";
+ check_client_id_consistency();
+ if (0 != pthread_mutex_consistent(&the_configuration_data__->mutex_)) {
+ VSOMEIP_ERROR << "pthread_mutex_consistent() failed ";
+ }
+ }
its_configuration_refs__++;
pthread_mutex_unlock(&the_configuration_data__->mutex_);
}
@@ -458,7 +469,13 @@ client_t utility::request_client_id(const std::shared_ptr<configuration> &_confi
assert(waitResult == WAIT_OBJECT_0);
(void)waitResult;
#else
- pthread_mutex_lock(&the_configuration_data__->mutex_);
+ if (EOWNERDEAD == pthread_mutex_lock(&the_configuration_data__->mutex_)) {
+ VSOMEIP_WARNING << "utility::request_client_id EOWNERDEAD";
+ check_client_id_consistency();
+ if (0 != pthread_mutex_consistent(&the_configuration_data__->mutex_)) {
+ VSOMEIP_ERROR << "pthread_mutex_consistent() failed ";
+ }
+ }
#endif
const std::string its_name = _config->get_routing_host();
bool set_client_as_manager_host(false);
@@ -580,7 +597,13 @@ void utility::release_client_id(client_t _client) {
#ifdef _WIN32
WaitForSingleObject(configuration_data_mutex, INFINITE);
#else
- pthread_mutex_lock(&the_configuration_data__->mutex_);
+ if (EOWNERDEAD == pthread_mutex_lock(&the_configuration_data__->mutex_)) {
+ VSOMEIP_WARNING << "utility::release_client_id EOWNERDEAD";
+ check_client_id_consistency();
+ if (0 != pthread_mutex_consistent(&the_configuration_data__->mutex_)) {
+ VSOMEIP_ERROR << "pthread_mutex_consistent() failed ";
+ }
+ }
#endif
int i = 0;
for (; i < the_configuration_data__->max_used_client_ids_index_; i++) {
@@ -590,11 +613,11 @@ void utility::release_client_id(client_t _client) {
}
if (i < the_configuration_data__->max_used_client_ids_index_) {
- the_configuration_data__->max_used_client_ids_index_--;
- for (; i < the_configuration_data__->max_used_client_ids_index_; i++) {
+ for (; i < (the_configuration_data__->max_used_client_ids_index_ - 1); i++) {
the_configuration_data__->used_client_ids_[i]
= the_configuration_data__->used_client_ids_[i+1];
}
+ the_configuration_data__->max_used_client_ids_index_--;
}
#ifdef _WIN32
ReleaseMutex(configuration_data_mutex);
@@ -613,7 +636,13 @@ bool utility::is_routing_manager_host(client_t _client) {
#ifdef _WIN32
WaitForSingleObject(configuration_data_mutex, INFINITE);
#else
- pthread_mutex_lock(&the_configuration_data__->mutex_);
+ if (EOWNERDEAD == pthread_mutex_lock(&the_configuration_data__->mutex_)) {
+ VSOMEIP_WARNING << "utility::is_routing_manager_host EOWNERDEAD";
+ check_client_id_consistency();
+ if (0 != pthread_mutex_consistent(&the_configuration_data__->mutex_)) {
+ VSOMEIP_ERROR << "pthread_mutex_consistent() failed ";
+ }
+ }
#endif
bool is_routing_manager = (the_configuration_data__->routing_manager_host_ == _client);
@@ -636,7 +665,13 @@ void utility::set_routing_manager_host(client_t _client) {
#ifdef _WIN32
WaitForSingleObject(configuration_data_mutex, INFINITE);
#else
- pthread_mutex_lock(&the_configuration_data__->mutex_);
+ if (EOWNERDEAD == pthread_mutex_lock(&the_configuration_data__->mutex_)) {
+ VSOMEIP_WARNING << "utility::set_routing_manager_host EOWNERDEAD";
+ check_client_id_consistency();
+ if (0 != pthread_mutex_consistent(&the_configuration_data__->mutex_)) {
+ VSOMEIP_ERROR << "pthread_mutex_consistent() failed ";
+ }
+ }
#endif
the_configuration_data__->routing_manager_host_ = _client;
@@ -661,4 +696,26 @@ void utility::set_client_id_lowbyte(client_t _client) {
% VSOMEIP_MAX_CLIENTS;
}
+void utility::check_client_id_consistency() {
+ if (1 < the_configuration_data__->max_used_client_ids_index_) {
+ client_t prevID = the_configuration_data__->used_client_ids_[0];
+ int i = 1;
+ for (; i < the_configuration_data__->max_used_client_ids_index_; i++) {
+ const client_t currID = the_configuration_data__->used_client_ids_[i];
+ if (prevID == currID) {
+ break;
+ }
+ prevID = currID;
+ }
+
+ if (i < the_configuration_data__->max_used_client_ids_index_) {
+ for (; i < (the_configuration_data__->max_used_client_ids_index_ - 1); i++) {
+ the_configuration_data__->used_client_ids_[i]
+ = the_configuration_data__->used_client_ids_[i+1];
+ }
+ the_configuration_data__->max_used_client_ids_index_--;
+ }
+ }
+}
+
} // namespace vsomeip
diff --git a/interface/vsomeip/application.hpp b/interface/vsomeip/application.hpp
index 6023d79..0b71329 100644
--- a/interface/vsomeip/application.hpp
+++ b/interface/vsomeip/application.hpp
@@ -291,7 +291,7 @@ public:
*
* A user application must call this function to subscribe to an eventgroup.
* Before calling subscribe it must register all events it interested in by
- * calls to @ref register_event. The method additionally allows to specify
+ * calls to @ref request_event. The method additionally allows to specify
* a specific event. If a specific event is specified, all other events of
* the eventgroup are not received by the application.
*
@@ -825,6 +825,20 @@ public:
* \param _routing_state the current routing state
*/
virtual void set_routing_state(routing_state_e _routing_state) = 0;
+
+ /**
+ *
+ * \brief Unsubscribes from an eventgroup.
+ *
+ * \param _service Service identifier of the service that contains the
+ * eventgroup.
+ * \param _instance Instance identifier of the service that contains the
+ * eventgroup.
+ * \param _eventgroup Eventgroup identifier of the eventgroup.
+ * \param _event Event to unsubscribe (pass ANY_EVENT for all events of the eventgroup)
+ */
+ virtual void unsubscribe(service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup, event_t _event) = 0;
};
/** @} */
diff --git a/interface/vsomeip/constants.hpp b/interface/vsomeip/constants.hpp
index 09e2f22..9aeee09 100644
--- a/interface/vsomeip/constants.hpp
+++ b/interface/vsomeip/constants.hpp
@@ -52,6 +52,7 @@ const byte_t SERVICE_COOKIE[] = { 0xFF, 0xFF, 0x80, 0x00, 0x00, 0x00, 0x00,
0x08, 0xDE, 0xAD, 0xBE, 0xEF, 0x01, 0x01, 0x02, 0x00 };
const event_t ANY_EVENT = 0xFFFF;
+const client_t ANY_CLIENT = 0xFFFF;
} // namespace vsomeip
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 91605ad..7f841af 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1963,11 +1963,107 @@ if(NOT ${TESTS_BAT})
add_test(NAME ${TEST_INITIAL_EVENT_NAME}_diff_client_ids_partial_same_ports_both_tcp_and_udp
COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} TCP_AND_UDP ${TEST_INITIAL_EVENT_DIFF_IDS_PARTIAL_SAME_PORTS_MASTER_CONFIG_FILE})
set_tests_properties(${TEST_INITIAL_EVENT_NAME}_diff_client_ids_partial_same_ports_both_tcp_and_udp PROPERTIES TIMEOUT 120)
-
+
add_test(NAME ${TEST_INITIAL_EVENT_NAME}_diff_client_ids_diff_ports_same_service_id_udp
COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} UDP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_SAME_SERVICEID_MASTER_CONFIG_FILE} SAME_SERVICE_ID)
set_tests_properties(${TEST_INITIAL_EVENT_NAME}_diff_client_ids_diff_ports_same_service_id_udp PROPERTIES TIMEOUT 120)
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} UDP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_tcp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} TCP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_tcp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_both_tcp_and_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} TCP_AND_UDP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_both_tcp_and_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_prefer_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} PREFER_UDP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_prefer_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_prefer_tcp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} PREFER_TCP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_prefer_tcp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_same_ports_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} UDP ${TEST_INITIAL_EVENT_DIFF_IDS_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_same_ports_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_same_ports_tcp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} TCP ${TEST_INITIAL_EVENT_DIFF_IDS_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_same_ports_tcp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_same_ports_both_tcp_and_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} TCP_AND_UDP ${TEST_INITIAL_EVENT_DIFF_IDS_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_same_ports_both_tcp_and_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_same_ports_prefer_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} PREFER_UDP ${TEST_INITIAL_EVENT_DIFF_IDS_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_same_ports_prefer_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_same_ports_prefer_tcp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} PREFER_TCP ${TEST_INITIAL_EVENT_DIFF_IDS_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_same_ports_prefer_tcp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_partial_same_ports_both_tcp_and_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} TCP_AND_UDP ${TEST_INITIAL_EVENT_DIFF_IDS_PARTIAL_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_partial_same_ports_both_tcp_and_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_same_service_id_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} UDP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_SAME_SERVICEID_MASTER_CONFIG_FILE} SAME_SERVICE_ID MULTIPLE_EVENTS)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_diff_client_ids_diff_ports_same_service_id_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} UDP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_tcp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} TCP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_tcp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_both_tcp_and_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} TCP_AND_UDP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_both_tcp_and_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_prefer_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} PREFER_UDP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_prefer_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_prefer_tcp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} PREFER_TCP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_prefer_tcp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_same_ports_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} UDP ${TEST_INITIAL_EVENT_DIFF_IDS_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_same_ports_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_same_ports_tcp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} TCP ${TEST_INITIAL_EVENT_DIFF_IDS_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_same_ports_tcp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_same_ports_both_tcp_and_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} TCP_AND_UDP ${TEST_INITIAL_EVENT_DIFF_IDS_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_same_ports_both_tcp_and_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_same_ports_prefer_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} PREFER_UDP ${TEST_INITIAL_EVENT_DIFF_IDS_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_same_ports_prefer_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_same_ports_prefer_tcp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} PREFER_TCP ${TEST_INITIAL_EVENT_DIFF_IDS_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_same_ports_prefer_tcp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_partial_same_ports_both_tcp_and_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} TCP_AND_UDP ${TEST_INITIAL_EVENT_DIFF_IDS_PARTIAL_SAME_PORTS_MASTER_CONFIG_FILE} MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_partial_same_ports_both_tcp_and_udp PROPERTIES TIMEOUT 120)
+
+ add_test(NAME ${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_same_service_id_udp
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_INITIAL_EVENT_MASTER_STARTER} UDP ${TEST_INITIAL_EVENT_DIFF_IDS_DIFF_PORTS_SAME_SERVICEID_MASTER_CONFIG_FILE} SAME_SERVICE_ID MULTIPLE_EVENTS SUBSCRIBE_ON_AVAILABILITY)
+ set_tests_properties(${TEST_INITIAL_EVENT_NAME}_multiple_events_subscribe_on_availability_diff_client_ids_diff_ports_same_service_id_udp PROPERTIES TIMEOUT 120)
+
# offer tests
add_test(NAME ${TEST_OFFER_NAME}_local
COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_OFFER_LOCAL_STARTER})
diff --git a/test/application_tests/application_test.cpp b/test/application_tests/application_test.cpp
index 71ded2e..304a0cc 100644
--- a/test/application_tests/application_test.cpp
+++ b/test/application_tests/application_test.cpp
@@ -211,6 +211,8 @@ protected:
while(!is_registered_) {
cv_.wait(its_lock);
}
+ app_->request_service(vsomeip_test::TEST_SERVICE_SERVICE_ID,
+ vsomeip_test::TEST_SERVICE_INSTANCE_ID);
app_->offer_service(vsomeip_test::TEST_SERVICE_SERVICE_ID,
vsomeip_test::TEST_SERVICE_INSTANCE_ID);
while(!is_available_) {
diff --git a/test/big_payload_tests/big_payload_test_external_starter.sh b/test/big_payload_tests/big_payload_test_external_starter.sh
index 365eb68..ff5a33f 100755
--- a/test/big_payload_tests/big_payload_test_external_starter.sh
+++ b/test/big_payload_tests/big_payload_test_external_starter.sh
@@ -32,9 +32,15 @@ BIG_PAYLOAD_TEST_PID=$!
if [ ! -z "$USE_LXC_TEST" ]; then
echo "starting big payload test on slave LXC"
if [[ $# -gt 0 ]]; then
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./big_payload_test_service_external_start.sh $1\"" &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip/test; ./big_payload_test_service_external_start.sh $1\"" &
else
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP 'bash -ci "set -m; cd \$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./big_payload_test_service_external_start.sh"' &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP 'bash -ci "set -m; cd \$SANDBOX_TARGET_DIR/vsomeip/test; ./big_payload_test_service_external_start.sh"' &
+ fi
+elif [ ! -z "$USE_DOCKER" ]; then
+ if [[ $# -gt 0 ]]; then
+ docker run --name bpts --cap-add=NET_ADMIN $DOCKER_IMAGE sh -c "route add -net 224.0.0.0/4 dev eth0 && cd $DOCKER_TESTS && ./big_payload_test_service_external_start.sh $1" &
+ else
+ docker run --name bpts --cap-add=NET_ADMIN $DOCKER_IMAGE sh -c "route add -net 224.0.0.0/4 dev eth0 && cd $DOCKER_TESTS && ./big_payload_test_service_external_start.sh" &
fi
else
cat <<End-of-message
@@ -60,6 +66,11 @@ do
wait $job || ((FAIL+=1))
done
+if [ ! -z "$USE_DOCKER" ]; then
+ docker wait bpts
+ docker rm bpts
+fi
+
# Check if client and server both exited successfully
if [ $FAIL -eq 0 ]
then
diff --git a/test/client_id_tests/client_id_test_master_starter.sh b/test/client_id_tests/client_id_test_master_starter.sh
index 4f74fd0..c0b2a0b 100755
--- a/test/client_id_tests/client_id_test_master_starter.sh
+++ b/test/client_id_tests/client_id_test_master_starter.sh
@@ -42,7 +42,9 @@ sleep 1
if [ ! -z "$USE_LXC_TEST" ]; then
echo "starting client id test on slave LXC"
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./client_id_test_slave_starter.sh $CLIENT_JSON_FILE\"" &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip/test; ./client_id_test_slave_starter.sh $CLIENT_JSON_FILE\"" &
+elif [ ! -z "$USE_DOCKER" ]; then
+ docker run --name citms --cap-add NET_ADMIN $DOCKER_IMAGE sh -c "route add -net 224.0.0.0/4 dev eth0 && cd $DOCKER_TESTS && ./client_id_test_slave_starter.sh $CLIENT_JSON_FILE" &
else
cat <<End-of-message
*******************************************************************************
@@ -69,6 +71,11 @@ do
fi
done
+if [ ! -z "$USE_DOCKER" ]; then
+ docker stop citms
+ docker rm citms
+fi
+
# Check if both exited successfully
if [ $FAIL -eq 0 ]
then
diff --git a/test/client_id_tests/client_id_test_service.cpp b/test/client_id_tests/client_id_test_service.cpp
index a051f99..9306bad 100644
--- a/test/client_id_tests/client_id_test_service.cpp
+++ b/test/client_id_tests/client_id_test_service.cpp
@@ -51,6 +51,7 @@ public:
|| (i.service_id == 0xFFFF && i.instance_id == 0xFFFF)) {
continue;
}
+ app_->request_service(i.service_id, i.instance_id);
app_->register_availability_handler(i.service_id, i.instance_id,
std::bind(&client_id_test_service::on_availability, this,
std::placeholders::_1, std::placeholders::_2,
diff --git a/test/cpu_load_tests/cpu_load_test_client.cpp b/test/cpu_load_tests/cpu_load_test_client.cpp
index 37cc73e..94fd98b 100644
--- a/test/cpu_load_tests/cpu_load_test_client.cpp
+++ b/test/cpu_load_tests/cpu_load_test_client.cpp
@@ -14,6 +14,7 @@
#include <iomanip>
#include <numeric>
#include <cmath> // for isfinite
+#include <atomic>
#include "cpu_load_test_globals.hpp"
#include "../../implementation/logging/include/logger.hpp"
@@ -51,12 +52,13 @@ public:
number_of_acknowledged_messages_(0),
payload_size_(_payload_size),
wait_for_all_msg_acknowledged_(true),
+ initialized_(false),
sender_(std::bind(&cpu_load_test_client::run, this)) {
if (!app_->init()) {
ADD_FAILURE() << "Couldn't initialize application";
return;
}
-
+ initialized_ = true;
app_->register_state_handler(
std::bind(&cpu_load_test_client::on_state, this,
std::placeholders::_1));
@@ -76,6 +78,16 @@ public:
}
~cpu_load_test_client() {
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ wait_for_availability_ = false;
+ condition_.notify_one();
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(all_msg_acknowledged_mutex_);
+ wait_for_all_msg_acknowledged_ = false;
+ all_msg_acknowledged_cv_.notify_one();
+ }
sender_.join();
}
@@ -193,7 +205,9 @@ private:
wait_for_availability_ = true;
stop();
- app_->stop();
+ if (initialized_) {
+ app_->stop();
+ }
}
@@ -287,6 +301,7 @@ private:
std::mutex all_msg_acknowledged_mutex_;
std::condition_variable all_msg_acknowledged_cv_;
std::vector<double> results_;
+ std::atomic<bool> initialized_;
std::thread sender_;
};
diff --git a/test/cpu_load_tests/cpu_load_test_master_starter.sh b/test/cpu_load_tests/cpu_load_test_master_starter.sh
index c297824..50bf007 100755
--- a/test/cpu_load_tests/cpu_load_test_master_starter.sh
+++ b/test/cpu_load_tests/cpu_load_test_master_starter.sh
@@ -19,7 +19,9 @@ sleep 1
if [ ! -z "$USE_LXC_TEST" ]; then
echo "starting cpu load test on slave LXC"
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP 'bash -ci "set -m; cd \$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./cpu_load_test_slave_starter.sh"' &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP 'bash -ci "set -m; cd \$SANDBOX_TARGET_DIR/vsomeip/test; ./cpu_load_test_slave_starter.sh"' &
+elif [ ! -z "$USE_DOCKER" ]; then
+ docker run --name cltms --cap-add NET_ADMIN $DOCKER_IMAGE sh -c "route add -net 224.0.0.0/4 dev eth0 && cd $DOCKER_TESTS && ./cpu_load_test_slave_starter.sh" &
else
cat <<End-of-message
*******************************************************************************
@@ -64,6 +66,10 @@ do
wait $job || FAIL=$(($FAIL+1))
done
+if [ ! -z "$USE_DOCKER" ]; then
+ docker stop cltms
+ docker rm cltms
+fi
# Check if both exited successfully
if [ $FAIL -eq 0 ]
diff --git a/test/cpu_load_tests/cpu_load_test_service.cpp b/test/cpu_load_tests/cpu_load_test_service.cpp
index 7316c2a..b0b6ea6 100644
--- a/test/cpu_load_tests/cpu_load_test_service.cpp
+++ b/test/cpu_load_tests/cpu_load_test_service.cpp
@@ -36,6 +36,15 @@ public:
{
}
+ ~cpu_load_test_service() {
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ blocked_ = true;
+ condition_.notify_one();
+ }
+ offer_thread_.join();
+ }
+
bool init()
{
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -81,11 +90,6 @@ public:
app_->stop();
}
- void join_offer_thread()
- {
- offer_thread_.join();
- }
-
void on_state(vsomeip::state_type_e _state)
{
VSOMEIP_INFO << "Application " << app_->get_name() << " is "
@@ -192,7 +196,6 @@ TEST(someip_payload_test, send_response_for_every_request)
cpu_load_test_service test_service;
if (test_service.init()) {
test_service.start();
- test_service.join_offer_thread();
}
}
diff --git a/test/initial_event_tests/initial_event_test_client.cpp b/test/initial_event_tests/initial_event_test_client.cpp
index 2f49516..8f843c6 100644
--- a/test/initial_event_tests/initial_event_test_client.cpp
+++ b/test/initial_event_tests/initial_event_test_client.cpp
@@ -14,17 +14,27 @@
#include <gtest/gtest.h>
+#ifndef _WIN32
+#include <signal.h>
+#endif
+
#include <vsomeip/vsomeip.hpp>
#include "../../implementation/logging/include/logger.hpp"
#include "initial_event_test_globals.hpp"
+class initial_event_test_client;
+static initial_event_test_client* the_client;
+extern "C" void signal_handler(int _signum);
class initial_event_test_client {
public:
initial_event_test_client(int _client_number,
vsomeip::subscription_type_e _subscription_type,
- std::array<initial_event_test::service_info, 7> _service_infos) :
+ std::array<initial_event_test::service_info, 7> _service_infos,
+ bool _subscribe_on_available, std::uint32_t _events_to_subscribe,
+ bool _initial_event_strict_checking,
+ bool _dont_exit) :
client_number_(_client_number),
service_infos_(_service_infos),
subscription_type_(_subscription_type),
@@ -32,11 +42,27 @@ public:
wait_until_registered_(true),
wait_until_other_services_available_(true),
wait_for_stop_(true),
+ subscribe_on_available_(_subscribe_on_available),
+ events_to_subscribe_(_events_to_subscribe),
+ initial_event_strict_checking_(_initial_event_strict_checking),
+ dont_exit_(_dont_exit),
stop_thread_(std::bind(&initial_event_test_client::wait_for_stop, this)) {
if (!app_->init()) {
ADD_FAILURE() << "Couldn't initialize application";
return;
}
+
+ // register signal handler
+ the_client = this;
+ struct sigaction sa_new, sa_old;
+ sa_new.sa_handler = signal_handler;
+ sa_new.sa_flags = 0;
+ sigemptyset(&sa_new.sa_mask);
+ ::sigaction(SIGUSR1, &sa_new, &sa_old);
+ ::sigaction(SIGINT, &sa_new, &sa_old);
+ ::sigaction(SIGTERM, &sa_new, &sa_old);
+ ::sigaction(SIGABRT, &sa_new, &sa_old);
+
app_->register_state_handler(
std::bind(&initial_event_test_client::on_state, this,
std::placeholders::_1));
@@ -59,13 +85,26 @@ public:
std::set<vsomeip::eventgroup_t> its_eventgroups;
its_eventgroups.insert(i.eventgroup_id);
- app_->request_event(i.service_id, i.instance_id, i.event_id, its_eventgroups, true);
+ for (std::uint32_t j = 0; j < events_to_subscribe_; j++ ) {
+ app_->request_event(i.service_id, i.instance_id,
+ static_cast<vsomeip::event_t>(i.event_id + j),
+ its_eventgroups, true);
+ }
other_services_available_[std::make_pair(i.service_id, i.instance_id)] = false;
other_services_received_notification_[std::make_pair(i.service_id, i.method_id)] = 0;
-
- app_->subscribe(i.service_id, i.instance_id, i.eventgroup_id,
- vsomeip::DEFAULT_MAJOR, subscription_type_);
+ if (!subscribe_on_available_) {
+ if (events_to_subscribe_ == 1 ) {
+ app_->subscribe(i.service_id, i.instance_id, i.eventgroup_id,
+ vsomeip::DEFAULT_MAJOR, subscription_type_);
+ } else if (events_to_subscribe_ > 1) {
+ for (std::uint32_t j = 0; j < events_to_subscribe_; j++ ) {
+ app_->subscribe(i.service_id, i.instance_id, i.eventgroup_id,
+ vsomeip::DEFAULT_MAJOR, subscription_type_,
+ static_cast<vsomeip::event_t>(i.event_id + j));
+ }
+ }
+ }
}
app_->start();
@@ -109,6 +148,23 @@ public:
return v.second;})) {
VSOMEIP_INFO << "[" << std::setw(4) << std::setfill('0') << std::hex
<< client_number_ << "] all services are available.";
+ if (subscribe_on_available_) {
+ for(const auto& i : service_infos_) {
+ if (i.service_id == 0xFFFF && i.instance_id == 0xFFFF) {
+ continue;
+ }
+ if (events_to_subscribe_ == 1 ) {
+ app_->subscribe(i.service_id, i.instance_id, i.eventgroup_id,
+ vsomeip::DEFAULT_MAJOR, subscription_type_);
+ } else if (events_to_subscribe_ > 1) {
+ for (std::uint32_t j = 0; j < events_to_subscribe_; j++ ) {
+ app_->subscribe(i.service_id, i.instance_id, i.eventgroup_id,
+ vsomeip::DEFAULT_MAJOR, subscription_type_,
+ static_cast<vsomeip::event_t>(i.event_id + j));
+ }
+ }
+ }
+ }
}
}
}
@@ -141,18 +197,14 @@ public:
case vsomeip::subscription_type_e::SU_RELIABLE:
case vsomeip::subscription_type_e::SU_PREFER_UNRELIABLE:
case vsomeip::subscription_type_e::SU_PREFER_RELIABLE:
- if (all_notifications_received()) {
- notify = true;
- }
- break;
case vsomeip::subscription_type_e::SU_RELIABLE_AND_UNRELIABLE:
- if (all_notifications_received_tcp_and_udp()) {
+ if (all_notifications_received()) {
notify = true;
}
break;
}
- if(notify) {
+ if(notify && !dont_exit_) {
std::lock_guard<std::mutex> its_lock(stop_mutex_);
wait_for_stop_ = false;
stop_condition_.notify_one();
@@ -169,17 +221,26 @@ public:
{
if (v.second == initial_event_test::notifications_to_send) {
return true;
- } else if (v.second >= initial_event_test::notifications_to_send){
- VSOMEIP_WARNING
- << " Received multiple initial events from service/instance: "
- << std::setw(4) << std::setfill('0') << std::hex << v.first.first
- << "."
- << std::setw(4) << std::setfill('0') << std::hex << v.first.second
- << " number of received events: " << v.second
- << ". This is caused by StopSubscribe/Subscribe messages.";
- return true;
} else {
- return false;
+ if (initial_event_strict_checking_) {
+ return false;
+ } else {
+ if (v.second >= initial_event_test::notifications_to_send) {
+ VSOMEIP_WARNING
+ << "[" << std::setw(4) << std::setfill('0') << std::hex
+ << client_number_ << "] "
+ << " Received multiple initial events from service/instance: "
+ << std::setw(4) << std::setfill('0') << std::hex << v.first.first
+ << "."
+ << std::setw(4) << std::setfill('0') << std::hex << v.first.second
+ << " number of received events: " << v.second
+ << ". This is caused by StopSubscribe/Subscribe messages and/or"
+ << " service offered via UDP and TCP";
+ return true;
+ } else {
+ return false;
+ }
+ }
}
}
);
@@ -189,14 +250,17 @@ public:
std::uint32_t received_twice(0);
std::uint32_t received_normal(0);
for(const auto &v : other_services_received_notification_) {
- if (v.second > initial_event_test::notifications_to_send * 2) {
+ if (!initial_event_strict_checking_ &&
+ v.second > initial_event_test::notifications_to_send * 2) {
VSOMEIP_WARNING
+ << "[" << std::setw(4) << std::setfill('0') << std::hex
+ << client_number_ << "] "
<< " Received multiple initial events from service/instance: "
<< std::setw(4) << std::setfill('0') << std::hex << v.first.first
<< "."
<< std::setw(4) << std::setfill('0') << std::hex << v.first.second
- << " number of received events: " << v.second
- << ". This is caused by StopSubscribe/Subscribe messages.";
+ << ". This is caused by StopSubscribe/Subscribe messages and/or"
+ << " service offered via UDP and TCP";
received_twice++;
} else if (v.second == initial_event_test::notifications_to_send * 2) {
received_twice++;
@@ -205,8 +269,8 @@ public:
}
}
- if( received_twice == (service_infos_.size() - 1) / 2
- && received_normal == (service_infos_.size() - 1) / 2) {
+ if( received_twice == ((service_infos_.size() - 1) * events_to_subscribe_)/ 2
+ && received_normal == ((service_infos_.size() - 1) * events_to_subscribe_)/ 2) {
// routing manager stub receives the notification
// - twice from external nodes
// - and normal from all internal nodes
@@ -220,6 +284,15 @@ public:
return false;
}
+ void handle_signal(int _signum) {
+ VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
+ << client_number_ << "] Catched signal, going down ("
+ << std::dec <<_signum << ")";
+ std::lock_guard<std::mutex> its_lock(stop_mutex_);
+ wait_for_stop_ = false;
+ stop_condition_.notify_one();
+ }
+
void wait_for_stop() {
{
std::unique_lock<std::mutex> its_lock(stop_mutex_);
@@ -254,6 +327,12 @@ private:
std::condition_variable condition_;
bool wait_for_stop_;
+
+ bool subscribe_on_available_;
+ std::uint32_t events_to_subscribe_;
+ bool initial_event_strict_checking_;
+ bool dont_exit_;
+
std::mutex stop_mutex_;
std::condition_variable stop_condition_;
std::thread stop_thread_;
@@ -262,16 +341,26 @@ private:
static int client_number;
static vsomeip::subscription_type_e subscription_type;
static bool use_same_service_id;
+static bool subscribe_on_available;
+static std::uint32_t subscribe_multiple_events;
+static bool initial_event_strict_checking;
+static bool dont_exit;
+
+extern "C" void signal_handler(int signum) {
+ the_client->handle_signal(signum);
+}
TEST(someip_initial_event_test, wait_for_initial_events_of_all_services)
{
if(use_same_service_id) {
initial_event_test_client its_sample(client_number,
subscription_type,
- initial_event_test::service_infos_same_service_id);
+ initial_event_test::service_infos_same_service_id, subscribe_on_available,
+ subscribe_multiple_events, initial_event_strict_checking, dont_exit);
} else {
initial_event_test_client its_sample(client_number, subscription_type,
- initial_event_test::service_infos);
+ initial_event_test::service_infos, subscribe_on_available,
+ subscribe_multiple_events, initial_event_strict_checking, dont_exit);
}
}
@@ -280,11 +369,16 @@ int main(int argc, char** argv)
{
::testing::InitGoogleTest(&argc, argv);
if(argc < 3) {
- std::cerr << "Please specify a client number and subscription type, like: " << argv[0] << " 2 UDP SAME_SERVICE_ID" << std::endl;
+ std::cerr << "Please specify a client number and subscription type, like: " << argv[0] << " 2 UDP SUBSCRIBE_BEFORE_START SAME_SERVICE_ID" << std::endl;
std::cerr << "Valid client numbers are from 0 to 0xFFFF" << std::endl;
std::cerr << "Valid subscription types include:" << std::endl;
std::cerr << "[TCP_AND_UDP, PREFER_UDP, PREFER_TCP, UDP, TCP]" << std::endl;
- std::cerr << "If SAME_SERVICE_ID is specified as third parameter the test is run w/ multiple instances of the same service" << std::endl;
+ std::cerr << "After client number and subscription types one/multiple of these flags can be specified:";
+ std::cerr << " - Time of subscription, valid values: [SUBSCRIBE_ON_AVAILABILITY, SUBSCRIBE_BEFORE_START], default SUBSCRIBE_BEFORE_START" << std::endl;
+ std::cerr << " - SAME_SERVICE_ID flag. If set the test is run w/ multiple instances of the same service, default false" << std::endl;
+ std::cerr << " - MULTIPLE_EVENTS flag. If set the test will subscribe to multiple events in the eventgroup, default false" << std::endl;
+ std::cerr << " - STRICT_CHECKING flag. If set the test will only successfully finish if exactly the number of initial events were received (and not more). Default false" << std::endl;
+ std::cerr << " - DONT_EXIT flag. If set the test will not exit if all notifications have been received. Default false" << std::endl;
return 1;
}
@@ -307,11 +401,29 @@ int main(int argc, char** argv)
return 1;
}
- if (argc >= 4 && std::string("SAME_SERVICE_ID") == std::string(argv[3])) {
- use_same_service_id = true;
- } else {
- use_same_service_id = false;
+ subscribe_on_available = false;
+ initial_event_strict_checking = false;
+ use_same_service_id = false;
+ subscribe_multiple_events = 1;
+ dont_exit = false;
+ if (argc > 3) {
+ for (int i = 3; i < argc; i++) {
+ if (std::string("SUBSCRIBE_ON_AVAILABILITY") == std::string(argv[i])) {
+ subscribe_on_available = true;
+ } else if (std::string("SUBSCRIBE_BEFORE_START") == std::string(argv[i])) {
+ subscribe_on_available = false;
+ } else if (std::string("SAME_SERVICE_ID") == std::string(argv[i])) {
+ use_same_service_id = true;
+ } else if (std::string("MULTIPLE_EVENTS") == std::string(argv[i])) {
+ subscribe_multiple_events = 5;
+ } else if (std::string("STRICT_CHECKING") == std::string(argv[i])) {
+ initial_event_strict_checking = true;
+ } else if (std::string("DONT_EXIT") == std::string(argv[i])) {
+ dont_exit = true;
+ }
+ }
}
+
return RUN_ALL_TESTS();
}
#endif
diff --git a/test/initial_event_tests/initial_event_test_master_starter.sh b/test/initial_event_tests/initial_event_test_master_starter.sh
index 2ef37f7..9eee669 100755
--- a/test/initial_event_tests/initial_event_test_master_starter.sh
+++ b/test/initial_event_tests/initial_event_test_master_starter.sh
@@ -24,7 +24,9 @@ fi
PASSED_SUBSCRIPTION_TYPE=$1
PASSED_JSON_FILE=$2
-PASSED_SAME_SERVICE_ID_FLAG=$3
+# Remove processed options from $@
+shift 2
+REMAINING_OPTIONS="$@"
# Make sure only valid subscription types are passed to the script
SUBSCRIPTION_TYPES="TCP_AND_UDP PREFER_UDP PREFER_TCP UDP TCP"
@@ -48,14 +50,16 @@ fi
print_starter_message () {
if [ ! -z "$USE_LXC_TEST" ]; then
- echo "starting initial event test on slave LXC with params $PASSED_SUBSCRIPTION_TYPE $CLIENT_JSON_FILE $PASSED_SAME_SERVICE_ID_FLAG"
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./initial_event_test_slave_starter.sh $PASSED_SUBSCRIPTION_TYPE $CLIENT_JSON_FILE $PASSED_SAME_SERVICE_ID_FLAG\"" &
+ echo "starting initial event test on slave LXC with params $PASSED_SUBSCRIPTION_TYPE $CLIENT_JSON_FILE $REMAINING_OPTIONS"
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip/test; ./initial_event_test_slave_starter.sh $PASSED_SUBSCRIPTION_TYPE $CLIENT_JSON_FILE $REMAINING_OPTIONS\"" &
+elif [ ! -z "$USE_DOCKER" ]; then
+ docker run --name ietms --cap-add NET_ADMIN $DOCKER_IMAGE sh -c "route add -net 224.0.0.0/4 dev eth0 && cd $DOCKER_TESTS && ./initial_event_test_slave_starter.sh $PASSED_SUBSCRIPTION_TYPE $CLIENT_JSON_FILE $REMAINING_OPTIONS" &
else
cat <<End-of-message
*******************************************************************************
*******************************************************************************
** Please now run:
-** initial_event_test_slave_starter.sh $PASSED_SUBSCRIPTION_TYPE $CLIENT_JSON_FILE $PASSED_SAME_SERVICE_ID_FLAG
+** initial_event_test_slave_starter.sh $PASSED_SUBSCRIPTION_TYPE $CLIENT_JSON_FILE $REMAINING_OPTIONS
** from an external host to successfully complete this test.
**
** You probably will need to adapt the 'unicast' settings in
@@ -78,15 +82,15 @@ FAIL=0
export VSOMEIP_CONFIGURATION=$PASSED_JSON_FILE
export VSOMEIP_APPLICATION_NAME=initial_event_test_service_one
-./initial_event_test_service 1 $PASSED_SAME_SERVICE_ID_FLAG &
+./initial_event_test_service 1 $REMAINING_OPTIONS &
PID_SERVICE_ONE=$!
export VSOMEIP_APPLICATION_NAME=initial_event_test_service_two
-./initial_event_test_service 2 $PASSED_SAME_SERVICE_ID_FLAG &
+./initial_event_test_service 2 $REMAINING_OPTIONS &
PID_SERVICE_TWO=$!
export VSOMEIP_APPLICATION_NAME=initial_event_test_service_three
-./initial_event_test_service 3 $PASSED_SAME_SERVICE_ID_FLAG &
+./initial_event_test_service 3 $REMAINING_OPTIONS &
PID_SERVICE_THREE=$!
unset VSOMEIP_APPLICATION_NAME
@@ -94,55 +98,30 @@ unset VSOMEIP_APPLICATION_NAME
# Array for client pids
CLIENT_PIDS=()
-# Start some clients
-if [[ $PASSED_SUBSCRIPTION_TYPE == "TCP_AND_UDP" ]]
-then
- ./initial_event_test_client 9000 $PASSED_SUBSCRIPTION_TYPE $PASSED_SAME_SERVICE_ID_FLAG &
- FIRST_PID=$!
- sleep 1
- print_starter_message
- wait $FIRST_PID || FAIL=$(($FAIL+1))
-else
- for client_number in $(seq 9000 9009)
- do
- ./initial_event_test_client $client_number $PASSED_SUBSCRIPTION_TYPE $PASSED_SAME_SERVICE_ID_FLAG &
- CLIENT_PIDS+=($!)
- done
-fi
-
+# Start first client which subscribes remotely
+./initial_event_test_client 9000 $PASSED_SUBSCRIPTION_TYPE DONT_EXIT $REMAINING_OPTIONS &
+FIRST_PID=$!
# Start availability checker in order to wait until the services on the remote
# were started as well
-./initial_event_test_availability_checker 1234 $PASSED_SAME_SERVICE_ID_FLAG &
+./initial_event_test_availability_checker 1234 $REMAINING_OPTIONS &
PID_AVAILABILITY_CHECKER=$!
sleep 1
-if [ $PASSED_SUBSCRIPTION_TYPE != "TCP_AND_UDP" ]
-then
- print_starter_message
-fi
+print_starter_message
-# wait unti the services on the remote node were started as well
+
+# wait until the services on the remote node were started as well
wait $PID_AVAILABILITY_CHECKER
-# sleep to make sure the following started clients will have to get
-# the cached event from the routing manager daemon
sleep 2
-if [[ $PASSED_SUBSCRIPTION_TYPE == "TCP_AND_UDP" ]]
-then
- ./initial_event_test_client 9010 $PASSED_SUBSCRIPTION_TYPE $PASSED_SAME_SERVICE_ID_FLAG &
- FIRST_PID=$!
- wait $FIRST_PID || FAIL=$(($FAIL+1))
-else
- for client_number in $(seq 9010 9020)
- do
- ./initial_event_test_client $client_number $PASSED_SUBSCRIPTION_TYPE $PASSED_SAME_SERVICE_ID_FLAG &
- CLIENT_PIDS+=($!)
- done
-fi
-
+for client_number in $(seq 9001 9011)
+do
+ ./initial_event_test_client $client_number $PASSED_SUBSCRIPTION_TYPE STRICT_CHECKING $REMAINING_OPTIONS &
+ CLIENT_PIDS+=($!)
+done
# Wait until all clients are finished
for job in ${CLIENT_PIDS[*]}
@@ -156,7 +135,11 @@ done
PID_STOP_SERVICE=$!
wait $PID_STOP_SERVICE
-# kill the services
+# shut down the first client
+kill $FIRST_PID
+wait $FIRST_PID || FAIL=$(($FAIL+1))
+
+# shut down the services
kill $PID_SERVICE_THREE
kill $PID_SERVICE_TWO
kill $PID_SERVICE_ONE
@@ -164,6 +147,11 @@ kill $PID_SERVICE_ONE
sleep 1
echo ""
+if [ ! -z "$USE_DOCKER" ]; then
+ docker stop ietms
+ docker rm ietms
+fi
+
# Check if both exited successfully
if [ $FAIL -eq 0 ]
then
diff --git a/test/initial_event_tests/initial_event_test_service.cpp b/test/initial_event_tests/initial_event_test_service.cpp
index 5873ffe..037149e 100644
--- a/test/initial_event_tests/initial_event_test_service.cpp
+++ b/test/initial_event_tests/initial_event_test_service.cpp
@@ -22,10 +22,12 @@
class initial_event_test_service {
public:
- initial_event_test_service(struct initial_event_test::service_info _service_info) :
+ initial_event_test_service(struct initial_event_test::service_info _service_info,
+ std::uint32_t _events_to_offer) :
service_info_(_service_info),
app_(vsomeip::runtime::get()->create_application()),
wait_until_registered_(true),
+ events_to_offer_(_events_to_offer),
offer_thread_(std::bind(&initial_event_test_service::run, this)) {
if (!app_->init()) {
ADD_FAILURE() << "Couldn't initialize application";
@@ -38,8 +40,10 @@ public:
// offer field
std::set<vsomeip::eventgroup_t> its_eventgroups;
its_eventgroups.insert(service_info_.eventgroup_id);
- app_->offer_event(service_info_.service_id, service_info_.instance_id,
- service_info_.event_id, its_eventgroups, true);
+ for (std::uint16_t i = 0; i < events_to_offer_; i++) {
+ app_->offer_event(service_info_.service_id, service_info_.instance_id,
+ static_cast<vsomeip::event_t>(service_info_.event_id + i), its_eventgroups, true);
+ }
// set value to field
std::shared_ptr<vsomeip::payload> its_payload =
@@ -47,8 +51,10 @@ public:
vsomeip::byte_t its_data[2] = {static_cast<vsomeip::byte_t>((service_info_.service_id & 0xFF00) >> 8),
static_cast<vsomeip::byte_t>((service_info_.service_id & 0xFF))};
its_payload->set_data(its_data, 2);
- app_->notify(service_info_.service_id, service_info_.instance_id,
- service_info_.event_id, its_payload);
+ for (std::uint16_t i = 0; i < events_to_offer_; i++) {
+ app_->notify(service_info_.service_id, service_info_.instance_id,
+ static_cast<vsomeip::event_t>(service_info_.event_id + i), its_payload);
+ }
app_->start();
}
@@ -91,6 +97,7 @@ private:
std::shared_ptr<vsomeip::application> app_;
bool wait_until_registered_;
+ std::uint32_t events_to_offer_;
std::mutex mutex_;
std::condition_variable condition_;
std::thread offer_thread_;
@@ -98,15 +105,16 @@ private:
static int service_number;
static bool use_same_service_id;
+static std::uint32_t offer_multiple_events;
TEST(someip_initial_event_test, set_field_once)
{
if(use_same_service_id) {
initial_event_test_service its_sample(
- initial_event_test::service_infos_same_service_id[service_number]);
+ initial_event_test::service_infos_same_service_id[service_number], offer_multiple_events);
} else {
initial_event_test_service its_sample(
- initial_event_test::service_infos[service_number]);
+ initial_event_test::service_infos[service_number], offer_multiple_events);
}
}
@@ -117,16 +125,25 @@ int main(int argc, char** argv)
if(argc < 2) {
std::cerr << "Please specify a service number and subscription type, like: " << argv[0] << " 2 SAME_SERVICE_ID" << std::endl;
std::cerr << "Valid service numbers are in the range of [1,6]" << std::endl;
- std::cerr << "If SAME_SERVICE_ID is specified as third parameter the test is run w/ multiple instances of the same service" << std::endl;
+ std::cerr << "After the service number one/multiple of these flags can be specified:";
+ std::cerr << " - SAME_SERVICE_ID flag. If set the test is run w/ multiple instances of the same service, default false" << std::endl;
+ std::cerr << " - MULTIPLE_EVENTS flag. If set the test will offer to multiple events in the eventgroup, default false" << std::endl;
return 1;
}
service_number = std::stoi(std::string(argv[1]), nullptr);
- if (argc >= 3 && std::string("SAME_SERVICE_ID") == std::string(argv[2])) {
- use_same_service_id = true;
- } else {
- use_same_service_id = false;
+ offer_multiple_events = 1;
+ use_same_service_id = false;
+
+ if (argc > 2) {
+ for (int i = 2; i < argc; i++) {
+ if (std::string("SAME_SERVICE_ID") == std::string(argv[i])) {
+ use_same_service_id = true;
+ } else if (std::string("MULTIPLE_EVENTS") == std::string(argv[i])) {
+ offer_multiple_events = 5;
+ }
+ }
}
return RUN_ALL_TESTS();
}
diff --git a/test/initial_event_tests/initial_event_test_slave_starter.sh b/test/initial_event_tests/initial_event_test_slave_starter.sh
index 74371d0..69e96f7 100755
--- a/test/initial_event_tests/initial_event_test_slave_starter.sh
+++ b/test/initial_event_tests/initial_event_test_slave_starter.sh
@@ -24,7 +24,9 @@ fi
PASSED_SUBSCRIPTION_TYPE=$1
PASSED_JSON_FILE=$2
-PASSED_SAME_SERVICE_ID_FLAG=$3
+# Remove processed options from $@
+shift 2
+REMAINING_OPTIONS=$@
# Make sure only valid subscription types are passed to the script
SUBSCRIPTION_TYPES="TCP_AND_UDP PREFER_UDP PREFER_TCP UDP TCP"
@@ -52,15 +54,15 @@ export VSOMEIP_CONFIGURATION=$PASSED_JSON_FILE
# Start the services
export VSOMEIP_APPLICATION_NAME=initial_event_test_service_four
-./initial_event_test_service 4 $PASSED_SAME_SERVICE_ID_FLAG &
+./initial_event_test_service 4 $REMAINING_OPTIONS &
PID_SERVICE_FOUR=$!
export VSOMEIP_APPLICATION_NAME=initial_event_test_service_five
-./initial_event_test_service 5 $PASSED_SAME_SERVICE_ID_FLAG &
+./initial_event_test_service 5 $REMAINING_OPTIONS &
PID_SERVICE_FIVE=$!
export VSOMEIP_APPLICATION_NAME=initial_event_test_service_six
-./initial_event_test_service 6 $PASSED_SAME_SERVICE_ID_FLAG &
+./initial_event_test_service 6 $REMAINING_OPTIONS &
PID_SERVICE_SIX=$!
unset VSOMEIP_APPLICATION_NAME
@@ -68,44 +70,23 @@ unset VSOMEIP_APPLICATION_NAME
# Array for client pids
CLIENT_PIDS=()
-# Start some clients
-if [[ $PASSED_SUBSCRIPTION_TYPE == "TCP_AND_UDP" ]]
-then
- ./initial_event_test_client 9000 $PASSED_SUBSCRIPTION_TYPE $PASSED_SAME_SERVICE_ID_FLAG &
- FIRST_PID=$!
- wait $FIRST_PID || FAIL=$(($FAIL+1))
-else
- for client_number in $(seq 9000 9009)
- do
- ./initial_event_test_client $client_number $PASSED_SUBSCRIPTION_TYPE $PASSED_SAME_SERVICE_ID_FLAG &
- CLIENT_PIDS+=($!)
- done
-fi
+# Start first client which subscribes remotely
+./initial_event_test_client 9000 $PASSED_SUBSCRIPTION_TYPE DONT_EXIT $REMAINING_OPTIONS &
+FIRST_PID=$!
# Start availability checker in order to wait until the services on the remote
# were started as well
-./initial_event_test_availability_checker 1234 $PASSED_SAME_SERVICE_ID_FLAG &
+./initial_event_test_availability_checker 1234 $REMAINING_OPTIONS &
PID_AVAILABILITY_CHECKER=$!
-# wait unti the services on the remote node were started as well
+# wait until the services on the remote node were started as well
wait $PID_AVAILABILITY_CHECKER
-
-# sleep to make sure the following started clients will have to get
-# the cached event from the routing manager daemon
-sleep 2
-
-if [[ $PASSED_SUBSCRIPTION_TYPE == "TCP_AND_UDP" ]]
-then
- ./initial_event_test_client 9000 $PASSED_SUBSCRIPTION_TYPE $PASSED_SAME_SERVICE_ID_FLAG &
- FIRST_PID=$!
- wait $FIRST_PID || FAIL=$(($FAIL+1))
-else
- for client_number in $(seq 9010 9020)
- do
- ./initial_event_test_client $client_number $PASSED_SUBSCRIPTION_TYPE $PASSED_SAME_SERVICE_ID_FLAG &
- CLIENT_PIDS+=($!)
- done
-fi
+sleep 2;
+for client_number in $(seq 9001 9011)
+do
+ ./initial_event_test_client $client_number $PASSED_SUBSCRIPTION_TYPE STRICT_CHECKING $REMAINING_OPTIONS &
+ CLIENT_PIDS+=($!)
+done
# Wait until all clients are finished
for job in ${CLIENT_PIDS[*]}
@@ -119,7 +100,11 @@ done
PID_STOP_SERVICE=$!
wait $PID_STOP_SERVICE
-# kill the services
+# shut down the first client
+kill $FIRST_PID
+wait $FIRST_PID || FAIL=$(($FAIL+1))
+
+# shut down the services
kill $PID_SERVICE_SIX
kill $PID_SERVICE_FIVE
kill $PID_SERVICE_FOUR
diff --git a/test/magic_cookies_tests/magic_cookies_test_starter.sh b/test/magic_cookies_tests/magic_cookies_test_starter.sh
index ef158a2..631eef7 100755
--- a/test/magic_cookies_tests/magic_cookies_test_starter.sh
+++ b/test/magic_cookies_tests/magic_cookies_test_starter.sh
@@ -17,7 +17,9 @@ FAIL=0
if [ ! -z "$USE_LXC_TEST" ]; then
echo "starting magic cookies test on slave LXC"
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./magic_cookies_test_client_start.sh\"" &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip/test; ./magic_cookies_test_client_start.sh\"" &
+elif [ ! -z "$USE_DOCKER" ]; then
+ docker run $DOCKER_IMAGE sh -c "cd $DOCKER_TESTS && ./magic_cookies_test_client_start.sh" &
else
cat <<End-of-message
*******************************************************************************
diff --git a/test/offer_tests/conf/offer_test_external_master_starter.sh.in b/test/offer_tests/conf/offer_test_external_master_starter.sh.in
index a8af719..0f8f104 100755
--- a/test/offer_tests/conf/offer_test_external_master_starter.sh.in
+++ b/test/offer_tests/conf/offer_test_external_master_starter.sh.in
@@ -40,8 +40,10 @@ if [ ! -z "$USE_LXC_TEST" ]; then
echo "Waiting for 5s"
sleep 5
echo "starting offer test on slave LXC offer_test_external_slave_starter.sh"
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./offer_test_external_slave_starter.sh\"" &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip/test; ./offer_test_external_slave_starter.sh\"" &
echo "remote ssh pid: $!"
+elif [ ! -z "$USE_DOCKER" ]; then
+ docker run --name otems --cap-add NET_ADMIN $DOCKER_IMAGE sh -c "route add -net 224.0.0.0/4 dev eth0 && cd $DOCKER_TESTS && sleep 10; ./offer_test_external_slave_starter.sh" &
else
cat <<End-of-message
*******************************************************************************
@@ -70,6 +72,11 @@ done
kill $PID_VSOMEIPD
sleep 1
+if [ ! -z "$USE_DOCKER" ]; then
+ docker stop otems
+ docker rm otems
+fi
+
# wait for slave to finish
for job in $(jobs -p)
do
@@ -104,8 +111,10 @@ if [ ! -z "$USE_LXC_TEST" ]; then
echo "Waiting for 5s"
sleep 5
echo "starting offer test on slave LXC offer_test_external_sd_msg_sender"
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./offer_test_external_sd_msg_sender $LXC_TEST_MASTER_IP\"" &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip/test; ./offer_test_external_sd_msg_sender $LXC_TEST_MASTER_IP\"" &
echo "remote ssh job id: $!"
+elif [ ! -z "$USE_DOCKER" ]; then
+ docker run --name otesms --cap-add NET_ADMIN $DOCKER_IMAGE sh -c "route add -net 224.0.0.0/4 dev eth0 && cd $DOCKER_TESTS && sleep 10; ./offer_test_external_sd_msg_sender $DOCKER_IP" &
else
cat <<End-of-message
*******************************************************************************
@@ -132,6 +141,11 @@ done
kill $PID_VSOMEIPD
sleep 1
+if [ ! -z "$USE_DOCKER" ]; then
+ docker stop otesms
+ docker rm otesms
+fi
+
# wait for slave to finish
for job in $(jobs -p)
do
diff --git a/test/payload_tests/external_local_payload_test_client_external_starter.sh b/test/payload_tests/external_local_payload_test_client_external_starter.sh
index 89541b9..da6ca08 100755
--- a/test/payload_tests/external_local_payload_test_client_external_starter.sh
+++ b/test/payload_tests/external_local_payload_test_client_external_starter.sh
@@ -58,8 +58,10 @@ SERIVCE_PID=$!
# to finish the test successfully
if [ ! -z "$USE_LXC_TEST" ]; then
echo "starting external local payload on slave LXC"
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./external_local_payload_test_client_external_start.sh\"" &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip/test; ./external_local_payload_test_client_external_start.sh\"" &
echo "remote ssh job id: $!"
+elif [ ! -z "$USE_DOCKER" ]; then
+ docker run --name elptces $DOCKER_IMAGE sh -c "cd $DOCKER_TESTS && ./external_local_payload_test_client_external_start.sh" &
else
cat <<End-of-message
*******************************************************************************
@@ -110,6 +112,10 @@ check_tcp_udp_sockets_are_open $SERIVCE_PID 2
# with a non-zero exit code
wait $SERIVCE_PID || ((FAIL+=1))
+if [ ! -z "$USE_DOCKER" ]; then
+ docker wait elptces
+ docker rm elptces
+fi
# Check if server exited sucessfully
if [ $FAIL -eq 0 ]
diff --git a/test/payload_tests/external_local_payload_test_client_local_and_external_starter.sh b/test/payload_tests/external_local_payload_test_client_local_and_external_starter.sh
index 283f400..041abc4 100755
--- a/test/payload_tests/external_local_payload_test_client_local_and_external_starter.sh
+++ b/test/payload_tests/external_local_payload_test_client_local_and_external_starter.sh
@@ -75,8 +75,10 @@ wait $CLIENT_PID || ((FAIL+=1))
# to finish the test successfully
if [ ! -z "$USE_LXC_TEST" ]; then
echo "starting external local payload on slave LXC"
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./external_local_payload_test_client_external_start.sh\"" &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip/test; ./external_local_payload_test_client_external_start.sh\"" &
echo "remote ssh job id: $!"
+elif [ ! -z "$USE_DOCKER" ]; then
+ docker run --name elptclaes $DOCKER_IMAGE sh -c "cd $DOCKER_TESTS && ./external_local_payload_test_client_external_start.sh" &
else
cat <<End-of-message
*******************************************************************************
@@ -127,6 +129,11 @@ check_tcp_udp_sockets_are_open $SERIVCE_PID 2
# with a non-zero exit code
wait $SERIVCE_PID || ((FAIL+=1))
+if [ ! -z "$USE_DOCKER" ]; then
+ docker wait elptclaes
+ docker rm elptclaes
+fi
+
# Check if client and server both exited sucessfully and the service didnt't
# have any open TCP/UDP sockets
if [ $FAIL -eq 0 ]
diff --git a/test/payload_tests/external_local_payload_test_client_local_starter.sh b/test/payload_tests/external_local_payload_test_client_local_starter.sh
index e2196d8..c73dd81 100755
--- a/test/payload_tests/external_local_payload_test_client_local_starter.sh
+++ b/test/payload_tests/external_local_payload_test_client_local_starter.sh
@@ -70,6 +70,10 @@ check_tcp_udp_sockets_are_open $SERIVCE_PID 2
# therefore he shouldn't have any open TCP/UDP sockets
check_tcp_udp_sockets_are_closed $CLIENT_PID
+if [ ! -z "$USE_DOCKER" ]; then
+ FAIL=0
+fi
+
# Wait until client and service are finished
for job in $(jobs -p)
do
diff --git a/test/routing_tests/external_local_routing_test_starter.sh b/test/routing_tests/external_local_routing_test_starter.sh
index 836d03e..af94ab6 100755
--- a/test/routing_tests/external_local_routing_test_starter.sh
+++ b/test/routing_tests/external_local_routing_test_starter.sh
@@ -78,8 +78,10 @@ if [ $CLIENT_STILL_THERE -ne 0 ]
then
if [ ! -z "$USE_LXC_TEST" ]; then
echo "starting external_local_routing_test_starter.sh on slave LXC"
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./external_local_routing_test_client_external_start.sh\"" &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip/test; ./external_local_routing_test_client_external_start.sh\"" &
echo "remote ssh job id: $!"
+ elif [ ! -z "$USE_DOCKER" ]; then
+ docker run --name elrts --cap-add NET_ADMIN $DOCKER_IMAGE sh -c "route add -net 224.0.0.0/4 dev eth0 && cd $DOCKER_TESTS && ./external_local_routing_test_client_external_start.sh" &
else
cat <<End-of-message
*******************************************************************************
@@ -97,6 +99,10 @@ End-of-message
fi
fi
+if [ ! -z "$USE_DOCKER" ]; then
+ FAIL=0
+fi
+
# Wait until client and service are finished
for job in $(jobs -p)
do
@@ -105,6 +111,11 @@ do
wait $job || ((FAIL+=1))
done
+if [ ! -z "$USE_DOCKER" ]; then
+ docker stop elrts
+ docker rm elrts
+fi
+
# Check if client and server both exited sucessfully and the service didnt't
# have any open
if [ $FAIL -eq 0 ]
diff --git a/test/subscribe_notify_one_tests/conf/subscribe_notify_one_test_diff_client_ids_diff_ports_master.json.in b/test/subscribe_notify_one_tests/conf/subscribe_notify_one_test_diff_client_ids_diff_ports_master.json.in
index 7acf45b..f3aa9de 100644
--- a/test/subscribe_notify_one_tests/conf/subscribe_notify_one_test_diff_client_ids_diff_ports_master.json.in
+++ b/test/subscribe_notify_one_tests/conf/subscribe_notify_one_test_diff_client_ids_diff_ports_master.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_MASTER@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_one_tests/conf/subscribe_notify_one_test_diff_client_ids_diff_ports_slave.json.in b/test/subscribe_notify_one_tests/conf/subscribe_notify_one_test_diff_client_ids_diff_ports_slave.json.in
index 7db7eed..c588106 100644
--- a/test/subscribe_notify_one_tests/conf/subscribe_notify_one_test_diff_client_ids_diff_ports_slave.json.in
+++ b/test/subscribe_notify_one_tests/conf/subscribe_notify_one_test_diff_client_ids_diff_ports_slave.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_SLAVE@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_one_tests/subscribe_notify_one_test_master_starter.sh b/test/subscribe_notify_one_tests/subscribe_notify_one_test_master_starter.sh
index 9fa3ec7..c551b55 100755
--- a/test/subscribe_notify_one_tests/subscribe_notify_one_test_master_starter.sh
+++ b/test/subscribe_notify_one_tests/subscribe_notify_one_test_master_starter.sh
@@ -64,8 +64,10 @@ sleep 1
if [ ! -z "$USE_LXC_TEST" ]; then
echo "starting subscribe_notify_one_test_slave_starter.sh on slave LXC with parameters $1 $CLIENT_JSON_FILE"
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./subscribe_notify_one_test_slave_starter.sh $1 $CLIENT_JSON_FILE\"" &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip/test; ./subscribe_notify_one_test_slave_starter.sh $1 $CLIENT_JSON_FILE\"" &
echo "remote ssh job id: $!"
+elif [ ! -z "$USE_DOCKER" ]; then
+ docker run --name snotms --cap-add NET_ADMIN $DOCKER_IMAGE sh -c "route add -net 224.0.0.0/4 dev eth0 && cd $DOCKER_TESTS && ./subscribe_notify_one_test_slave_starter.sh $1 $CLIENT_JSON_FILE" &
else
cat <<End-of-message
*******************************************************************************
@@ -90,6 +92,11 @@ do
wait $job || ((FAIL+=1))
done
+if [ ! -z "$USE_DOCKER" ]; then
+ docker stop snotms
+ docker rm snotms
+fi
+
# Check if both exited successfully
if [ $FAIL -eq 0 ]
then
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_autoconfig_master.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_autoconfig_master.json.in
index 332f55f..370cf98 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_autoconfig_master.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_autoconfig_master.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_MASTER@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_autoconfig_slave.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_autoconfig_slave.json.in
index ca25c33..46de8ff 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_autoconfig_slave.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_autoconfig_slave.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_SLAVE@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_master.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_master.json.in
index 87d1934..87299aa 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_master.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_master.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_MASTER@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_same_service_id_master.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_same_service_id_master.json.in
index a3cdab7..fa4c5a3 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_same_service_id_master.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_same_service_id_master.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_MASTER@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_same_service_id_slave.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_same_service_id_slave.json.in
index 1ee1123..66f1a83 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_same_service_id_slave.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_same_service_id_slave.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_SLAVE@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_slave.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_slave.json.in
index 43f0453..5b110aa 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_slave.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_diff_ports_slave.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_SLAVE@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_partial_same_ports_master.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_partial_same_ports_master.json.in
index fb7073f..81bb18e 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_partial_same_ports_master.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_partial_same_ports_master.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_MASTER@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_partial_same_ports_slave.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_partial_same_ports_slave.json.in
index 27f9f34..98ce9a6 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_partial_same_ports_slave.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_partial_same_ports_slave.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_SLAVE@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_same_ports_master.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_same_ports_master.json.in
index a60ee3d..253b55e 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_same_ports_master.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_same_ports_master.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_MASTER@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_same_ports_slave.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_same_ports_slave.json.in
index 9854b75..556b10f 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_same_ports_slave.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_diff_client_ids_same_ports_slave.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_SLAVE@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_diff_ports_master.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_diff_ports_master.json.in
index 87d1934..87299aa 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_diff_ports_master.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_diff_ports_master.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_MASTER@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_diff_ports_slave.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_diff_ports_slave.json.in
index 3786f74..bc13f01 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_diff_ports_slave.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_diff_ports_slave.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_SLAVE@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_same_ports_master.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_same_ports_master.json.in
index a60ee3d..253b55e 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_same_ports_master.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_same_ports_master.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_MASTER@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_same_ports_slave.json.in b/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_same_ports_slave.json.in
index cbe671e..c203e80 100644
--- a/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_same_ports_slave.json.in
+++ b/test/subscribe_notify_tests/conf/subscribe_notify_test_same_client_ids_same_ports_slave.json.in
@@ -2,7 +2,7 @@
"unicast":"@TEST_IP_SLAVE@",
"logging":
{
- "level":"info",
+ "level":"warning",
"console":"true",
"file":
{
diff --git a/test/subscribe_notify_tests/subscribe_notify_test_master_starter.sh b/test/subscribe_notify_tests/subscribe_notify_test_master_starter.sh
index 6df3053..1e41c8e 100755
--- a/test/subscribe_notify_tests/subscribe_notify_test_master_starter.sh
+++ b/test/subscribe_notify_tests/subscribe_notify_test_master_starter.sh
@@ -65,8 +65,10 @@ sleep 1
if [ ! -z "$USE_LXC_TEST" ]; then
echo "starting subscribe_notify_test_slave_starter.sh on slave LXC with parameters $1 $CLIENT_JSON_FILE $3"
- ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_ROOT_DIR/ctarget/vsomeip/test; ./subscribe_notify_test_slave_starter.sh $1 $CLIENT_JSON_FILE $3\"" &
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip/test; ./subscribe_notify_test_slave_starter.sh $1 $CLIENT_JSON_FILE $3\"" &
echo "remote ssh job id: $!"
+elif [ ! -z "$USE_DOCKER" ]; then
+ docker run --name sntms --cap-add NET_ADMIN $DOCKER_IMAGE sh -c "route add -net 224.0.0.0/4 dev eth0 && cd $DOCKER_TESTS && ./subscribe_notify_test_slave_starter.sh $1 $CLIENT_JSON_FILE $3" &
else
cat <<End-of-message
*******************************************************************************
@@ -83,6 +85,10 @@ else
End-of-message
fi
+if [ ! -z "$USE_DOCKER" ]; then
+ FAIL=0
+fi
+
# Wait until client and service are finished
for job in $(jobs -p)
do
@@ -91,6 +97,11 @@ do
wait $job || ((FAIL+=1))
done
+if [ ! -z "$USE_DOCKER" ]; then
+ docker stop sntms
+ docker rm sntms
+fi
+
# Check if both exited successfully
if [ $FAIL -eq 0 ]
then