summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJuergen Gehring <juergen.gehring@bmw.de>2018-05-22 02:56:47 -0700
committerJuergen Gehring <juergen.gehring@bmw.de>2018-05-22 02:56:47 -0700
commit9fb9beecadf52083599302fa8ddee7efbec64a39 (patch)
tree2d673725855805ba72ec2cd73aa788be882e8b65
parent23b6a4bcc6632b7c217b854bb879eb9cd3bc2f29 (diff)
downloadvSomeIP-9fb9beecadf52083599302fa8ddee7efbec64a39.tar.gz
vsomeip 2.10.212.10.21
-rw-r--r--CHANGES14
-rw-r--r--CMakeLists.txt8
-rw-r--r--implementation/configuration/include/internal.hpp.in12
-rw-r--r--implementation/configuration/src/configuration_impl.cpp4
-rw-r--r--implementation/endpoints/include/client_endpoint_impl.hpp7
-rw-r--r--implementation/endpoints/include/endpoint.hpp4
-rw-r--r--implementation/endpoints/include/endpoint_definition.hpp9
-rw-r--r--implementation/endpoints/include/local_client_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/include/server_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/include/virtual_server_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/src/client_endpoint_impl.cpp204
-rw-r--r--implementation/endpoints/src/endpoint_definition.cpp34
-rw-r--r--implementation/endpoints/src/local_client_endpoint_impl.cpp33
-rw-r--r--implementation/endpoints/src/server_endpoint_impl.cpp11
-rw-r--r--implementation/endpoints/src/virtual_server_endpoint_impl.cpp10
-rw-r--r--implementation/routing/include/event.hpp2
-rw-r--r--implementation/routing/include/routing_manager_host.hpp1
-rw-r--r--implementation/routing/src/event.cpp18
-rw-r--r--implementation/routing/src/eventgroupinfo.cpp10
-rw-r--r--implementation/routing/src/routing_manager_base.cpp59
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp41
-rw-r--r--implementation/routing/src/routing_manager_proxy.cpp24
-rw-r--r--implementation/routing/src/routing_manager_stub.cpp50
-rw-r--r--implementation/runtime/src/application_impl.cpp74
-rwxr-xr-ximplementation/service_discovery/src/message_impl.cpp34
-rw-r--r--test/CMakeLists.txt4
-rw-r--r--test/big_payload_tests/big_payload_test_client.cpp2
-rw-r--r--test/big_payload_tests/big_payload_test_local_queue_limited.json2
-rw-r--r--test/big_payload_tests/big_payload_test_service.cpp2
-rw-r--r--test/configuration_tests/configuration-test.cpp4
-rw-r--r--test/malicious_data_tests/malicious_data_test_msg_sender.cpp5
-rwxr-xr-xtest/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in2
-rw-r--r--test/pending_subscription_tests/pending_subscription_test_globals.hpp3
-rw-r--r--test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp239
-rw-r--r--test/pending_subscription_tests/pending_subscription_test_service.cpp18
35 files changed, 699 insertions, 251 deletions
diff --git a/CHANGES b/CHANGES
index e88362a..a73bd27 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,20 @@
Changes
=======
+v2.10.21
+- Improve memory usage of routing manager.
+- Improve handling of incoming SD messages with uncommon entry
+ combinations.
+- Name all threads under Linux and log thread IDs during startup.
+- Optimize memory allocation for internal message handling.
+- Ensure an (extra) dispatch thread is running in case the main
+ dispatch thread is (still) blocked.
+- Fix race condition which could lead to missing initial events for
+ local subscriptions if the application hosting the service called
+ application::offer_event and additionally application::request_event
+ for the same event.
+- Fixed crash
+
v2.10.20
- Add security config (i.e. vsomeip_security.json) to mandatory config files
- Enable local_routing_test_starter.sh to use externally defined configuration
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 04ae559..2c4b3f5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -8,7 +8,9 @@ project (vsomeip)
set (VSOMEIP_MAJOR_VERSION 2)
set (VSOMEIP_MINOR_VERSION 10)
-set (VSOMEIP_PATCH_VERSION 20)
+set (VSOMEIP_PATCH_VERSION 21)
+set (VSOMEIP_HOTFIX_VERSION 0)
+
set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION})
set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in
set (CMAKE_VERBOSE_MAKEFILE off)
@@ -148,7 +150,11 @@ file(GLOB_RECURSE vsomeip_e2e_SRC
list(SORT vsomeip_SRC)
list(SORT vsomeip_e2e_SRC)
+if (${VSOMEIP_HOTFIX_VERSION} EQUAL 0)
add_definitions(-DVSOMEIP_VERSION="${VSOMEIP_VERSION}")
+else()
+add_definitions(-DVSOMEIP_VERSION="${VSOMEIP_VERSION}.${VSOMEIP_HOTFIX_VERSION}")
+endif()
if (MSVC)
message("using MSVC Compiler")
diff --git a/implementation/configuration/include/internal.hpp.in b/implementation/configuration/include/internal.hpp.in
index 6135205..1cb9895 100644
--- a/implementation/configuration/include/internal.hpp.in
+++ b/implementation/configuration/include/internal.hpp.in
@@ -72,7 +72,7 @@
#define VSOMEIP_COMMAND_TYPE_POS 0
#define VSOMEIP_COMMAND_CLIENT_POS 1
#define VSOMEIP_COMMAND_SIZE_POS_MIN 3
-#define VSOMEIP_COMMAND_SIZE_POS_MAX 5
+#define VSOMEIP_COMMAND_SIZE_POS_MAX 6
#define VSOMEIP_COMMAND_PAYLOAD_POS 7
#define VSOMEIP_REGISTER_APPLICATION 0x00
@@ -105,6 +105,16 @@
#define VSOMEIP_OFFERED_SERVICES_RESPONSE 0x20
#define VSOMEIP_UNSUBSCRIBE_ACK 0x21
+#define VSOMEIP_SEND_COMMAND_SIZE 14
+#define VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN 7
+#define VSOMEIP_SEND_COMMAND_INSTANCE_POS_MAX 8
+#define VSOMEIP_SEND_COMMAND_FLUSH_POS 9
+#define VSOMEIP_SEND_COMMAND_RELIABLE_POS 10
+#define VSOMEIP_SEND_COMMAND_VALID_CRC_POS 11
+#define VSOMEIP_SEND_COMMAND_DST_CLIENT_POS_MIN 12
+#define VSOMEIP_SEND_COMMAND_DST_CLIENT_POS_MAX 13
+#define VSOMEIP_SEND_COMMAND_PAYLOAD_POS 14
+
#define VSOMEIP_OFFER_SERVICE_COMMAND_SIZE 16
#define VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE 17
#define VSOMEIP_RELEASE_SERVICE_COMMAND_SIZE 11
diff --git a/implementation/configuration/src/configuration_impl.cpp b/implementation/configuration/src/configuration_impl.cpp
index aa18f2f..ccf3883 100644
--- a/implementation/configuration/src/configuration_impl.cpp
+++ b/implementation/configuration/src/configuration_impl.cpp
@@ -2400,9 +2400,7 @@ std::uint32_t configuration_impl::get_max_message_size_local() const {
// add sizes of the the routing_manager_proxy's messages
// to the routing_manager stub
- return std::uint32_t(its_max_message_size
- + VSOMEIP_COMMAND_HEADER_SIZE + sizeof(instance_t)
- + sizeof(bool) + sizeof(bool) + sizeof(client_t));
+ return std::uint32_t(its_max_message_size + VSOMEIP_SEND_COMMAND_SIZE);
}
std::uint32_t configuration_impl::get_max_message_size_reliable(
diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp
index 2b235d6..97819bc 100644
--- a/implementation/endpoints/include/client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/client_endpoint_impl.hpp
@@ -42,6 +42,8 @@ public:
virtual ~client_endpoint_impl();
bool send(const uint8_t *_data, uint32_t _size, bool _flush);
+ bool send(const std::vector<byte_t>& _cmd_header, const byte_t *_data,
+ uint32_t _size, bool _flush = true);
bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush = true);
bool flush();
@@ -81,6 +83,11 @@ protected:
void shutdown_and_close_socket_unlocked(bool _recreate_socket);
void start_connect_timer();
+ bool check_message_size(std::uint32_t _size) const;
+ bool check_packetizer_space(std::uint32_t _size);
+ bool check_queue_limit(const uint8_t *_data, std::uint32_t _size) const;
+ void send_or_start_flush_timer(bool _flush, bool _queue_size_zero_on_entry);
+
mutable std::mutex socket_mutex_;
std::unique_ptr<socket_type> socket_;
endpoint_type remote_;
diff --git a/implementation/endpoints/include/endpoint.hpp b/implementation/endpoints/include/endpoint.hpp
index 8ee3f8f..f3bf510 100644
--- a/implementation/endpoints/include/endpoint.hpp
+++ b/implementation/endpoints/include/endpoint.hpp
@@ -10,6 +10,8 @@
#include <vsomeip/primitive_types.hpp>
+#include <vector>
+
namespace vsomeip {
class endpoint_definition;
@@ -27,6 +29,8 @@ public:
virtual bool send(const byte_t *_data, uint32_t _size,
bool _flush = true) = 0;
+ virtual bool send(const std::vector<byte_t>& _cmd_header, const byte_t *_data,
+ uint32_t _size, bool _flush = true) = 0;
virtual bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush = true) = 0;
virtual void enable_magic_cookies() = 0;
diff --git a/implementation/endpoints/include/endpoint_definition.hpp b/implementation/endpoints/include/endpoint_definition.hpp
index 71309fd..860d2d2 100644
--- a/implementation/endpoints/include/endpoint_definition.hpp
+++ b/implementation/endpoints/include/endpoint_definition.hpp
@@ -43,12 +43,9 @@ private:
bool is_reliable_;
static std::mutex definitions_mutex_;
- static std::map<service_t,
- std::map<instance_t,
- std::map<boost::asio::ip::address,
- std::map<uint16_t,
- std::map<bool,
- std::shared_ptr<endpoint_definition> > > > > > definitions_;
+ static std::map<
+ std::tuple<service_t, instance_t, boost::asio::ip::address, uint16_t, bool>,
+ std::shared_ptr<endpoint_definition> > definitions_;
};
} // namespace vsomeip
diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp
index 59a9eee..214a6d0 100644
--- a/implementation/endpoints/include/local_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp
@@ -50,6 +50,8 @@ public:
void restart(bool _force);
void print_status();
+ bool send(const std::vector<byte_t>& _cmd_header, const byte_t *_data,
+ uint32_t _size, bool _flush = true);
private:
void send_queued();
diff --git a/implementation/endpoints/include/server_endpoint_impl.hpp b/implementation/endpoints/include/server_endpoint_impl.hpp
index 2b0ad60..60e8ff4 100644
--- a/implementation/endpoints/include/server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/server_endpoint_impl.hpp
@@ -41,6 +41,8 @@ public:
bool is_connected() const;
void set_connected(bool _connected);
bool send(const uint8_t *_data, uint32_t _size, bool _flush);
+ bool send(const std::vector<byte_t>& _cmd_header, const byte_t *_data,
+ uint32_t _size, bool _flush = true);
virtual void stop();
bool flush(endpoint_type _target);
diff --git a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
index 4f4d6d4..58619cc 100644
--- a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
@@ -28,6 +28,8 @@ public:
void set_connected(bool _connected);
bool send(const byte_t *_data, uint32_t _size, bool _flush);
+ bool send(const std::vector<byte_t>& _cmd_header, const byte_t *_data,
+ uint32_t _size, bool _flush);
bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush);
void enable_magic_cookies();
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp
index 4423270..b4a6bf9 100644
--- a/implementation/endpoints/src/client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/client_endpoint_impl.cpp
@@ -105,96 +105,37 @@ template<typename Protocol>
bool client_endpoint_impl<Protocol>::send(const uint8_t *_data,
uint32_t _size, bool _flush) {
std::lock_guard<std::mutex> its_lock(mutex_);
- if (endpoint_impl<Protocol>::sending_blocked_) {
- return false;
- }
-#if 0
- std::stringstream msg;
- msg << "cei::send: ";
- for (uint32_t i = 0; i < _size; i++)
- msg << std::hex << std::setw(2) << std::setfill('0')
- << (int)_data[i] << " ";
- VSOMEIP_INFO << msg.str();
-#endif
-
- if (endpoint_impl<Protocol>::max_message_size_ != MESSAGE_SIZE_UNLIMITED
- && _size > endpoint_impl<Protocol>::max_message_size_) {
- VSOMEIP_ERROR << "cei::send: Dropping to big message (" << std::dec
- << _size << " Bytes). Maximum allowed message size is: "
- << endpoint_impl<Protocol>::max_message_size_ << " Bytes.";
- return false;
- }
-
+ bool ret(true);
const bool queue_size_zero_on_entry(queue_.empty());
- if (packetizer_->size() + _size < packetizer_->size()) {
- VSOMEIP_ERROR << "Overflow in packetizer addition ~> abort sending!";
- return false;
- }
- if (packetizer_->size() + _size > endpoint_impl<Protocol>::max_message_size_
- && !packetizer_->empty()) {
- queue_.push_back(packetizer_);
- queue_size_ += packetizer_->size();
- packetizer_ = std::make_shared<message_buffer_t>();
- }
-
- if (endpoint_impl<Protocol>::queue_limit_ != QUEUE_SIZE_UNLIMITED
- && queue_size_ + _size > endpoint_impl<Protocol>::queue_limit_) {
- service_t its_service(0);
- method_t its_method(0);
- client_t its_client(0);
- session_t its_session(0);
- if (_size >= VSOMEIP_SESSION_POS_MAX) {
- // this will yield wrong IDs for local communication as the commands
- // are prepended to the actual payload
- // it will print:
- // (lowbyte service ID + highbyte methoid)
- // [(Command + lowerbyte sender's client ID).
- // highbyte sender's client ID + lowbyte command size.
- // lowbyte methodid + highbyte vsomeipd length]
- its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
- _data[VSOMEIP_SERVICE_POS_MAX]);
- its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
- _data[VSOMEIP_METHOD_POS_MAX]);
- its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
- _data[VSOMEIP_CLIENT_POS_MAX]);
- its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
- _data[VSOMEIP_SESSION_POS_MAX]);
- }
- VSOMEIP_ERROR << "cei::send: queue size limit (" << std::dec
- << endpoint_impl<Protocol>::queue_limit_
- << ") reached. Dropping message ("
- << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
- << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_session << "] "
- << "queue_size: " << std::dec << queue_size_
- << " data size: " << std::dec << _size;
- return false;
- }
- packetizer_->insert(packetizer_->end(), _data, _data + _size);
-
- if (_flush) {
- flush_timer_.cancel();
- queue_.push_back(packetizer_);
- queue_size_ += packetizer_->size();
- packetizer_ = std::make_shared<message_buffer_t>();
+ if (endpoint_impl<Protocol>::sending_blocked_ ||
+ !check_message_size(_size) ||
+ !check_packetizer_space(_size) ||
+ !check_queue_limit(_data, _size)) {
+ ret = false;
} else {
- flush_timer_.expires_from_now(
- std::chrono::milliseconds(VSOMEIP_DEFAULT_FLUSH_TIMEOUT)); // TODO: use config variable
- flush_timer_.async_wait(
- std::bind(
- &client_endpoint_impl<Protocol>::flush_cbk,
- this->shared_from_this(),
- std::placeholders::_1
- )
- );
- }
-
- if (queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress
- send_queued();
+#if 0
+ std::stringstream msg;
+ msg << "cei::send: ";
+ for (uint32_t i = 0; i < _size; i++)
+ msg << std::hex << std::setw(2) << std::setfill('0')
+ << (int)_data[i] << " ";
+ VSOMEIP_INFO << msg.str();
+#endif
+ packetizer_->insert(packetizer_->end(), _data, _data + _size);
+ send_or_start_flush_timer(_flush, queue_size_zero_on_entry);
}
+ return ret;
+}
- return (true);
+template<typename Protocol>
+bool client_endpoint_impl<Protocol>::send(const std::vector<byte_t>& _cmd_header,
+ const byte_t *_data, uint32_t _size,
+ bool _flush) {
+ (void) _cmd_header;
+ (void) _data;
+ (void) _size;
+ (void) _flush;
+ return false;
}
template<typename Protocol>
@@ -427,6 +368,97 @@ void client_endpoint_impl<Protocol>::start_connect_timer() {
this->shared_from_this(), std::placeholders::_1));
}
+template<typename Protocol>
+bool client_endpoint_impl<Protocol>::check_message_size(std::uint32_t _size) const {
+ if (endpoint_impl<Protocol>::max_message_size_ != MESSAGE_SIZE_UNLIMITED
+ && _size > endpoint_impl<Protocol>::max_message_size_) {
+ VSOMEIP_ERROR << "cei::check_message_size: Dropping to big message ("
+ << std::dec << _size << " Bytes). Maximum allowed message size is: "
+ << endpoint_impl<Protocol>::max_message_size_ << " Bytes.";
+ return false;
+ }
+ return true;
+}
+
+template<typename Protocol>
+bool client_endpoint_impl<Protocol>::check_packetizer_space(std::uint32_t _size) {
+ if (packetizer_->size() + _size < packetizer_->size()) {
+ VSOMEIP_ERROR << "Overflow in packetizer addition ~> abort sending!";
+ return false;
+ }
+ if (packetizer_->size() + _size > endpoint_impl<Protocol>::max_message_size_
+ && !packetizer_->empty()) {
+ queue_.push_back(packetizer_);
+ queue_size_ += packetizer_->size();
+ packetizer_ = std::make_shared<message_buffer_t>();
+ }
+ return true;
+}
+
+template<typename Protocol>
+bool client_endpoint_impl<Protocol>::check_queue_limit(const uint8_t *_data, std::uint32_t _size) const {
+ if (endpoint_impl<Protocol>::queue_limit_ != QUEUE_SIZE_UNLIMITED
+ && queue_size_ + _size > endpoint_impl<Protocol>::queue_limit_) {
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_size >= VSOMEIP_SESSION_POS_MAX) {
+ // this will yield wrong IDs for local communication as the commands
+ // are prepended to the actual payload
+ // it will print:
+ // (lowbyte service ID + highbyte methoid)
+ // [(Command + lowerbyte sender's client ID).
+ // highbyte sender's client ID + lowbyte command size.
+ // lowbyte methodid + highbyte vsomeipd length]
+ its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
+ _data[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
+ _data[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
+ _data[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
+ _data[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_ERROR << "cei::check_queue_limit: queue size limit (" << std::dec
+ << endpoint_impl<Protocol>::queue_limit_
+ << ") reached. Dropping message ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "] "
+ << "queue_size: " << std::dec << queue_size_
+ << " data size: " << std::dec << _size;
+ return false;
+ }
+ return true;
+}
+
+template<typename Protocol>
+void client_endpoint_impl<Protocol>::send_or_start_flush_timer(
+ bool _flush, bool _queue_size_zero_on_entry) {
+ if (_flush) {
+ flush_timer_.cancel();
+ queue_.push_back(packetizer_);
+ queue_size_ += packetizer_->size();
+ packetizer_ = std::make_shared<message_buffer_t>();
+
+ if (_queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress
+ send_queued();
+ }
+ } else {
+ flush_timer_.expires_from_now(
+ std::chrono::milliseconds(VSOMEIP_DEFAULT_FLUSH_TIMEOUT)); // TODO: use config variable
+ flush_timer_.async_wait(
+ std::bind(
+ &client_endpoint_impl<Protocol>::flush_cbk,
+ this->shared_from_this(),
+ std::placeholders::_1
+ )
+ );
+ }
+}
+
// Instantiate template
#ifndef _WIN32
template class client_endpoint_impl<boost::asio::local::stream_protocol>;
diff --git a/implementation/endpoints/src/endpoint_definition.cpp b/implementation/endpoints/src/endpoint_definition.cpp
index 91808fc..9ab7218 100644
--- a/implementation/endpoints/src/endpoint_definition.cpp
+++ b/implementation/endpoints/src/endpoint_definition.cpp
@@ -9,43 +9,29 @@
namespace vsomeip {
-std::map<service_t,
- std::map<instance_t,
- std::map<boost::asio::ip::address,
- std::map<uint16_t,
- std::map<bool,
- std::shared_ptr<endpoint_definition> > > > > > endpoint_definition::definitions_;
+std::map<std::tuple<service_t, instance_t, boost::asio::ip::address, uint16_t, bool>,
+ std::shared_ptr<endpoint_definition> > endpoint_definition::definitions_;
std::mutex endpoint_definition::definitions_mutex_;
std::shared_ptr<endpoint_definition>
endpoint_definition::get(const boost::asio::ip::address &_address,
uint16_t _port, bool _is_reliable, service_t _service, instance_t _instance) {
+ auto key = std::make_tuple(_service, _instance, _address, _port, _is_reliable);
std::lock_guard<std::mutex> its_lock(definitions_mutex_);
std::shared_ptr<endpoint_definition> its_result;
- auto find_service = definitions_.find(_service);
- if( find_service != definitions_.end()) {
- auto find_instance = find_service->second.find(_instance);
- if (find_instance != find_service->second.end()) {
- auto find_address = find_instance->second.find(_address);
- if (find_address != find_instance->second.end()) {
- auto find_port = find_address->second.find(_port);
- if (find_port != find_address->second.end()) {
- auto found_reliable = find_port->second.find(_is_reliable);
- if (found_reliable != find_port->second.end()) {
- its_result = found_reliable->second;
- }
- }
- }
- }
+ auto found_endpoint = definitions_.find(key);
+ if (found_endpoint != definitions_.end()) {
+ its_result = found_endpoint->second;
}
if (!its_result) {
- its_result = std::make_shared<endpoint_definition>(
- _address, _port, _is_reliable);
- definitions_[_service][_instance][_address][_port][_is_reliable] = its_result;
+ its_result = std::make_shared<endpoint_definition>(
+ _address, _port, _is_reliable);
+ definitions_[key] = its_result;
}
+
return its_result;
}
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp
index 4664fdb..29d861f 100644
--- a/implementation/endpoints/src/local_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp
@@ -171,8 +171,8 @@ void local_client_endpoint_impl::receive() {
}
void local_client_endpoint_impl::send_queued() {
- static byte_t its_start_tag[] = { 0x67, 0x37, 0x6D, 0x07 };
- static byte_t its_end_tag[] = { 0x07, 0x6D, 0x37, 0x67 };
+ static const byte_t its_start_tag[] = { 0x67, 0x37, 0x6D, 0x07 };
+ static const byte_t its_end_tag[] = { 0x07, 0x6D, 0x37, 0x67 };
std::vector<boost::asio::const_buffer> bufs;
message_buffer_ptr_t its_buffer;
@@ -290,4 +290,33 @@ std::string local_client_endpoint_impl::get_remote_information() const {
#endif
}
+bool local_client_endpoint_impl::send(const std::vector<byte_t>& _cmd_header,
+ const byte_t *_data, uint32_t _size,
+ bool _flush) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ bool ret(true);
+ const bool queue_size_zero_on_entry(queue_.empty());
+
+ if (endpoint_impl::sending_blocked_ ||
+ !check_message_size(static_cast<std::uint32_t>(_cmd_header.size() + _size)) ||
+ !check_packetizer_space(static_cast<std::uint32_t>(_cmd_header.size() + _size))||
+ !check_queue_limit(_data, static_cast<std::uint32_t>(_cmd_header.size() + _size))) {
+ ret = false;
+ } else {
+#if 0
+ std::stringstream msg;
+ msg << "lce::send: ";
+ for (uint32_t i = 0; i < _size; i++)
+ msg << std::hex << std::setw(2) << std::setfill('0')
+ << (int)_data[i] << " ";
+ VSOMEIP_INFO << msg.str();
+#endif
+ packetizer_->reserve(_cmd_header.size() + _size);
+ packetizer_->insert(packetizer_->end(), _cmd_header.begin(), _cmd_header.end());
+ packetizer_->insert(packetizer_->end(), _data, _data + _size);
+ send_or_start_flush_timer(_flush, queue_size_zero_on_entry);
+ }
+ return ret;
+}
+
} // namespace vsomeip
diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp
index 005c203..6da5826 100644
--- a/implementation/endpoints/src/server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/server_endpoint_impl.cpp
@@ -116,6 +116,17 @@ template<typename Protocol> bool server_endpoint_impl<Protocol>::send(const uint
}
template<typename Protocol>
+bool server_endpoint_impl<Protocol>::send(
+ const std::vector<byte_t>& _cmd_header, const byte_t *_data,
+ uint32_t _size, bool _flush) {
+ (void) _cmd_header;
+ (void) _data;
+ (void) _size;
+ (void) _flush;
+ return false;
+}
+
+template<typename Protocol>
bool server_endpoint_impl<Protocol>::send_intern(
endpoint_type _target, const byte_t *_data, uint32_t _size,
bool _flush) {
diff --git a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
index 5dbd69a..30862df 100644
--- a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
@@ -40,6 +40,16 @@ bool virtual_server_endpoint_impl::send(const byte_t *_data, uint32_t _size,
return false;
}
+bool virtual_server_endpoint_impl::send(const std::vector<byte_t>& _cmd_header,
+ const byte_t *_data, uint32_t _size,
+ bool _flush) {
+ (void)_cmd_header;
+ (void)_data;
+ (void)_size;
+ (void)_flush;
+ return false;
+}
+
bool virtual_server_endpoint_impl::send_to(
const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush) {
diff --git a/implementation/routing/include/event.hpp b/implementation/routing/include/event.hpp
index ac483ab..674f41b 100644
--- a/implementation/routing/include/event.hpp
+++ b/implementation/routing/include/event.hpp
@@ -85,7 +85,7 @@ public:
void notify_one(const std::shared_ptr<endpoint_definition> &_target, bool _flush);
void notify_one(client_t _client, bool _flush);
- bool add_subscriber(eventgroup_t _eventgroup, client_t _client);
+ bool add_subscriber(eventgroup_t _eventgroup, client_t _client, bool _force);
void remove_subscriber(eventgroup_t _eventgroup, client_t _client);
bool has_subscriber(eventgroup_t _eventgroup, client_t _client);
std::set<client_t> get_subscribers();
diff --git a/implementation/routing/include/routing_manager_host.hpp b/implementation/routing/include/routing_manager_host.hpp
index c9f8841..7aa767a 100644
--- a/implementation/routing/include/routing_manager_host.hpp
+++ b/implementation/routing/include/routing_manager_host.hpp
@@ -40,6 +40,7 @@ public:
eventgroup_t _eventgroup, event_t _event, uint16_t _error) = 0;
virtual void send(std::shared_ptr<message> _message, bool _flush) = 0;
virtual void on_offered_services_info(std::vector<std::pair<service_t, instance_t>> &_services) = 0;
+ virtual bool is_routing() const = 0;
};
} // namespace vsomeip
diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp
index ad228de..13cd36e 100644
--- a/implementation/routing/src/event.cpp
+++ b/implementation/routing/src/event.cpp
@@ -3,6 +3,8 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#include <iomanip>
+
#include <vsomeip/constants.hpp>
#include <vsomeip/defines.hpp>
#include <vsomeip/message.hpp>
@@ -334,9 +336,21 @@ bool event::has_ref() {
return refs_.size() != 0;
}
-bool event::add_subscriber(eventgroup_t _eventgroup, client_t _client) {
+bool event::add_subscriber(eventgroup_t _eventgroup, client_t _client, bool _force) {
std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- return eventgroups_[_eventgroup].insert(_client).second;
+ bool ret = false;
+ if (_force // remote events managed by rm_impl
+ || is_provided_ // events provided by rm_proxies
+ || is_shadow_ // local events managed by rm_impl
+ || is_cache_placeholder_) {
+ ret = eventgroups_[_eventgroup].insert(_client).second;
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Didnt' insert client "
+ << std::hex << std::setw(4) << std::setfill('0') << _client
+ << "to eventgroup 0x"
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup;
+ }
+ return ret;
}
void event::remove_subscriber(eventgroup_t _eventgroup, client_t _client) {
diff --git a/implementation/routing/src/eventgroupinfo.cpp b/implementation/routing/src/eventgroupinfo.cpp
index a4eb888..77bb228 100644
--- a/implementation/routing/src/eventgroupinfo.cpp
+++ b/implementation/routing/src/eventgroupinfo.cpp
@@ -236,6 +236,7 @@ pending_subscription_id_t eventgroupinfo::add_pending_subscription(
if (++subscription_id_ == DEFAULT_SUBSCRIPTION) {
subscription_id_++;
}
+ _pending_subscription.pending_subscription_id_ = subscription_id_;
pending_subscriptions_[subscription_id_] = _pending_subscription;
const auto remote_address_port = std::make_tuple(
@@ -278,10 +279,9 @@ std::vector<pending_subscription_t> eventgroupinfo::remove_pending_subscription(
pending_subscriptions_.erase(found_pending_subscription);
found_remote->second.erase(found_remote->second.begin());
- if (removed_is_subscribe) { // only subscriptions must be acked via SD
- its_pending_sub.pending_subscription_id_ = _subscription_id;
- its_pending_subscriptions.push_back(its_pending_sub);
- }
+ // return removed (un)subscription as first element
+ its_pending_subscriptions.push_back(its_pending_sub);
+
// retrieve all pending (un)subscriptions which arrived during
// the time the rm_proxy answered the currently processed subscription
for (auto iter = found_remote->second.begin();
@@ -291,7 +291,6 @@ std::vector<pending_subscription_t> eventgroupinfo::remove_pending_subscription(
const bool queued_is_subscribe = (other_pen_sub->second.ttl_ > 0);
if (removed_is_subscribe) {
its_pending_subscriptions.push_back(other_pen_sub->second);
- its_pending_subscriptions.back().pending_subscription_id_ = *iter;
if (!queued_is_subscribe) {
// unsubscribe was queued and needs to be sent to
// rm_proxy first before continuing processing
@@ -307,7 +306,6 @@ std::vector<pending_subscription_t> eventgroupinfo::remove_pending_subscription(
// rm_proxy first before continuing processing
// following queued (un)subscriptions
its_pending_subscriptions.push_back(other_pen_sub->second);
- its_pending_subscriptions.back().pending_subscription_id_ = *iter;
break;
} else {
// further queued unsubscriptions can be ignored
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp
index 9296f99..62ae6f2 100644
--- a/implementation/routing/src/routing_manager_base.cpp
+++ b/implementation/routing/src/routing_manager_base.cpp
@@ -356,7 +356,7 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
std::set<client_t> its_any_event_subscribers =
its_any_event->get_subscribers(eventgroup);
for (const client_t subscriber : its_any_event_subscribers) {
- its_event->add_subscriber(eventgroup, subscriber);
+ its_event->add_subscriber(eventgroup, subscriber, true);
}
}
}
@@ -998,36 +998,29 @@ bool routing_manager_base::send_local(
std::shared_ptr<endpoint>& _target, client_t _client,
const byte_t *_data, uint32_t _size, instance_t _instance,
bool _flush, bool _reliable, uint8_t _command, bool _is_valid_crc) const {
- std::size_t its_complete_size = _size + sizeof(instance_t)
- + sizeof(bool) + sizeof(bool) + sizeof(bool);
- client_t sender = get_client();
- if (_command == VSOMEIP_NOTIFY_ONE) {
- its_complete_size +=sizeof(client_t);
- }
- std::vector<byte_t> its_command(
- VSOMEIP_COMMAND_HEADER_SIZE + its_complete_size);
- its_command[VSOMEIP_COMMAND_TYPE_POS] = _command;
- std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &sender,
- sizeof(client_t));
- std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_complete_size,
- sizeof(_size));
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], _data,
- _size);
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size],
+ const std::size_t its_complete_size = VSOMEIP_SEND_COMMAND_SIZE
+ - VSOMEIP_COMMAND_HEADER_SIZE + _size;
+ const client_t sender = get_client();
+
+ std::vector<byte_t> its_command_header(VSOMEIP_SEND_COMMAND_SIZE);
+ its_command_header[VSOMEIP_COMMAND_TYPE_POS] = _command;
+ std::memcpy(&its_command_header[VSOMEIP_COMMAND_CLIENT_POS],
+ &sender, sizeof(client_t));
+ std::memcpy(&its_command_header[VSOMEIP_COMMAND_SIZE_POS_MIN],
+ &its_complete_size, sizeof(_size));
+ std::memcpy(&its_command_header[VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN],
&_instance, sizeof(instance_t));
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size
- + sizeof(instance_t)], &_flush, sizeof(bool));
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size
- + sizeof(instance_t) + sizeof(bool)], &_reliable, sizeof(bool));
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size
- + sizeof(instance_t) + sizeof(bool) + sizeof(bool)], &_is_valid_crc, sizeof(bool));
- if (_command == VSOMEIP_NOTIFY_ONE) {
- // Add target client
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size
- + sizeof(instance_t) + sizeof(bool) + sizeof(bool) + sizeof(bool)], &_client, sizeof(client_t));
- }
-
- return _target->send(&its_command[0], uint32_t(its_command.size()));
+ std::memcpy(&its_command_header[VSOMEIP_SEND_COMMAND_FLUSH_POS],
+ &_flush, sizeof(bool));
+ std::memcpy(&its_command_header[VSOMEIP_SEND_COMMAND_RELIABLE_POS],
+ &_reliable, sizeof(bool));
+ std::memcpy(&its_command_header[VSOMEIP_SEND_COMMAND_VALID_CRC_POS],
+ &_is_valid_crc, sizeof(bool));
+ // Add target client, only relevant for selective notifications
+ std::memcpy(&its_command_header[VSOMEIP_SEND_COMMAND_DST_CLIENT_POS_MIN],
+ &_client, sizeof(client_t));
+
+ return _target->send(its_command_header, _data, _size);
}
bool routing_manager_base::insert_subscription(
@@ -1037,7 +1030,8 @@ bool routing_manager_base::insert_subscription(
if (_event != ANY_EVENT) { // subscribe to specific event
std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
if (its_event) {
- is_inserted = its_event->add_subscriber(_eventgroup, _client);
+ is_inserted = its_event->add_subscriber(_eventgroup, _client,
+ host_->is_routing());
} else {
VSOMEIP_WARNING << "routing_manager_base::insert_subscription("
<< std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
@@ -1067,7 +1061,8 @@ bool routing_manager_base::insert_subscription(
// eventgroups
_already_subscribed_events->insert(e->get_event());
}
- is_inserted = e->add_subscriber(_eventgroup, _client) || is_inserted;
+ is_inserted = e->add_subscriber(_eventgroup, _client,
+ host_->is_routing()) || is_inserted;
}
}
} else {
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index 733d75b..faf5282 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -1588,15 +1588,17 @@ bool routing_manager_impl::deliver_notification(
bool cache_event = false;
for (const auto eg : its_event->get_eventgroups()) {
std::shared_ptr<eventgroupinfo> egi = find_eventgroup(_service, _instance, eg);
- for (const auto &e : egi->get_events()) {
- cache_event = (e->get_subscribers().size() > 0);
+ if (egi) {
+ for (const auto &e : egi->get_events()) {
+ cache_event = (e->get_subscribers().size() > 0);
+ if (cache_event) {
+ break;
+ }
+ }
if (cache_event) {
break;
}
}
- if (cache_event) {
- break;
- }
}
if (!cache_event) {
return true; // as there is nothing to do
@@ -4322,7 +4324,7 @@ bool routing_manager_impl::create_placeholder_event_and_subscribe(
std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
if (its_event) {
- is_inserted = its_event->add_subscriber(_eventgroup, _client);
+ is_inserted = its_event->add_subscriber(_eventgroup, _client, false);
}
return is_inserted;
}
@@ -4586,12 +4588,27 @@ void routing_manager_impl::on_unsubscribe_ack(client_t _client, service_t _servi
std::vector<pending_subscription_t> its_pending_subscriptions =
its_eventgroup->remove_pending_subscription(_unsubscription_id);
for (const pending_subscription_t& its_sd_message_id : its_pending_subscriptions) {
- const pending_subscription_id_t its_subscription_id = its_sd_message_id.pending_subscription_id_;
- const client_t its_subscribing_client = its_sd_message_id.subscribing_client_;
- const client_t its_offering_client = find_local_client(_service, _instance);
- send_subscription(its_offering_client, its_subscribing_client, _service,
- _instance, _eventgroup, its_eventgroup->get_major(),
- its_subscription_id);
+ if (its_sd_message_id.pending_subscription_id_ == _unsubscription_id) {
+ its_eventgroup->remove_target(its_sd_message_id.target_);
+ clear_remote_subscriber(_service, _instance,
+ its_sd_message_id.subscribing_client_,
+ its_sd_message_id.target_);
+ if (its_eventgroup->get_targets().size() == 0) {
+ for (auto e : its_eventgroup->get_events()) {
+ if (e->is_shadow()) {
+ e->unset_payload();
+ }
+ }
+ }
+ } else {
+ const pending_subscription_id_t its_subscription_id =
+ its_sd_message_id.pending_subscription_id_;
+ const client_t its_subscribing_client = its_sd_message_id.subscribing_client_;
+ const client_t its_offering_client = find_local_client(_service, _instance);
+ send_subscription(its_offering_client, its_subscribing_client, _service,
+ _instance, _eventgroup, its_eventgroup->get_major(),
+ its_subscription_id);
+ }
}
}
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp
index 6d1dbca..8d48b25 100644
--- a/implementation/routing/src/routing_manager_proxy.cpp
+++ b/implementation/routing/src/routing_manager_proxy.cpp
@@ -878,23 +878,21 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
switch (its_command) {
case VSOMEIP_SEND: {
instance_t its_instance;
- std::memcpy(&its_instance,
- &_data[_size - sizeof(instance_t) - sizeof(bool)
- - sizeof(bool) - sizeof(bool)], sizeof(instance_t));
bool its_reliable;
- std::memcpy(&its_reliable, &_data[_size - sizeof(bool) - sizeof(bool)],
- sizeof(its_reliable));
bool its_is_vslid_crc;
- std::memcpy(&its_is_vslid_crc, &_data[_size - sizeof(bool)],
- sizeof(its_is_vslid_crc));
-
- // reduce by size of instance, flush, reliable and is_valid_crc flag
+ std::memcpy(&its_instance,&_data[VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN],
+ sizeof(instance_t));
+ std::memcpy(&its_reliable, &_data[VSOMEIP_SEND_COMMAND_RELIABLE_POS],
+ sizeof(its_reliable));
+ std::memcpy(&its_is_vslid_crc, &_data[VSOMEIP_SEND_COMMAND_VALID_CRC_POS],
+ sizeof(its_is_vslid_crc));
+
+ // reduce by size of instance, flush, reliable, client and is_valid_crc flag
const std::uint32_t its_message_size = its_length -
- static_cast<uint32_t>(sizeof(its_instance)
- + sizeof(bool) + sizeof(bool) + sizeof(bool));
+ (VSOMEIP_SEND_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE);
auto a_deserializer = get_deserializer();
- a_deserializer->set_data(&_data[VSOMEIP_COMMAND_PAYLOAD_POS],
+ a_deserializer->set_data(&_data[VSOMEIP_SEND_COMMAND_PAYLOAD_POS],
its_message_size);
std::shared_ptr<message> its_message(a_deserializer->deserialize_message());
a_deserializer->reset();
@@ -1964,7 +1962,7 @@ bool routing_manager_proxy::create_placeholder_event_and_subscribe(
true);
std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
if (its_event) {
- is_inserted = its_event->add_subscriber(_eventgroup, _client);
+ is_inserted = its_event->add_subscriber(_eventgroup, _client, false);
}
return is_inserted;
}
diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp
index ffc8b09..528c729 100644
--- a/implementation/routing/src/routing_manager_stub.cpp
+++ b/implementation/routing/src/routing_manager_stub.cpp
@@ -422,18 +422,19 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
<< std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]";
break;
case VSOMEIP_SEND: {
- its_data = &_data[VSOMEIP_COMMAND_PAYLOAD_POS];
+ its_data = &_data[VSOMEIP_SEND_COMMAND_PAYLOAD_POS];
its_service = VSOMEIP_BYTES_TO_WORD(
its_data[VSOMEIP_SERVICE_POS_MIN],
its_data[VSOMEIP_SERVICE_POS_MAX]);
its_client_from_header = VSOMEIP_BYTES_TO_WORD(
its_data[VSOMEIP_CLIENT_POS_MIN],
its_data[VSOMEIP_CLIENT_POS_MAX]);
- std::memcpy(&its_instance, &_data[_size - sizeof(instance_t)
- - sizeof(bool) - sizeof(bool) - sizeof(bool)], sizeof(its_instance));
- std::memcpy(&its_reliable, &_data[_size - sizeof(bool) - sizeof(bool)], sizeof(its_reliable));
-
- std::memcpy(&its_is_valid_crc, &_data[_size - sizeof(bool)], sizeof(its_is_valid_crc));
+ std::memcpy(&its_instance, &_data[VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN],
+ sizeof(its_instance));
+ std::memcpy(&its_reliable, &_data[VSOMEIP_SEND_COMMAND_RELIABLE_POS],
+ sizeof(its_reliable));
+ std::memcpy(&its_is_valid_crc, &_data[VSOMEIP_SEND_COMMAND_VALID_CRC_POS],
+ sizeof(its_is_valid_crc));
if (utility::is_request(its_data[VSOMEIP_MESSAGE_TYPE_POS])) {
if (!configuration_->is_client_allowed(its_client_from_header,
@@ -456,40 +457,37 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
return;
}
}
- // reduce by size of instance, flush, reliable and is_valid_crc flag
+ // reduce by size of instance, flush, reliable, client and is_valid_crc flag
const std::uint32_t its_message_size = its_size -
- static_cast<std::uint32_t>(sizeof(its_instance)
- + sizeof(bool) + sizeof(bool) + sizeof(bool));
+ (VSOMEIP_SEND_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE);
host_->on_message(its_service, its_instance, its_data, its_message_size, its_reliable, its_is_valid_crc);
break;
}
case VSOMEIP_NOTIFY: {
- its_data = &_data[VSOMEIP_COMMAND_PAYLOAD_POS];
+ its_data = &_data[VSOMEIP_SEND_COMMAND_PAYLOAD_POS];
its_service = VSOMEIP_BYTES_TO_WORD(
its_data[VSOMEIP_SERVICE_POS_MIN],
its_data[VSOMEIP_SERVICE_POS_MAX]);
- std::memcpy(&its_instance, &_data[_size - sizeof(instance_t)
- - sizeof(bool) - sizeof(bool) - sizeof(bool)], sizeof(its_instance));
- // reduce by size of instance, flush, reliable and is_valid_crc flag
+ std::memcpy(&its_instance, &_data[VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN],
+ sizeof(its_instance));
+ // reduce by size of instance, flush, reliable, is_valid_crc flag and target client
const std::uint32_t its_message_size = its_size -
- static_cast<uint32_t>(sizeof(its_instance)
- + sizeof(bool) + sizeof(bool));
+ (VSOMEIP_SEND_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE);
host_->on_notification(VSOMEIP_ROUTING_CLIENT, its_service, its_instance, its_data, its_message_size);
break;
}
case VSOMEIP_NOTIFY_ONE: {
- its_data = &_data[VSOMEIP_COMMAND_PAYLOAD_POS];
+ its_data = &_data[VSOMEIP_SEND_COMMAND_PAYLOAD_POS];
its_service = VSOMEIP_BYTES_TO_WORD(
its_data[VSOMEIP_SERVICE_POS_MIN],
its_data[VSOMEIP_SERVICE_POS_MAX]);
- std::memcpy(&its_instance, &_data[_size - sizeof(instance_t) -
- sizeof(bool) - sizeof(bool) - sizeof(bool) - sizeof(client_t)],
- sizeof(its_instance));
- std::memcpy(&its_target_client, &_data[_size - sizeof(client_t)], sizeof(client_t));
+ std::memcpy(&its_instance, &_data[VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN],
+ sizeof(its_instance));
+ std::memcpy(&its_target_client, &_data[VSOMEIP_SEND_COMMAND_DST_CLIENT_POS_MIN],
+ sizeof(client_t));
// reduce by size of instance, flush, reliable flag, is_valid_crc and target client
const std::uint32_t its_message_size = its_size -
- static_cast<uint32_t>(sizeof(its_instance)
- + sizeof(bool) + sizeof(bool) + sizeof(client_t));
+ (VSOMEIP_SEND_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE);
host_->on_notification(its_target_client, its_service, its_instance,
its_data, its_message_size, true);
break;
@@ -742,6 +740,14 @@ void routing_manager_stub::on_deregister_application(client_t _client) {
}
void routing_manager_stub::client_registration_func(void) {
+#ifndef _WIN32
+ {
+ std::stringstream s;
+ s << std::hex << std::setw(4) << std::setfill('0')
+ << host_->get_client() << "_client_reg";
+ pthread_setname_np(pthread_self(),s.str().c_str());
+ }
+#endif
std::unique_lock<std::mutex> its_lock(client_registration_mutex_);
while (client_registration_running_) {
while (!pending_client_registrations_.size() && client_registration_running_) {
diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp
index 5d4a80b..219c8f0 100644
--- a/implementation/runtime/src/application_impl.cpp
+++ b/implementation/runtime/src/application_impl.cpp
@@ -11,6 +11,7 @@
#ifndef _WIN32
#include <dlfcn.h>
+#include <sys/syscall.h>
#endif
#include <vsomeip/defines.hpp>
@@ -302,6 +303,15 @@ bool application_impl::init() {
}
void application_impl::start() {
+#ifndef _WIN32
+ if (getpid() != static_cast<pid_t>(syscall(SYS_gettid))) {
+ // only set threadname if calling thread isn't the main thread
+ std::stringstream s;
+ s << std::hex << std::setw(4) << std::setfill('0') << client_
+ << "_io" << std::setw(2) << std::setfill('0') << 0;
+ pthread_setname_np(pthread_self(),s.str().c_str());
+ }
+#endif
const size_t io_thread_count = configuration_->get_io_thread_count(name_);
{
std::lock_guard<std::mutex> its_lock(start_stop_mutex_);
@@ -352,7 +362,20 @@ void application_impl::start() {
VSOMEIP_INFO << "io thread id from application: "
<< std::hex << std::setw(4) << std::setfill('0')
<< client_ << " (" << name_ << ") is: " << std::hex
- << std::this_thread::get_id();
+ << std::this_thread::get_id()
+ #ifndef _WIN32
+ << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
+ #endif
+ ;
+ #ifndef _WIN32
+ {
+ std::stringstream s;
+ s << std::hex << std::setw(4) << std::setfill('0')
+ << client_ << "_io" << std::setw(2)
+ << std::setfill('0') << i+1;
+ pthread_setname_np(pthread_self(),s.str().c_str());
+ }
+ #endif
try {
io_.run();
#ifndef _WIN32
@@ -388,7 +411,11 @@ void application_impl::start() {
app_counter_mutex__.unlock();
VSOMEIP_INFO << "io thread id from application: "
<< std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
- << name_ << ") is: " << std::hex << std::this_thread::get_id();
+ << name_ << ") is: " << std::hex << std::this_thread::get_id()
+#ifndef _WIN32
+ << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
+#endif
+ ;
try {
io_.run();
#ifndef _WIN32
@@ -1554,10 +1581,22 @@ routing_manager * application_impl::get_routing_manager() const {
}
void application_impl::main_dispatch() {
+#ifndef _WIN32
+ {
+ std::stringstream s;
+ s << std::hex << std::setw(4) << std::setfill('0')
+ << client_ << "_m_dispatch";
+ pthread_setname_np(pthread_self(),s.str().c_str());
+ }
+#endif
const std::thread::id its_id = std::this_thread::get_id();
VSOMEIP_INFO << "main dispatch thread id from application: "
<< std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
- << name_ << ") is: " << std::hex << its_id;
+ << name_ << ") is: " << std::hex << its_id
+#ifndef _WIN32
+ << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
+#endif
+ ;
std::unique_lock<std::mutex> its_lock(handlers_mutex_);
while (is_dispatching_) {
if (handlers_.empty() || !is_active_dispatcher(its_id)) {
@@ -1595,15 +1634,28 @@ void application_impl::main_dispatch() {
}
void application_impl::dispatch() {
+#ifndef _WIN32
+ {
+ std::stringstream s;
+ s << std::hex << std::setw(4) << std::setfill('0')
+ << client_ << "_dispatch";
+ pthread_setname_np(pthread_self(),s.str().c_str());
+ }
+#endif
const std::thread::id its_id = std::this_thread::get_id();
VSOMEIP_INFO << "dispatch thread id from application: "
<< std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
- << name_ << ") is: " << std::hex << its_id;
+ << name_ << ") is: " << std::hex << its_id
+#ifndef _WIN32
+ << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
+#endif
+ ;
while (is_active_dispatcher(its_id)) {
std::unique_lock<std::mutex> its_lock(handlers_mutex_);
if (is_dispatching_ && handlers_.empty()) {
dispatcher_condition_.wait(its_lock);
- if (handlers_.empty()) { // Maybe woken up from main dispatcher
+ // Maybe woken up from main dispatcher
+ if (handlers_.empty() && !is_active_dispatcher(its_id)) {
if (!is_dispatching_) {
return;
}
@@ -1860,9 +1912,19 @@ void application_impl::clear_all_handler() {
void application_impl::shutdown() {
VSOMEIP_INFO << "shutdown thread id from application: "
<< std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
- << name_ << ") is: " << std::hex << std::this_thread::get_id();
+ << name_ << ") is: " << std::hex << std::this_thread::get_id()
+#ifndef _WIN32
+ << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
+#endif
+ ;
#ifndef _WIN32
boost::asio::detail::posix_signal_blocker blocker;
+ {
+ std::stringstream s;
+ s << std::hex << std::setw(4) << std::setfill('0')
+ << client_ << "_shutdown";
+ pthread_setname_np(pthread_self(),s.str().c_str());
+ }
#endif
{
diff --git a/implementation/service_discovery/src/message_impl.cpp b/implementation/service_discovery/src/message_impl.cpp
index 98ed9de..9d5c1ac 100755
--- a/implementation/service_discovery/src/message_impl.cpp
+++ b/implementation/service_discovery/src/message_impl.cpp
@@ -239,30 +239,16 @@ bool message_impl::deserialize(vsomeip::deserializer *_from) {
while (is_successful && _from->get_remaining()) {
std::shared_ptr < entry_impl > its_entry(deserialize_entry(_from));
if (its_entry) {
- if (its_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP) {
- bool is_unique(true);
- for (const auto& e : entries_) {
- if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP &&
- *(static_cast<eventgroupentry_impl*>(e.get())) ==
- *(static_cast<eventgroupentry_impl*>(its_entry.get()))) {
- is_unique = false;
- break;
- }
- }
- if (is_unique) {
- entries_.push_back(its_entry);
- if (its_entry->get_ttl() > 0) {
- const std::uint8_t num_options =
- static_cast<std::uint8_t>(
- its_entry->get_num_options(1) +
- its_entry->get_num_options(2));
- number_required_acks_ =
- static_cast<std::uint8_t>(number_required_acks_
- + num_options);
- }
- }
- } else {
- entries_.push_back(its_entry);
+ entries_.push_back(its_entry);
+ if (its_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP
+ && its_entry->get_ttl() > 0) {
+ const std::uint8_t num_options =
+ static_cast<std::uint8_t>(
+ its_entry->get_num_options(1) +
+ its_entry->get_num_options(2));
+ number_required_acks_ =
+ static_cast<std::uint8_t>(number_required_acks_
+ + num_options);
}
} else {
is_successful = false;
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 1ddd80d..d6e61bd 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -2864,6 +2864,10 @@ if(NOT ${TESTS_BAT})
COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_PENDING_SUBSCRIPTION_MASTER_STARTER} SUBSCRIBE_RESUBSCRIBE_MIXED)
set_tests_properties(${TEST_PENDING_SUBSCRIPTION_NAME}_subscribe_resubscribe_mixed PROPERTIES TIMEOUT 180)
+ add_test(NAME ${TEST_PENDING_SUBSCRIPTION_NAME}_subscribe_stopsubscribe_subscribe
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_PENDING_SUBSCRIPTION_MASTER_STARTER} SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE)
+ set_tests_properties(${TEST_PENDING_SUBSCRIPTION_NAME}_subscribe_stopsubscribe_subscribe PROPERTIES TIMEOUT 180)
+
# malicious data test
add_test(NAME ${TEST_MALICIOUS_DATA_NAME}
COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_MALICIOUS_DATA_MASTER_STARTER})
diff --git a/test/big_payload_tests/big_payload_test_client.cpp b/test/big_payload_tests/big_payload_test_client.cpp
index 9b50e79..23a7241 100644
--- a/test/big_payload_tests/big_payload_test_client.cpp
+++ b/test/big_payload_tests/big_payload_test_client.cpp
@@ -208,7 +208,7 @@ void big_payload_test_client::run()
|| test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) {
if (i % 2) {
// try to sent a too big payload for half of the messages
- its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE + 3,
+ its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE + 1,
big_payload_test::DATA_CLIENT_TO_SERVICE);
} else {
its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE,
diff --git a/test/big_payload_tests/big_payload_test_local_queue_limited.json b/test/big_payload_tests/big_payload_test_local_queue_limited.json
index 7252680..eac38b2 100644
--- a/test/big_payload_tests/big_payload_test_local_queue_limited.json
+++ b/test/big_payload_tests/big_payload_test_local_queue_limited.json
@@ -29,7 +29,7 @@
"instance":"0x5678"
}
],
- "endpoint-queue-limit-local" : "614428",
+ "endpoint-queue-limit-local" : "614430",
"routing":"big_payload_test_service",
"service-discovery":
{
diff --git a/test/big_payload_tests/big_payload_test_service.cpp b/test/big_payload_tests/big_payload_test_service.cpp
index 4031550..a6868cf 100644
--- a/test/big_payload_tests/big_payload_test_service.cpp
+++ b/test/big_payload_tests/big_payload_test_service.cpp
@@ -162,7 +162,7 @@ void big_payload_test_service::on_message(const std::shared_ptr<vsomeip::message
// this way the client will only get replies for a fourth of his sent
// requests as he tries to sent to big data for every second request
// as well
- its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE + 3,
+ its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE + 1,
big_payload_test::DATA_SERVICE_TO_CLIENT);
} else {
its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE,
diff --git a/test/configuration_tests/configuration-test.cpp b/test/configuration_tests/configuration-test.cpp
index f9717c5..bdf66e0 100644
--- a/test/configuration_tests/configuration-test.cpp
+++ b/test/configuration_tests/configuration-test.cpp
@@ -463,9 +463,7 @@ void check_file(const std::string &_config_file,
// use 17000 instead of 1500 as configured max-local-payload size will be
// increased to bigger max-reliable-payload-size
std::uint32_t max_local_message_size(
- 17000u + 16u + + VSOMEIP_COMMAND_HEADER_SIZE
- + sizeof(vsomeip::instance_t) + sizeof(bool) + sizeof(bool)
- + sizeof(vsomeip::client_t));
+ 17000u + 16u + + VSOMEIP_SEND_COMMAND_SIZE);
EXPECT_EQ(max_local_message_size, its_configuration->get_max_message_size_local());
EXPECT_EQ(11u, its_configuration->get_buffer_shrink_threshold());
EXPECT_EQ(14999u + 16u, its_configuration->get_max_message_size_reliable("10.10.10.10", 7777));
diff --git a/test/malicious_data_tests/malicious_data_test_msg_sender.cpp b/test/malicious_data_tests/malicious_data_test_msg_sender.cpp
index ad856b8..27a62c2 100644
--- a/test/malicious_data_tests/malicious_data_test_msg_sender.cpp
+++ b/test/malicious_data_tests/malicious_data_test_msg_sender.cpp
@@ -266,6 +266,11 @@ TEST_F(malicious_data, send_malicious_events)
send_thread.join();
receive_thread.join();
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both);
+ tcp_socket.shutdown(boost::asio::socket_base::shutdown_both);
+ udp_socket.close();
+ tcp_socket.close();
+
}
#ifndef _WIN32
diff --git a/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in b/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in
index beb5db2..2d9311c 100755
--- a/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in
+++ b/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in
@@ -17,7 +17,7 @@ then
echo "Please pass a test mode to this script."
echo "For example: $0 SUSCRIBE"
echo "Valid subscription types include:"
- echo " [SUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, SUBSCRIBE_UNSUBSCRIBE_SAME_PORT, SUBSCRIBE_RESUBSCRIBE_MIXED]"
+ echo " [SUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, SUBSCRIBE_UNSUBSCRIBE_SAME_PORT, SUBSCRIBE_RESUBSCRIBE_MIXED, SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE]"
exit 1
fi
TESTMODE=$1
diff --git a/test/pending_subscription_tests/pending_subscription_test_globals.hpp b/test/pending_subscription_tests/pending_subscription_test_globals.hpp
index ade1d6f..7fa5480 100644
--- a/test/pending_subscription_tests/pending_subscription_test_globals.hpp
+++ b/test/pending_subscription_tests/pending_subscription_test_globals.hpp
@@ -26,7 +26,8 @@ enum test_mode_e {
UNSUBSCRIBE,
SUBSCRIBE_UNSUBSCRIBE_NACK,
SUBSCRIBE_UNSUBSCRIBE_SAME_PORT,
- SUBSCRIBE_RESUBSCRIBE_MIXED
+ SUBSCRIBE_RESUBSCRIBE_MIXED,
+ SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE
};
}
diff --git a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
index dd7ed5a..d8108a5 100644
--- a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
+++ b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
@@ -57,6 +57,8 @@ TEST_F(pending_subscription, send_multiple_subscriptions)
boost::asio::ip::udp::socket udp_socket(io_,
boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+
std::thread receive_thread([&](){
std::atomic<bool> keep_receiving(true);
std::function<void()> receive;
@@ -213,6 +215,9 @@ TEST_F(pending_subscription, send_multiple_subscriptions)
send_thread.join();
receive_thread.join();
+ boost::system::error_code ec;
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.close(ec);
}
TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe)
@@ -221,6 +226,8 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe)
boost::asio::ip::udp::socket udp_socket(io_,
boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+
std::thread receive_thread([&](){
const std::uint32_t expected_acks(8);
std::atomic<std::uint32_t> acks_received(0);
@@ -404,6 +411,9 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe)
send_thread.join();
receive_thread.join();
+ boost::system::error_code ec;
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.close(ec);
}
TEST_F(pending_subscription, send_multiple_unsubscriptions)
@@ -412,6 +422,8 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions)
boost::asio::ip::udp::socket udp_socket(io_,
boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+
std::thread receive_thread([&](){
const std::uint32_t expected_acks(2);
std::atomic<std::uint32_t> acks_received(0);
@@ -595,6 +607,9 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions)
send_thread.join();
receive_thread.join();
+ boost::system::error_code ec;
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.close(ec);
}
TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe)
@@ -603,6 +618,8 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe)
boost::asio::ip::udp::socket udp_socket(io_,
boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+
std::thread receive_thread([&](){
const std::uint32_t expected_acks(8);
std::atomic<std::uint32_t> acks_received(0);
@@ -797,6 +814,9 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe)
send_thread.join();
receive_thread.join();
+ boost::system::error_code ec;
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.close(ec);
}
TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port)
@@ -805,8 +825,11 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port)
std::promise<void> tcp_connected;
boost::asio::ip::udp::socket udp_socket(io_,
boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
boost::asio::ip::tcp::socket tcp_socket(io_,
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 30490));
+ tcp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+
std::thread receive_thread([&](){
const std::uint32_t expected_acks(8);
std::atomic<std::uint32_t> acks_received(0);
@@ -1004,6 +1027,11 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port)
send_thread.join();
receive_thread.join();
+ boost::system::error_code ec;
+ tcp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ tcp_socket.close(ec);
+ udp_socket.close(ec);
}
/*
@@ -1018,6 +1046,8 @@ TEST_F(pending_subscription, subscribe_resubscribe_mixed)
boost::asio::ip::udp::socket udp_socket(io_,
boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+
std::thread receive_thread([&](){
std::atomic<bool> keep_receiving(true);
std::function<void()> receive;
@@ -1201,6 +1231,213 @@ TEST_F(pending_subscription, subscribe_resubscribe_mixed)
send_thread.join();
receive_thread.join();
+ boost::system::error_code ec;
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.close(ec);
+}
+
+/*
+ * @test Send a SD message containing a Subscription followed by a StopSubscribe
+ * Subscribe entry to the same service. Check to receive an initial event
+ */
+TEST_F(pending_subscription, send_subscribe_stop_subscribe_subscribe)
+{
+ std::promise<bool> trigger_notifications;
+ std::promise<void> tcp_connected;
+ boost::asio::ip::udp::socket udp_socket(io_,
+ boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+ boost::asio::ip::tcp::socket tcp_socket(io_,
+ boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 30490));
+ tcp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+
+ std::thread receive_thread([&](){
+ const std::uint32_t expected_acks(1);
+ std::atomic<std::uint32_t> acks_received(0);
+
+ const std::uint32_t expected_responses(1);
+ std::atomic<std::uint32_t> responses_received(0);
+
+ const std::uint32_t expected_notifications(1);
+ std::atomic<std::uint32_t> notifications_received(0);
+
+ bool triggered_notifications(false);
+
+ std::function<void()> receive;
+ std::vector<std::uint8_t> receive_buffer(4096);
+ std::vector<vsomeip::event_t> its_received_events;
+
+ boost::system::error_code ec;
+ tcp_socket.connect(boost::asio::ip::tcp::endpoint(
+ boost::asio::ip::address::from_string(remote_address), 40001), ec);
+ ASSERT_EQ(0, ec.value());
+ tcp_connected.set_value();
+
+ const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&](
+ const boost::system::error_code& error, std::size_t bytes_transferred) {
+ if (error) {
+ acks_received = expected_acks;
+ responses_received = expected_responses;
+ ADD_FAILURE() << __func__ << " error: " << error.message();
+ return;
+ }
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
+ }
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+
+ vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0);
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[VSOMEIP_METHOD_POS_MAX]);
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(1u, sd_msg.get_entries().size());
+ for (auto e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(16u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ acks_received++;
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ EXPECT_EQ(1u, its_received_events.size());
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
+ }
+
+
+ if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ triggered_notifications = true;
+ }
+
+ if (!error && (acks_received != expected_acks ||
+ responses_received != expected_responses ||
+ notifications_received != expected_notifications)) {
+ receive();
+ }
+ };
+
+ receive = [&]() {
+ udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()),
+ receive_cbk);
+ };
+
+ receive();
+ while(acks_received < expected_acks ||
+ responses_received < expected_responses ||
+ notifications_received < expected_notifications) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ EXPECT_EQ(expected_acks, acks_received);
+ EXPECT_EQ(expected_responses, responses_received);
+ EXPECT_EQ(expected_notifications, notifications_received);
+ });
+
+ std::thread send_thread([&]() {
+ if (std::future_status::timeout == tcp_connected.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't establish tcp connection within time";
+ }
+
+ try {
+ std::uint8_t its_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x50, // length
+ 0x00, 0x00, 0x00, 0x01,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x30, // length entries array
+ 0x06, 0x00, 0x00, 0x10, // subscribe Eventgroup entry
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x06, 0x00, 0x00, 0x10, // Stop subscribe Eventgroup entry
+ 0x11, 0x22, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x10, 0x00,
+ 0x06, 0x00, 0x00, 0x10, // subscribe Eventgroup entry
+ 0x11, 0x22, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x10,
+ 0x00, 0x00, 0x10, 0x00,
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+
+ boost::asio::ip::address its_local_address =
+ boost::asio::ip::address::from_string(std::string(local_address));
+ std::memcpy(&its_subscribe_message[80], &its_local_address.to_v4().to_bytes()[0], 4);
+
+ boost::asio::ip::udp::socket::endpoint_type target_sd(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30490);
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+
+ if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all SubscribeAcks within time";
+ } else {
+ // call notify method
+ std::uint8_t trigger_notifications_call[] = {
+ 0x11, 0x22, 0x42, 0x42,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x01, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service);
+ }
+
+ // call shutdown method
+ std::uint8_t shutdown_call[] = {
+ 0x11, 0x22, 0x14, 0x04,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x00, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service);
+ } catch (...) {
+ ASSERT_FALSE(true);
+ }
+
+ });
+ send_thread.join();
+ receive_thread.join();
+ boost::system::error_code ec;
+ tcp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ tcp_socket.close(ec);
+ udp_socket.close(ec);
}
#ifndef _WIN32
@@ -1227,6 +1464,8 @@ int main(int argc, char** argv) {
::testing::GTEST_FLAG(filter) = "*send_alternating_subscribe_unsubscribe_same_port";
} else if (its_testmode == std::string("SUBSCRIBE_RESUBSCRIBE_MIXED")) {
::testing::GTEST_FLAG(filter) = "*subscribe_resubscribe_mixed";
+ } else if (its_testmode == std::string("SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE")) {
+ ::testing::GTEST_FLAG(filter) = "*send_subscribe_stop_subscribe_subscribe";
}
return RUN_ALL_TESTS();
}
diff --git a/test/pending_subscription_tests/pending_subscription_test_service.cpp b/test/pending_subscription_tests/pending_subscription_test_service.cpp
index 59e7aa9..2c33a44 100644
--- a/test/pending_subscription_tests/pending_subscription_test_service.cpp
+++ b/test/pending_subscription_tests/pending_subscription_test_service.cpp
@@ -148,6 +148,8 @@ public:
;
} else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED) {
;
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE) {
+ ;
}
std::future<bool> itsFuture = notify_method_called_.get_future();
if (std::future_status::timeout == itsFuture.wait_for(std::chrono::seconds(10))) {
@@ -202,7 +204,7 @@ public:
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
if (_subscribed) {
- _cbk((count_subscribe % 2)); // nack every second subscription
+ _cbk(((count_subscribe + 1) % 2)); // nack every second subscription
} else {
_cbk(true);
}
@@ -227,6 +229,16 @@ public:
EXPECT_TRUE(_subscribed);
_cbk(true);
subscription_accepted_asynchronous_ = true;
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE) {
+ static int was_called = 0;
+ was_called++;
+ EXPECT_EQ(1, was_called);
+ EXPECT_TRUE(_subscribed);
+ subscription_accepted_asynchronous_ = true;
+ // this test doesn't subscribe to the second eventgroup which is handled by the asynchronous
+ // subscription handler, set it to true here:
+ subscription_accepted_synchronous_ = true;
+ _cbk(true);
}
}
@@ -274,7 +286,7 @@ public:
subscription_accepted_synchronous_ = true;
}
if (_subscribed) {
- ret = (count_subscribed % 2); // nack every second subscription
+ ret = ((count_subscribed + 1) % 2); // nack every second subscription
} else {
ret = true;
}
@@ -348,6 +360,8 @@ int main(int argc, char** argv)
its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_SAME_PORT;
} else if (its_pased_testmode == std::string("SUBSCRIBE_RESUBSCRIBE_MIXED")) {
its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED;
+ } else if (its_pased_testmode == std::string("SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE")) {
+ its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE;
}
return RUN_ALL_TESTS();