diff options
author | Juergen Gehring <juergen.gehring@bmw.de> | 2018-05-22 02:56:47 -0700 |
---|---|---|
committer | Juergen Gehring <juergen.gehring@bmw.de> | 2018-05-22 02:56:47 -0700 |
commit | 9fb9beecadf52083599302fa8ddee7efbec64a39 (patch) | |
tree | 2d673725855805ba72ec2cd73aa788be882e8b65 | |
parent | 23b6a4bcc6632b7c217b854bb879eb9cd3bc2f29 (diff) | |
download | vSomeIP-9fb9beecadf52083599302fa8ddee7efbec64a39.tar.gz |
vsomeip 2.10.212.10.21
35 files changed, 699 insertions, 251 deletions
@@ -1,6 +1,20 @@ Changes ======= +v2.10.21 +- Improve memory usage of routing manager. +- Improve handling of incoming SD messages with uncommon entry + combinations. +- Name all threads under Linux and log thread IDs during startup. +- Optimize memory allocation for internal message handling. +- Ensure an (extra) dispatch thread is running in case the main + dispatch thread is (still) blocked. +- Fix race condition which could lead to missing initial events for + local subscriptions if the application hosting the service called + application::offer_event and additionally application::request_event + for the same event. +- Fixed crash + v2.10.20 - Add security config (i.e. vsomeip_security.json) to mandatory config files - Enable local_routing_test_starter.sh to use externally defined configuration diff --git a/CMakeLists.txt b/CMakeLists.txt index 04ae559..2c4b3f5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,9 @@ project (vsomeip) set (VSOMEIP_MAJOR_VERSION 2) set (VSOMEIP_MINOR_VERSION 10) -set (VSOMEIP_PATCH_VERSION 20) +set (VSOMEIP_PATCH_VERSION 21) +set (VSOMEIP_HOTFIX_VERSION 0) + set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION}) set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in set (CMAKE_VERBOSE_MAKEFILE off) @@ -148,7 +150,11 @@ file(GLOB_RECURSE vsomeip_e2e_SRC list(SORT vsomeip_SRC) list(SORT vsomeip_e2e_SRC) +if (${VSOMEIP_HOTFIX_VERSION} EQUAL 0) add_definitions(-DVSOMEIP_VERSION="${VSOMEIP_VERSION}") +else() +add_definitions(-DVSOMEIP_VERSION="${VSOMEIP_VERSION}.${VSOMEIP_HOTFIX_VERSION}") +endif() if (MSVC) message("using MSVC Compiler") diff --git a/implementation/configuration/include/internal.hpp.in b/implementation/configuration/include/internal.hpp.in index 6135205..1cb9895 100644 --- a/implementation/configuration/include/internal.hpp.in +++ b/implementation/configuration/include/internal.hpp.in @@ -72,7 +72,7 @@ #define VSOMEIP_COMMAND_TYPE_POS 0 #define VSOMEIP_COMMAND_CLIENT_POS 1 #define VSOMEIP_COMMAND_SIZE_POS_MIN 3 -#define VSOMEIP_COMMAND_SIZE_POS_MAX 5 +#define VSOMEIP_COMMAND_SIZE_POS_MAX 6 #define VSOMEIP_COMMAND_PAYLOAD_POS 7 #define VSOMEIP_REGISTER_APPLICATION 0x00 @@ -105,6 +105,16 @@ #define VSOMEIP_OFFERED_SERVICES_RESPONSE 0x20 #define VSOMEIP_UNSUBSCRIBE_ACK 0x21 +#define VSOMEIP_SEND_COMMAND_SIZE 14 +#define VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN 7 +#define VSOMEIP_SEND_COMMAND_INSTANCE_POS_MAX 8 +#define VSOMEIP_SEND_COMMAND_FLUSH_POS 9 +#define VSOMEIP_SEND_COMMAND_RELIABLE_POS 10 +#define VSOMEIP_SEND_COMMAND_VALID_CRC_POS 11 +#define VSOMEIP_SEND_COMMAND_DST_CLIENT_POS_MIN 12 +#define VSOMEIP_SEND_COMMAND_DST_CLIENT_POS_MAX 13 +#define VSOMEIP_SEND_COMMAND_PAYLOAD_POS 14 + #define VSOMEIP_OFFER_SERVICE_COMMAND_SIZE 16 #define VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE 17 #define VSOMEIP_RELEASE_SERVICE_COMMAND_SIZE 11 diff --git a/implementation/configuration/src/configuration_impl.cpp b/implementation/configuration/src/configuration_impl.cpp index aa18f2f..ccf3883 100644 --- a/implementation/configuration/src/configuration_impl.cpp +++ b/implementation/configuration/src/configuration_impl.cpp @@ -2400,9 +2400,7 @@ std::uint32_t configuration_impl::get_max_message_size_local() const { // add sizes of the the routing_manager_proxy's messages // to the routing_manager stub - return std::uint32_t(its_max_message_size - + VSOMEIP_COMMAND_HEADER_SIZE + sizeof(instance_t) - + sizeof(bool) + sizeof(bool) + sizeof(client_t)); + return std::uint32_t(its_max_message_size + VSOMEIP_SEND_COMMAND_SIZE); } std::uint32_t configuration_impl::get_max_message_size_reliable( diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp index 2b235d6..97819bc 100644 --- a/implementation/endpoints/include/client_endpoint_impl.hpp +++ b/implementation/endpoints/include/client_endpoint_impl.hpp @@ -42,6 +42,8 @@ public: virtual ~client_endpoint_impl();
bool send(const uint8_t *_data, uint32_t _size, bool _flush);
+ bool send(const std::vector<byte_t>& _cmd_header, const byte_t *_data,
+ uint32_t _size, bool _flush = true);
bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush = true);
bool flush();
@@ -81,6 +83,11 @@ protected: void shutdown_and_close_socket_unlocked(bool _recreate_socket);
void start_connect_timer();
+ bool check_message_size(std::uint32_t _size) const;
+ bool check_packetizer_space(std::uint32_t _size);
+ bool check_queue_limit(const uint8_t *_data, std::uint32_t _size) const;
+ void send_or_start_flush_timer(bool _flush, bool _queue_size_zero_on_entry);
+
mutable std::mutex socket_mutex_;
std::unique_ptr<socket_type> socket_;
endpoint_type remote_;
diff --git a/implementation/endpoints/include/endpoint.hpp b/implementation/endpoints/include/endpoint.hpp index 8ee3f8f..f3bf510 100644 --- a/implementation/endpoints/include/endpoint.hpp +++ b/implementation/endpoints/include/endpoint.hpp @@ -10,6 +10,8 @@ #include <vsomeip/primitive_types.hpp>
+#include <vector>
+
namespace vsomeip {
class endpoint_definition;
@@ -27,6 +29,8 @@ public: virtual bool send(const byte_t *_data, uint32_t _size,
bool _flush = true) = 0;
+ virtual bool send(const std::vector<byte_t>& _cmd_header, const byte_t *_data,
+ uint32_t _size, bool _flush = true) = 0;
virtual bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush = true) = 0;
virtual void enable_magic_cookies() = 0;
diff --git a/implementation/endpoints/include/endpoint_definition.hpp b/implementation/endpoints/include/endpoint_definition.hpp index 71309fd..860d2d2 100644 --- a/implementation/endpoints/include/endpoint_definition.hpp +++ b/implementation/endpoints/include/endpoint_definition.hpp @@ -43,12 +43,9 @@ private: bool is_reliable_; static std::mutex definitions_mutex_; - static std::map<service_t, - std::map<instance_t, - std::map<boost::asio::ip::address, - std::map<uint16_t, - std::map<bool, - std::shared_ptr<endpoint_definition> > > > > > definitions_; + static std::map< + std::tuple<service_t, instance_t, boost::asio::ip::address, uint16_t, bool>, + std::shared_ptr<endpoint_definition> > definitions_; }; } // namespace vsomeip diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp index 59a9eee..214a6d0 100644 --- a/implementation/endpoints/include/local_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp @@ -50,6 +50,8 @@ public: void restart(bool _force); void print_status(); + bool send(const std::vector<byte_t>& _cmd_header, const byte_t *_data, + uint32_t _size, bool _flush = true); private: void send_queued(); diff --git a/implementation/endpoints/include/server_endpoint_impl.hpp b/implementation/endpoints/include/server_endpoint_impl.hpp index 2b0ad60..60e8ff4 100644 --- a/implementation/endpoints/include/server_endpoint_impl.hpp +++ b/implementation/endpoints/include/server_endpoint_impl.hpp @@ -41,6 +41,8 @@ public: bool is_connected() const;
void set_connected(bool _connected);
bool send(const uint8_t *_data, uint32_t _size, bool _flush);
+ bool send(const std::vector<byte_t>& _cmd_header, const byte_t *_data,
+ uint32_t _size, bool _flush = true);
virtual void stop();
bool flush(endpoint_type _target);
diff --git a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp index 4f4d6d4..58619cc 100644 --- a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp @@ -28,6 +28,8 @@ public: void set_connected(bool _connected); bool send(const byte_t *_data, uint32_t _size, bool _flush); + bool send(const std::vector<byte_t>& _cmd_header, const byte_t *_data, + uint32_t _size, bool _flush); bool send_to(const std::shared_ptr<endpoint_definition> _target, const byte_t *_data, uint32_t _size, bool _flush); void enable_magic_cookies(); diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index 4423270..b4a6bf9 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -105,96 +105,37 @@ template<typename Protocol> bool client_endpoint_impl<Protocol>::send(const uint8_t *_data,
uint32_t _size, bool _flush) {
std::lock_guard<std::mutex> its_lock(mutex_);
- if (endpoint_impl<Protocol>::sending_blocked_) {
- return false;
- }
-#if 0
- std::stringstream msg;
- msg << "cei::send: ";
- for (uint32_t i = 0; i < _size; i++)
- msg << std::hex << std::setw(2) << std::setfill('0')
- << (int)_data[i] << " ";
- VSOMEIP_INFO << msg.str();
-#endif
-
- if (endpoint_impl<Protocol>::max_message_size_ != MESSAGE_SIZE_UNLIMITED
- && _size > endpoint_impl<Protocol>::max_message_size_) {
- VSOMEIP_ERROR << "cei::send: Dropping to big message (" << std::dec
- << _size << " Bytes). Maximum allowed message size is: "
- << endpoint_impl<Protocol>::max_message_size_ << " Bytes.";
- return false;
- }
-
+ bool ret(true);
const bool queue_size_zero_on_entry(queue_.empty());
- if (packetizer_->size() + _size < packetizer_->size()) {
- VSOMEIP_ERROR << "Overflow in packetizer addition ~> abort sending!";
- return false;
- }
- if (packetizer_->size() + _size > endpoint_impl<Protocol>::max_message_size_
- && !packetizer_->empty()) {
- queue_.push_back(packetizer_);
- queue_size_ += packetizer_->size();
- packetizer_ = std::make_shared<message_buffer_t>();
- }
-
- if (endpoint_impl<Protocol>::queue_limit_ != QUEUE_SIZE_UNLIMITED
- && queue_size_ + _size > endpoint_impl<Protocol>::queue_limit_) {
- service_t its_service(0);
- method_t its_method(0);
- client_t its_client(0);
- session_t its_session(0);
- if (_size >= VSOMEIP_SESSION_POS_MAX) {
- // this will yield wrong IDs for local communication as the commands
- // are prepended to the actual payload
- // it will print:
- // (lowbyte service ID + highbyte methoid)
- // [(Command + lowerbyte sender's client ID).
- // highbyte sender's client ID + lowbyte command size.
- // lowbyte methodid + highbyte vsomeipd length]
- its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
- _data[VSOMEIP_SERVICE_POS_MAX]);
- its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
- _data[VSOMEIP_METHOD_POS_MAX]);
- its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
- _data[VSOMEIP_CLIENT_POS_MAX]);
- its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
- _data[VSOMEIP_SESSION_POS_MAX]);
- }
- VSOMEIP_ERROR << "cei::send: queue size limit (" << std::dec
- << endpoint_impl<Protocol>::queue_limit_
- << ") reached. Dropping message ("
- << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
- << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_session << "] "
- << "queue_size: " << std::dec << queue_size_
- << " data size: " << std::dec << _size;
- return false;
- }
- packetizer_->insert(packetizer_->end(), _data, _data + _size);
-
- if (_flush) {
- flush_timer_.cancel();
- queue_.push_back(packetizer_);
- queue_size_ += packetizer_->size();
- packetizer_ = std::make_shared<message_buffer_t>();
+ if (endpoint_impl<Protocol>::sending_blocked_ ||
+ !check_message_size(_size) ||
+ !check_packetizer_space(_size) ||
+ !check_queue_limit(_data, _size)) {
+ ret = false;
} else {
- flush_timer_.expires_from_now(
- std::chrono::milliseconds(VSOMEIP_DEFAULT_FLUSH_TIMEOUT)); // TODO: use config variable
- flush_timer_.async_wait(
- std::bind(
- &client_endpoint_impl<Protocol>::flush_cbk,
- this->shared_from_this(),
- std::placeholders::_1
- )
- );
- }
-
- if (queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress
- send_queued();
+#if 0
+ std::stringstream msg;
+ msg << "cei::send: ";
+ for (uint32_t i = 0; i < _size; i++)
+ msg << std::hex << std::setw(2) << std::setfill('0')
+ << (int)_data[i] << " ";
+ VSOMEIP_INFO << msg.str();
+#endif
+ packetizer_->insert(packetizer_->end(), _data, _data + _size);
+ send_or_start_flush_timer(_flush, queue_size_zero_on_entry);
}
+ return ret;
+}
- return (true);
+template<typename Protocol>
+bool client_endpoint_impl<Protocol>::send(const std::vector<byte_t>& _cmd_header,
+ const byte_t *_data, uint32_t _size,
+ bool _flush) {
+ (void) _cmd_header;
+ (void) _data;
+ (void) _size;
+ (void) _flush;
+ return false;
}
template<typename Protocol>
@@ -427,6 +368,97 @@ void client_endpoint_impl<Protocol>::start_connect_timer() { this->shared_from_this(), std::placeholders::_1));
}
+template<typename Protocol>
+bool client_endpoint_impl<Protocol>::check_message_size(std::uint32_t _size) const {
+ if (endpoint_impl<Protocol>::max_message_size_ != MESSAGE_SIZE_UNLIMITED
+ && _size > endpoint_impl<Protocol>::max_message_size_) {
+ VSOMEIP_ERROR << "cei::check_message_size: Dropping to big message ("
+ << std::dec << _size << " Bytes). Maximum allowed message size is: "
+ << endpoint_impl<Protocol>::max_message_size_ << " Bytes.";
+ return false;
+ }
+ return true;
+}
+
+template<typename Protocol>
+bool client_endpoint_impl<Protocol>::check_packetizer_space(std::uint32_t _size) {
+ if (packetizer_->size() + _size < packetizer_->size()) {
+ VSOMEIP_ERROR << "Overflow in packetizer addition ~> abort sending!";
+ return false;
+ }
+ if (packetizer_->size() + _size > endpoint_impl<Protocol>::max_message_size_
+ && !packetizer_->empty()) {
+ queue_.push_back(packetizer_);
+ queue_size_ += packetizer_->size();
+ packetizer_ = std::make_shared<message_buffer_t>();
+ }
+ return true;
+}
+
+template<typename Protocol>
+bool client_endpoint_impl<Protocol>::check_queue_limit(const uint8_t *_data, std::uint32_t _size) const {
+ if (endpoint_impl<Protocol>::queue_limit_ != QUEUE_SIZE_UNLIMITED
+ && queue_size_ + _size > endpoint_impl<Protocol>::queue_limit_) {
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_size >= VSOMEIP_SESSION_POS_MAX) {
+ // this will yield wrong IDs for local communication as the commands
+ // are prepended to the actual payload
+ // it will print:
+ // (lowbyte service ID + highbyte methoid)
+ // [(Command + lowerbyte sender's client ID).
+ // highbyte sender's client ID + lowbyte command size.
+ // lowbyte methodid + highbyte vsomeipd length]
+ its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
+ _data[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
+ _data[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
+ _data[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
+ _data[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_ERROR << "cei::check_queue_limit: queue size limit (" << std::dec
+ << endpoint_impl<Protocol>::queue_limit_
+ << ") reached. Dropping message ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "] "
+ << "queue_size: " << std::dec << queue_size_
+ << " data size: " << std::dec << _size;
+ return false;
+ }
+ return true;
+}
+
+template<typename Protocol>
+void client_endpoint_impl<Protocol>::send_or_start_flush_timer(
+ bool _flush, bool _queue_size_zero_on_entry) {
+ if (_flush) {
+ flush_timer_.cancel();
+ queue_.push_back(packetizer_);
+ queue_size_ += packetizer_->size();
+ packetizer_ = std::make_shared<message_buffer_t>();
+
+ if (_queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress
+ send_queued();
+ }
+ } else {
+ flush_timer_.expires_from_now(
+ std::chrono::milliseconds(VSOMEIP_DEFAULT_FLUSH_TIMEOUT)); // TODO: use config variable
+ flush_timer_.async_wait(
+ std::bind(
+ &client_endpoint_impl<Protocol>::flush_cbk,
+ this->shared_from_this(),
+ std::placeholders::_1
+ )
+ );
+ }
+}
+
// Instantiate template
#ifndef _WIN32
template class client_endpoint_impl<boost::asio::local::stream_protocol>;
diff --git a/implementation/endpoints/src/endpoint_definition.cpp b/implementation/endpoints/src/endpoint_definition.cpp index 91808fc..9ab7218 100644 --- a/implementation/endpoints/src/endpoint_definition.cpp +++ b/implementation/endpoints/src/endpoint_definition.cpp @@ -9,43 +9,29 @@ namespace vsomeip { -std::map<service_t, - std::map<instance_t, - std::map<boost::asio::ip::address, - std::map<uint16_t, - std::map<bool, - std::shared_ptr<endpoint_definition> > > > > > endpoint_definition::definitions_; +std::map<std::tuple<service_t, instance_t, boost::asio::ip::address, uint16_t, bool>, + std::shared_ptr<endpoint_definition> > endpoint_definition::definitions_; std::mutex endpoint_definition::definitions_mutex_; std::shared_ptr<endpoint_definition> endpoint_definition::get(const boost::asio::ip::address &_address, uint16_t _port, bool _is_reliable, service_t _service, instance_t _instance) { + auto key = std::make_tuple(_service, _instance, _address, _port, _is_reliable); std::lock_guard<std::mutex> its_lock(definitions_mutex_); std::shared_ptr<endpoint_definition> its_result; - auto find_service = definitions_.find(_service); - if( find_service != definitions_.end()) { - auto find_instance = find_service->second.find(_instance); - if (find_instance != find_service->second.end()) { - auto find_address = find_instance->second.find(_address); - if (find_address != find_instance->second.end()) { - auto find_port = find_address->second.find(_port); - if (find_port != find_address->second.end()) { - auto found_reliable = find_port->second.find(_is_reliable); - if (found_reliable != find_port->second.end()) { - its_result = found_reliable->second; - } - } - } - } + auto found_endpoint = definitions_.find(key); + if (found_endpoint != definitions_.end()) { + its_result = found_endpoint->second; } if (!its_result) { - its_result = std::make_shared<endpoint_definition>( - _address, _port, _is_reliable); - definitions_[_service][_instance][_address][_port][_is_reliable] = its_result; + its_result = std::make_shared<endpoint_definition>( + _address, _port, _is_reliable); + definitions_[key] = its_result; } + return its_result; } diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp index 4664fdb..29d861f 100644 --- a/implementation/endpoints/src/local_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp @@ -171,8 +171,8 @@ void local_client_endpoint_impl::receive() { }
void local_client_endpoint_impl::send_queued() {
- static byte_t its_start_tag[] = { 0x67, 0x37, 0x6D, 0x07 };
- static byte_t its_end_tag[] = { 0x07, 0x6D, 0x37, 0x67 };
+ static const byte_t its_start_tag[] = { 0x67, 0x37, 0x6D, 0x07 };
+ static const byte_t its_end_tag[] = { 0x07, 0x6D, 0x37, 0x67 };
std::vector<boost::asio::const_buffer> bufs;
message_buffer_ptr_t its_buffer;
@@ -290,4 +290,33 @@ std::string local_client_endpoint_impl::get_remote_information() const { #endif
}
+bool local_client_endpoint_impl::send(const std::vector<byte_t>& _cmd_header,
+ const byte_t *_data, uint32_t _size,
+ bool _flush) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ bool ret(true);
+ const bool queue_size_zero_on_entry(queue_.empty());
+
+ if (endpoint_impl::sending_blocked_ ||
+ !check_message_size(static_cast<std::uint32_t>(_cmd_header.size() + _size)) ||
+ !check_packetizer_space(static_cast<std::uint32_t>(_cmd_header.size() + _size))||
+ !check_queue_limit(_data, static_cast<std::uint32_t>(_cmd_header.size() + _size))) {
+ ret = false;
+ } else {
+#if 0
+ std::stringstream msg;
+ msg << "lce::send: ";
+ for (uint32_t i = 0; i < _size; i++)
+ msg << std::hex << std::setw(2) << std::setfill('0')
+ << (int)_data[i] << " ";
+ VSOMEIP_INFO << msg.str();
+#endif
+ packetizer_->reserve(_cmd_header.size() + _size);
+ packetizer_->insert(packetizer_->end(), _cmd_header.begin(), _cmd_header.end());
+ packetizer_->insert(packetizer_->end(), _data, _data + _size);
+ send_or_start_flush_timer(_flush, queue_size_zero_on_entry);
+ }
+ return ret;
+}
+
} // namespace vsomeip
diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp index 005c203..6da5826 100644 --- a/implementation/endpoints/src/server_endpoint_impl.cpp +++ b/implementation/endpoints/src/server_endpoint_impl.cpp @@ -116,6 +116,17 @@ template<typename Protocol>
bool server_endpoint_impl<Protocol>::send(const uint }
template<typename Protocol>
+bool server_endpoint_impl<Protocol>::send(
+ const std::vector<byte_t>& _cmd_header, const byte_t *_data,
+ uint32_t _size, bool _flush) {
+ (void) _cmd_header;
+ (void) _data;
+ (void) _size;
+ (void) _flush;
+ return false;
+}
+
+template<typename Protocol>
bool server_endpoint_impl<Protocol>::send_intern(
endpoint_type _target, const byte_t *_data, uint32_t _size,
bool _flush) {
diff --git a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp index 5dbd69a..30862df 100644 --- a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp @@ -40,6 +40,16 @@ bool virtual_server_endpoint_impl::send(const byte_t *_data, uint32_t _size, return false; } +bool virtual_server_endpoint_impl::send(const std::vector<byte_t>& _cmd_header, + const byte_t *_data, uint32_t _size, + bool _flush) { + (void)_cmd_header; + (void)_data; + (void)_size; + (void)_flush; + return false; +} + bool virtual_server_endpoint_impl::send_to( const std::shared_ptr<endpoint_definition> _target, const byte_t *_data, uint32_t _size, bool _flush) { diff --git a/implementation/routing/include/event.hpp b/implementation/routing/include/event.hpp index ac483ab..674f41b 100644 --- a/implementation/routing/include/event.hpp +++ b/implementation/routing/include/event.hpp @@ -85,7 +85,7 @@ public: void notify_one(const std::shared_ptr<endpoint_definition> &_target, bool _flush); void notify_one(client_t _client, bool _flush); - bool add_subscriber(eventgroup_t _eventgroup, client_t _client); + bool add_subscriber(eventgroup_t _eventgroup, client_t _client, bool _force); void remove_subscriber(eventgroup_t _eventgroup, client_t _client); bool has_subscriber(eventgroup_t _eventgroup, client_t _client); std::set<client_t> get_subscribers(); diff --git a/implementation/routing/include/routing_manager_host.hpp b/implementation/routing/include/routing_manager_host.hpp index c9f8841..7aa767a 100644 --- a/implementation/routing/include/routing_manager_host.hpp +++ b/implementation/routing/include/routing_manager_host.hpp @@ -40,6 +40,7 @@ public: eventgroup_t _eventgroup, event_t _event, uint16_t _error) = 0; virtual void send(std::shared_ptr<message> _message, bool _flush) = 0; virtual void on_offered_services_info(std::vector<std::pair<service_t, instance_t>> &_services) = 0; + virtual bool is_routing() const = 0; }; } // namespace vsomeip diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp index ad228de..13cd36e 100644 --- a/implementation/routing/src/event.cpp +++ b/implementation/routing/src/event.cpp @@ -3,6 +3,8 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. +#include <iomanip> + #include <vsomeip/constants.hpp> #include <vsomeip/defines.hpp> #include <vsomeip/message.hpp> @@ -334,9 +336,21 @@ bool event::has_ref() { return refs_.size() != 0; } -bool event::add_subscriber(eventgroup_t _eventgroup, client_t _client) { +bool event::add_subscriber(eventgroup_t _eventgroup, client_t _client, bool _force) { std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); - return eventgroups_[_eventgroup].insert(_client).second; + bool ret = false; + if (_force // remote events managed by rm_impl + || is_provided_ // events provided by rm_proxies + || is_shadow_ // local events managed by rm_impl + || is_cache_placeholder_) { + ret = eventgroups_[_eventgroup].insert(_client).second; + } else { + VSOMEIP_WARNING << __func__ << ": Didnt' insert client " + << std::hex << std::setw(4) << std::setfill('0') << _client + << "to eventgroup 0x" + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup; + } + return ret; } void event::remove_subscriber(eventgroup_t _eventgroup, client_t _client) { diff --git a/implementation/routing/src/eventgroupinfo.cpp b/implementation/routing/src/eventgroupinfo.cpp index a4eb888..77bb228 100644 --- a/implementation/routing/src/eventgroupinfo.cpp +++ b/implementation/routing/src/eventgroupinfo.cpp @@ -236,6 +236,7 @@ pending_subscription_id_t eventgroupinfo::add_pending_subscription( if (++subscription_id_ == DEFAULT_SUBSCRIPTION) { subscription_id_++; } + _pending_subscription.pending_subscription_id_ = subscription_id_; pending_subscriptions_[subscription_id_] = _pending_subscription; const auto remote_address_port = std::make_tuple( @@ -278,10 +279,9 @@ std::vector<pending_subscription_t> eventgroupinfo::remove_pending_subscription( pending_subscriptions_.erase(found_pending_subscription); found_remote->second.erase(found_remote->second.begin()); - if (removed_is_subscribe) { // only subscriptions must be acked via SD - its_pending_sub.pending_subscription_id_ = _subscription_id; - its_pending_subscriptions.push_back(its_pending_sub); - } + // return removed (un)subscription as first element + its_pending_subscriptions.push_back(its_pending_sub); + // retrieve all pending (un)subscriptions which arrived during // the time the rm_proxy answered the currently processed subscription for (auto iter = found_remote->second.begin(); @@ -291,7 +291,6 @@ std::vector<pending_subscription_t> eventgroupinfo::remove_pending_subscription( const bool queued_is_subscribe = (other_pen_sub->second.ttl_ > 0); if (removed_is_subscribe) { its_pending_subscriptions.push_back(other_pen_sub->second); - its_pending_subscriptions.back().pending_subscription_id_ = *iter; if (!queued_is_subscribe) { // unsubscribe was queued and needs to be sent to // rm_proxy first before continuing processing @@ -307,7 +306,6 @@ std::vector<pending_subscription_t> eventgroupinfo::remove_pending_subscription( // rm_proxy first before continuing processing // following queued (un)subscriptions its_pending_subscriptions.push_back(other_pen_sub->second); - its_pending_subscriptions.back().pending_subscription_id_ = *iter; break; } else { // further queued unsubscriptions can be ignored diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp index 9296f99..62ae6f2 100644 --- a/implementation/routing/src/routing_manager_base.cpp +++ b/implementation/routing/src/routing_manager_base.cpp @@ -356,7 +356,7 @@ void routing_manager_base::register_event(client_t _client, service_t _service, std::set<client_t> its_any_event_subscribers = its_any_event->get_subscribers(eventgroup); for (const client_t subscriber : its_any_event_subscribers) { - its_event->add_subscriber(eventgroup, subscriber); + its_event->add_subscriber(eventgroup, subscriber, true); } } } @@ -998,36 +998,29 @@ bool routing_manager_base::send_local( std::shared_ptr<endpoint>& _target, client_t _client, const byte_t *_data, uint32_t _size, instance_t _instance, bool _flush, bool _reliable, uint8_t _command, bool _is_valid_crc) const { - std::size_t its_complete_size = _size + sizeof(instance_t) - + sizeof(bool) + sizeof(bool) + sizeof(bool); - client_t sender = get_client(); - if (_command == VSOMEIP_NOTIFY_ONE) { - its_complete_size +=sizeof(client_t); - } - std::vector<byte_t> its_command( - VSOMEIP_COMMAND_HEADER_SIZE + its_complete_size); - its_command[VSOMEIP_COMMAND_TYPE_POS] = _command; - std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &sender, - sizeof(client_t)); - std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_complete_size, - sizeof(_size)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], _data, - _size); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size], + const std::size_t its_complete_size = VSOMEIP_SEND_COMMAND_SIZE + - VSOMEIP_COMMAND_HEADER_SIZE + _size; + const client_t sender = get_client(); + + std::vector<byte_t> its_command_header(VSOMEIP_SEND_COMMAND_SIZE); + its_command_header[VSOMEIP_COMMAND_TYPE_POS] = _command; + std::memcpy(&its_command_header[VSOMEIP_COMMAND_CLIENT_POS], + &sender, sizeof(client_t)); + std::memcpy(&its_command_header[VSOMEIP_COMMAND_SIZE_POS_MIN], + &its_complete_size, sizeof(_size)); + std::memcpy(&its_command_header[VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN], &_instance, sizeof(instance_t)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size - + sizeof(instance_t)], &_flush, sizeof(bool)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size - + sizeof(instance_t) + sizeof(bool)], &_reliable, sizeof(bool)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size - + sizeof(instance_t) + sizeof(bool) + sizeof(bool)], &_is_valid_crc, sizeof(bool)); - if (_command == VSOMEIP_NOTIFY_ONE) { - // Add target client - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size - + sizeof(instance_t) + sizeof(bool) + sizeof(bool) + sizeof(bool)], &_client, sizeof(client_t)); - } - - return _target->send(&its_command[0], uint32_t(its_command.size())); + std::memcpy(&its_command_header[VSOMEIP_SEND_COMMAND_FLUSH_POS], + &_flush, sizeof(bool)); + std::memcpy(&its_command_header[VSOMEIP_SEND_COMMAND_RELIABLE_POS], + &_reliable, sizeof(bool)); + std::memcpy(&its_command_header[VSOMEIP_SEND_COMMAND_VALID_CRC_POS], + &_is_valid_crc, sizeof(bool)); + // Add target client, only relevant for selective notifications + std::memcpy(&its_command_header[VSOMEIP_SEND_COMMAND_DST_CLIENT_POS_MIN], + &_client, sizeof(client_t)); + + return _target->send(its_command_header, _data, _size); } bool routing_manager_base::insert_subscription( @@ -1037,7 +1030,8 @@ bool routing_manager_base::insert_subscription( if (_event != ANY_EVENT) { // subscribe to specific event std::shared_ptr<event> its_event = find_event(_service, _instance, _event); if (its_event) { - is_inserted = its_event->add_subscriber(_eventgroup, _client); + is_inserted = its_event->add_subscriber(_eventgroup, _client, + host_->is_routing()); } else { VSOMEIP_WARNING << "routing_manager_base::insert_subscription(" << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" @@ -1067,7 +1061,8 @@ bool routing_manager_base::insert_subscription( // eventgroups _already_subscribed_events->insert(e->get_event()); } - is_inserted = e->add_subscriber(_eventgroup, _client) || is_inserted; + is_inserted = e->add_subscriber(_eventgroup, _client, + host_->is_routing()) || is_inserted; } } } else { diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 733d75b..faf5282 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -1588,15 +1588,17 @@ bool routing_manager_impl::deliver_notification( bool cache_event = false; for (const auto eg : its_event->get_eventgroups()) { std::shared_ptr<eventgroupinfo> egi = find_eventgroup(_service, _instance, eg); - for (const auto &e : egi->get_events()) { - cache_event = (e->get_subscribers().size() > 0); + if (egi) { + for (const auto &e : egi->get_events()) { + cache_event = (e->get_subscribers().size() > 0); + if (cache_event) { + break; + } + } if (cache_event) { break; } } - if (cache_event) { - break; - } } if (!cache_event) { return true; // as there is nothing to do @@ -4322,7 +4324,7 @@ bool routing_manager_impl::create_placeholder_event_and_subscribe( std::shared_ptr<event> its_event = find_event(_service, _instance, _event); if (its_event) { - is_inserted = its_event->add_subscriber(_eventgroup, _client); + is_inserted = its_event->add_subscriber(_eventgroup, _client, false); } return is_inserted; } @@ -4586,12 +4588,27 @@ void routing_manager_impl::on_unsubscribe_ack(client_t _client, service_t _servi std::vector<pending_subscription_t> its_pending_subscriptions = its_eventgroup->remove_pending_subscription(_unsubscription_id); for (const pending_subscription_t& its_sd_message_id : its_pending_subscriptions) { - const pending_subscription_id_t its_subscription_id = its_sd_message_id.pending_subscription_id_; - const client_t its_subscribing_client = its_sd_message_id.subscribing_client_; - const client_t its_offering_client = find_local_client(_service, _instance); - send_subscription(its_offering_client, its_subscribing_client, _service, - _instance, _eventgroup, its_eventgroup->get_major(), - its_subscription_id); + if (its_sd_message_id.pending_subscription_id_ == _unsubscription_id) { + its_eventgroup->remove_target(its_sd_message_id.target_); + clear_remote_subscriber(_service, _instance, + its_sd_message_id.subscribing_client_, + its_sd_message_id.target_); + if (its_eventgroup->get_targets().size() == 0) { + for (auto e : its_eventgroup->get_events()) { + if (e->is_shadow()) { + e->unset_payload(); + } + } + } + } else { + const pending_subscription_id_t its_subscription_id = + its_sd_message_id.pending_subscription_id_; + const client_t its_subscribing_client = its_sd_message_id.subscribing_client_; + const client_t its_offering_client = find_local_client(_service, _instance); + send_subscription(its_offering_client, its_subscribing_client, _service, + _instance, _eventgroup, its_eventgroup->get_major(), + its_subscription_id); + } } } diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index 6d1dbca..8d48b25 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -878,23 +878,21 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, switch (its_command) { case VSOMEIP_SEND: { instance_t its_instance; - std::memcpy(&its_instance, - &_data[_size - sizeof(instance_t) - sizeof(bool) - - sizeof(bool) - sizeof(bool)], sizeof(instance_t)); bool its_reliable; - std::memcpy(&its_reliable, &_data[_size - sizeof(bool) - sizeof(bool)], - sizeof(its_reliable)); bool its_is_vslid_crc; - std::memcpy(&its_is_vslid_crc, &_data[_size - sizeof(bool)], - sizeof(its_is_vslid_crc)); - - // reduce by size of instance, flush, reliable and is_valid_crc flag + std::memcpy(&its_instance,&_data[VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN], + sizeof(instance_t)); + std::memcpy(&its_reliable, &_data[VSOMEIP_SEND_COMMAND_RELIABLE_POS], + sizeof(its_reliable)); + std::memcpy(&its_is_vslid_crc, &_data[VSOMEIP_SEND_COMMAND_VALID_CRC_POS], + sizeof(its_is_vslid_crc)); + + // reduce by size of instance, flush, reliable, client and is_valid_crc flag const std::uint32_t its_message_size = its_length - - static_cast<uint32_t>(sizeof(its_instance) - + sizeof(bool) + sizeof(bool) + sizeof(bool)); + (VSOMEIP_SEND_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE); auto a_deserializer = get_deserializer(); - a_deserializer->set_data(&_data[VSOMEIP_COMMAND_PAYLOAD_POS], + a_deserializer->set_data(&_data[VSOMEIP_SEND_COMMAND_PAYLOAD_POS], its_message_size); std::shared_ptr<message> its_message(a_deserializer->deserialize_message()); a_deserializer->reset(); @@ -1964,7 +1962,7 @@ bool routing_manager_proxy::create_placeholder_event_and_subscribe( true); std::shared_ptr<event> its_event = find_event(_service, _instance, _event); if (its_event) { - is_inserted = its_event->add_subscriber(_eventgroup, _client); + is_inserted = its_event->add_subscriber(_eventgroup, _client, false); } return is_inserted; } diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp index ffc8b09..528c729 100644 --- a/implementation/routing/src/routing_manager_stub.cpp +++ b/implementation/routing/src/routing_manager_stub.cpp @@ -422,18 +422,19 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]"; break; case VSOMEIP_SEND: { - its_data = &_data[VSOMEIP_COMMAND_PAYLOAD_POS]; + its_data = &_data[VSOMEIP_SEND_COMMAND_PAYLOAD_POS]; its_service = VSOMEIP_BYTES_TO_WORD( its_data[VSOMEIP_SERVICE_POS_MIN], its_data[VSOMEIP_SERVICE_POS_MAX]); its_client_from_header = VSOMEIP_BYTES_TO_WORD( its_data[VSOMEIP_CLIENT_POS_MIN], its_data[VSOMEIP_CLIENT_POS_MAX]); - std::memcpy(&its_instance, &_data[_size - sizeof(instance_t) - - sizeof(bool) - sizeof(bool) - sizeof(bool)], sizeof(its_instance)); - std::memcpy(&its_reliable, &_data[_size - sizeof(bool) - sizeof(bool)], sizeof(its_reliable)); - - std::memcpy(&its_is_valid_crc, &_data[_size - sizeof(bool)], sizeof(its_is_valid_crc)); + std::memcpy(&its_instance, &_data[VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN], + sizeof(its_instance)); + std::memcpy(&its_reliable, &_data[VSOMEIP_SEND_COMMAND_RELIABLE_POS], + sizeof(its_reliable)); + std::memcpy(&its_is_valid_crc, &_data[VSOMEIP_SEND_COMMAND_VALID_CRC_POS], + sizeof(its_is_valid_crc)); if (utility::is_request(its_data[VSOMEIP_MESSAGE_TYPE_POS])) { if (!configuration_->is_client_allowed(its_client_from_header, @@ -456,40 +457,37 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, return; } } - // reduce by size of instance, flush, reliable and is_valid_crc flag + // reduce by size of instance, flush, reliable, client and is_valid_crc flag const std::uint32_t its_message_size = its_size - - static_cast<std::uint32_t>(sizeof(its_instance) - + sizeof(bool) + sizeof(bool) + sizeof(bool)); + (VSOMEIP_SEND_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE); host_->on_message(its_service, its_instance, its_data, its_message_size, its_reliable, its_is_valid_crc); break; } case VSOMEIP_NOTIFY: { - its_data = &_data[VSOMEIP_COMMAND_PAYLOAD_POS]; + its_data = &_data[VSOMEIP_SEND_COMMAND_PAYLOAD_POS]; its_service = VSOMEIP_BYTES_TO_WORD( its_data[VSOMEIP_SERVICE_POS_MIN], its_data[VSOMEIP_SERVICE_POS_MAX]); - std::memcpy(&its_instance, &_data[_size - sizeof(instance_t) - - sizeof(bool) - sizeof(bool) - sizeof(bool)], sizeof(its_instance)); - // reduce by size of instance, flush, reliable and is_valid_crc flag + std::memcpy(&its_instance, &_data[VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN], + sizeof(its_instance)); + // reduce by size of instance, flush, reliable, is_valid_crc flag and target client const std::uint32_t its_message_size = its_size - - static_cast<uint32_t>(sizeof(its_instance) - + sizeof(bool) + sizeof(bool)); + (VSOMEIP_SEND_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE); host_->on_notification(VSOMEIP_ROUTING_CLIENT, its_service, its_instance, its_data, its_message_size); break; } case VSOMEIP_NOTIFY_ONE: { - its_data = &_data[VSOMEIP_COMMAND_PAYLOAD_POS]; + its_data = &_data[VSOMEIP_SEND_COMMAND_PAYLOAD_POS]; its_service = VSOMEIP_BYTES_TO_WORD( its_data[VSOMEIP_SERVICE_POS_MIN], its_data[VSOMEIP_SERVICE_POS_MAX]); - std::memcpy(&its_instance, &_data[_size - sizeof(instance_t) - - sizeof(bool) - sizeof(bool) - sizeof(bool) - sizeof(client_t)], - sizeof(its_instance)); - std::memcpy(&its_target_client, &_data[_size - sizeof(client_t)], sizeof(client_t)); + std::memcpy(&its_instance, &_data[VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN], + sizeof(its_instance)); + std::memcpy(&its_target_client, &_data[VSOMEIP_SEND_COMMAND_DST_CLIENT_POS_MIN], + sizeof(client_t)); // reduce by size of instance, flush, reliable flag, is_valid_crc and target client const std::uint32_t its_message_size = its_size - - static_cast<uint32_t>(sizeof(its_instance) - + sizeof(bool) + sizeof(bool) + sizeof(client_t)); + (VSOMEIP_SEND_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE); host_->on_notification(its_target_client, its_service, its_instance, its_data, its_message_size, true); break; @@ -742,6 +740,14 @@ void routing_manager_stub::on_deregister_application(client_t _client) { } void routing_manager_stub::client_registration_func(void) { +#ifndef _WIN32 + { + std::stringstream s; + s << std::hex << std::setw(4) << std::setfill('0') + << host_->get_client() << "_client_reg"; + pthread_setname_np(pthread_self(),s.str().c_str()); + } +#endif std::unique_lock<std::mutex> its_lock(client_registration_mutex_); while (client_registration_running_) { while (!pending_client_registrations_.size() && client_registration_running_) { diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp index 5d4a80b..219c8f0 100644 --- a/implementation/runtime/src/application_impl.cpp +++ b/implementation/runtime/src/application_impl.cpp @@ -11,6 +11,7 @@ #ifndef _WIN32 #include <dlfcn.h> +#include <sys/syscall.h> #endif #include <vsomeip/defines.hpp> @@ -302,6 +303,15 @@ bool application_impl::init() { } void application_impl::start() { +#ifndef _WIN32 + if (getpid() != static_cast<pid_t>(syscall(SYS_gettid))) { + // only set threadname if calling thread isn't the main thread + std::stringstream s; + s << std::hex << std::setw(4) << std::setfill('0') << client_ + << "_io" << std::setw(2) << std::setfill('0') << 0; + pthread_setname_np(pthread_self(),s.str().c_str()); + } +#endif const size_t io_thread_count = configuration_->get_io_thread_count(name_); { std::lock_guard<std::mutex> its_lock(start_stop_mutex_); @@ -352,7 +362,20 @@ void application_impl::start() { VSOMEIP_INFO << "io thread id from application: " << std::hex << std::setw(4) << std::setfill('0') << client_ << " (" << name_ << ") is: " << std::hex - << std::this_thread::get_id(); + << std::this_thread::get_id() + #ifndef _WIN32 + << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid)) + #endif + ; + #ifndef _WIN32 + { + std::stringstream s; + s << std::hex << std::setw(4) << std::setfill('0') + << client_ << "_io" << std::setw(2) + << std::setfill('0') << i+1; + pthread_setname_np(pthread_self(),s.str().c_str()); + } + #endif try { io_.run(); #ifndef _WIN32 @@ -388,7 +411,11 @@ void application_impl::start() { app_counter_mutex__.unlock(); VSOMEIP_INFO << "io thread id from application: " << std::hex << std::setw(4) << std::setfill('0') << client_ << " (" - << name_ << ") is: " << std::hex << std::this_thread::get_id(); + << name_ << ") is: " << std::hex << std::this_thread::get_id() +#ifndef _WIN32 + << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid)) +#endif + ; try { io_.run(); #ifndef _WIN32 @@ -1554,10 +1581,22 @@ routing_manager * application_impl::get_routing_manager() const { } void application_impl::main_dispatch() { +#ifndef _WIN32 + { + std::stringstream s; + s << std::hex << std::setw(4) << std::setfill('0') + << client_ << "_m_dispatch"; + pthread_setname_np(pthread_self(),s.str().c_str()); + } +#endif const std::thread::id its_id = std::this_thread::get_id(); VSOMEIP_INFO << "main dispatch thread id from application: " << std::hex << std::setw(4) << std::setfill('0') << client_ << " (" - << name_ << ") is: " << std::hex << its_id; + << name_ << ") is: " << std::hex << its_id +#ifndef _WIN32 + << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid)) +#endif + ; std::unique_lock<std::mutex> its_lock(handlers_mutex_); while (is_dispatching_) { if (handlers_.empty() || !is_active_dispatcher(its_id)) { @@ -1595,15 +1634,28 @@ void application_impl::main_dispatch() { } void application_impl::dispatch() { +#ifndef _WIN32 + { + std::stringstream s; + s << std::hex << std::setw(4) << std::setfill('0') + << client_ << "_dispatch"; + pthread_setname_np(pthread_self(),s.str().c_str()); + } +#endif const std::thread::id its_id = std::this_thread::get_id(); VSOMEIP_INFO << "dispatch thread id from application: " << std::hex << std::setw(4) << std::setfill('0') << client_ << " (" - << name_ << ") is: " << std::hex << its_id; + << name_ << ") is: " << std::hex << its_id +#ifndef _WIN32 + << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid)) +#endif + ; while (is_active_dispatcher(its_id)) { std::unique_lock<std::mutex> its_lock(handlers_mutex_); if (is_dispatching_ && handlers_.empty()) { dispatcher_condition_.wait(its_lock); - if (handlers_.empty()) { // Maybe woken up from main dispatcher + // Maybe woken up from main dispatcher + if (handlers_.empty() && !is_active_dispatcher(its_id)) { if (!is_dispatching_) { return; } @@ -1860,9 +1912,19 @@ void application_impl::clear_all_handler() { void application_impl::shutdown() { VSOMEIP_INFO << "shutdown thread id from application: " << std::hex << std::setw(4) << std::setfill('0') << client_ << " (" - << name_ << ") is: " << std::hex << std::this_thread::get_id(); + << name_ << ") is: " << std::hex << std::this_thread::get_id() +#ifndef _WIN32 + << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid)) +#endif + ; #ifndef _WIN32 boost::asio::detail::posix_signal_blocker blocker; + { + std::stringstream s; + s << std::hex << std::setw(4) << std::setfill('0') + << client_ << "_shutdown"; + pthread_setname_np(pthread_self(),s.str().c_str()); + } #endif { diff --git a/implementation/service_discovery/src/message_impl.cpp b/implementation/service_discovery/src/message_impl.cpp index 98ed9de..9d5c1ac 100755 --- a/implementation/service_discovery/src/message_impl.cpp +++ b/implementation/service_discovery/src/message_impl.cpp @@ -239,30 +239,16 @@ bool message_impl::deserialize(vsomeip::deserializer *_from) { while (is_successful && _from->get_remaining()) {
std::shared_ptr < entry_impl > its_entry(deserialize_entry(_from));
if (its_entry) {
- if (its_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP) {
- bool is_unique(true);
- for (const auto& e : entries_) {
- if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP &&
- *(static_cast<eventgroupentry_impl*>(e.get())) ==
- *(static_cast<eventgroupentry_impl*>(its_entry.get()))) {
- is_unique = false;
- break;
- }
- }
- if (is_unique) {
- entries_.push_back(its_entry);
- if (its_entry->get_ttl() > 0) {
- const std::uint8_t num_options =
- static_cast<std::uint8_t>(
- its_entry->get_num_options(1) +
- its_entry->get_num_options(2));
- number_required_acks_ =
- static_cast<std::uint8_t>(number_required_acks_
- + num_options);
- }
- }
- } else {
- entries_.push_back(its_entry);
+ entries_.push_back(its_entry);
+ if (its_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP
+ && its_entry->get_ttl() > 0) {
+ const std::uint8_t num_options =
+ static_cast<std::uint8_t>(
+ its_entry->get_num_options(1) +
+ its_entry->get_num_options(2));
+ number_required_acks_ =
+ static_cast<std::uint8_t>(number_required_acks_
+ + num_options);
}
} else {
is_successful = false;
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 1ddd80d..d6e61bd 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -2864,6 +2864,10 @@ if(NOT ${TESTS_BAT}) COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_PENDING_SUBSCRIPTION_MASTER_STARTER} SUBSCRIBE_RESUBSCRIBE_MIXED) set_tests_properties(${TEST_PENDING_SUBSCRIPTION_NAME}_subscribe_resubscribe_mixed PROPERTIES TIMEOUT 180) + add_test(NAME ${TEST_PENDING_SUBSCRIPTION_NAME}_subscribe_stopsubscribe_subscribe + COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_PENDING_SUBSCRIPTION_MASTER_STARTER} SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE) + set_tests_properties(${TEST_PENDING_SUBSCRIPTION_NAME}_subscribe_stopsubscribe_subscribe PROPERTIES TIMEOUT 180) + # malicious data test add_test(NAME ${TEST_MALICIOUS_DATA_NAME} COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_MALICIOUS_DATA_MASTER_STARTER}) diff --git a/test/big_payload_tests/big_payload_test_client.cpp b/test/big_payload_tests/big_payload_test_client.cpp index 9b50e79..23a7241 100644 --- a/test/big_payload_tests/big_payload_test_client.cpp +++ b/test/big_payload_tests/big_payload_test_client.cpp @@ -208,7 +208,7 @@ void big_payload_test_client::run() || test_mode_ == big_payload_test::test_mode::QUEUE_LIMITED_SPECIFIC) { if (i % 2) { // try to sent a too big payload for half of the messages - its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE + 3, + its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE + 1, big_payload_test::DATA_CLIENT_TO_SERVICE); } else { its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE, diff --git a/test/big_payload_tests/big_payload_test_local_queue_limited.json b/test/big_payload_tests/big_payload_test_local_queue_limited.json index 7252680..eac38b2 100644 --- a/test/big_payload_tests/big_payload_test_local_queue_limited.json +++ b/test/big_payload_tests/big_payload_test_local_queue_limited.json @@ -29,7 +29,7 @@ "instance":"0x5678" } ], - "endpoint-queue-limit-local" : "614428", + "endpoint-queue-limit-local" : "614430", "routing":"big_payload_test_service", "service-discovery": { diff --git a/test/big_payload_tests/big_payload_test_service.cpp b/test/big_payload_tests/big_payload_test_service.cpp index 4031550..a6868cf 100644 --- a/test/big_payload_tests/big_payload_test_service.cpp +++ b/test/big_payload_tests/big_payload_test_service.cpp @@ -162,7 +162,7 @@ void big_payload_test_service::on_message(const std::shared_ptr<vsomeip::message // this way the client will only get replies for a fourth of his sent // requests as he tries to sent to big data for every second request // as well - its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE + 3, + its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE + 1, big_payload_test::DATA_SERVICE_TO_CLIENT); } else { its_payload_data.assign(big_payload_test::BIG_PAYLOAD_SIZE, diff --git a/test/configuration_tests/configuration-test.cpp b/test/configuration_tests/configuration-test.cpp index f9717c5..bdf66e0 100644 --- a/test/configuration_tests/configuration-test.cpp +++ b/test/configuration_tests/configuration-test.cpp @@ -463,9 +463,7 @@ void check_file(const std::string &_config_file, // use 17000 instead of 1500 as configured max-local-payload size will be // increased to bigger max-reliable-payload-size std::uint32_t max_local_message_size( - 17000u + 16u + + VSOMEIP_COMMAND_HEADER_SIZE - + sizeof(vsomeip::instance_t) + sizeof(bool) + sizeof(bool) - + sizeof(vsomeip::client_t)); + 17000u + 16u + + VSOMEIP_SEND_COMMAND_SIZE); EXPECT_EQ(max_local_message_size, its_configuration->get_max_message_size_local()); EXPECT_EQ(11u, its_configuration->get_buffer_shrink_threshold()); EXPECT_EQ(14999u + 16u, its_configuration->get_max_message_size_reliable("10.10.10.10", 7777)); diff --git a/test/malicious_data_tests/malicious_data_test_msg_sender.cpp b/test/malicious_data_tests/malicious_data_test_msg_sender.cpp index ad856b8..27a62c2 100644 --- a/test/malicious_data_tests/malicious_data_test_msg_sender.cpp +++ b/test/malicious_data_tests/malicious_data_test_msg_sender.cpp @@ -266,6 +266,11 @@ TEST_F(malicious_data, send_malicious_events) send_thread.join(); receive_thread.join(); + udp_socket.shutdown(boost::asio::socket_base::shutdown_both); + tcp_socket.shutdown(boost::asio::socket_base::shutdown_both); + udp_socket.close(); + tcp_socket.close(); + } #ifndef _WIN32 diff --git a/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in b/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in index beb5db2..2d9311c 100755 --- a/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in +++ b/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in @@ -17,7 +17,7 @@ then echo "Please pass a test mode to this script." echo "For example: $0 SUSCRIBE" echo "Valid subscription types include:" - echo " [SUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, SUBSCRIBE_UNSUBSCRIBE_SAME_PORT, SUBSCRIBE_RESUBSCRIBE_MIXED]" + echo " [SUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, SUBSCRIBE_UNSUBSCRIBE_SAME_PORT, SUBSCRIBE_RESUBSCRIBE_MIXED, SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE]" exit 1 fi TESTMODE=$1 diff --git a/test/pending_subscription_tests/pending_subscription_test_globals.hpp b/test/pending_subscription_tests/pending_subscription_test_globals.hpp index ade1d6f..7fa5480 100644 --- a/test/pending_subscription_tests/pending_subscription_test_globals.hpp +++ b/test/pending_subscription_tests/pending_subscription_test_globals.hpp @@ -26,7 +26,8 @@ enum test_mode_e { UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, SUBSCRIBE_UNSUBSCRIBE_SAME_PORT, - SUBSCRIBE_RESUBSCRIBE_MIXED + SUBSCRIBE_RESUBSCRIBE_MIXED, + SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE }; } diff --git a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp index dd7ed5a..d8108a5 100644 --- a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp +++ b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp @@ -57,6 +57,8 @@ TEST_F(pending_subscription, send_multiple_subscriptions) boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ std::atomic<bool> keep_receiving(true); std::function<void()> receive; @@ -213,6 +215,9 @@ TEST_F(pending_subscription, send_multiple_subscriptions) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.close(ec); } TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe) @@ -221,6 +226,8 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe) boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ const std::uint32_t expected_acks(8); std::atomic<std::uint32_t> acks_received(0); @@ -404,6 +411,9 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.close(ec); } TEST_F(pending_subscription, send_multiple_unsubscriptions) @@ -412,6 +422,8 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions) boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ const std::uint32_t expected_acks(2); std::atomic<std::uint32_t> acks_received(0); @@ -595,6 +607,9 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.close(ec); } TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe) @@ -603,6 +618,8 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe) boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ const std::uint32_t expected_acks(8); std::atomic<std::uint32_t> acks_received(0); @@ -797,6 +814,9 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.close(ec); } TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port) @@ -805,8 +825,11 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port) std::promise<void> tcp_connected; boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); boost::asio::ip::tcp::socket tcp_socket(io_, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 30490)); + tcp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ const std::uint32_t expected_acks(8); std::atomic<std::uint32_t> acks_received(0); @@ -1004,6 +1027,11 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + tcp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + tcp_socket.close(ec); + udp_socket.close(ec); } /* @@ -1018,6 +1046,8 @@ TEST_F(pending_subscription, subscribe_resubscribe_mixed) boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ std::atomic<bool> keep_receiving(true); std::function<void()> receive; @@ -1201,6 +1231,213 @@ TEST_F(pending_subscription, subscribe_resubscribe_mixed) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.close(ec); +} + +/* + * @test Send a SD message containing a Subscription followed by a StopSubscribe + * Subscribe entry to the same service. Check to receive an initial event + */ +TEST_F(pending_subscription, send_subscribe_stop_subscribe_subscribe) +{ + std::promise<bool> trigger_notifications; + std::promise<void> tcp_connected; + boost::asio::ip::udp::socket udp_socket(io_, + boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + boost::asio::ip::tcp::socket tcp_socket(io_, + boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 30490)); + tcp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + + std::thread receive_thread([&](){ + const std::uint32_t expected_acks(1); + std::atomic<std::uint32_t> acks_received(0); + + const std::uint32_t expected_responses(1); + std::atomic<std::uint32_t> responses_received(0); + + const std::uint32_t expected_notifications(1); + std::atomic<std::uint32_t> notifications_received(0); + + bool triggered_notifications(false); + + std::function<void()> receive; + std::vector<std::uint8_t> receive_buffer(4096); + std::vector<vsomeip::event_t> its_received_events; + + boost::system::error_code ec; + tcp_socket.connect(boost::asio::ip::tcp::endpoint( + boost::asio::ip::address::from_string(remote_address), 40001), ec); + ASSERT_EQ(0, ec.value()); + tcp_connected.set_value(); + + const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&]( + const boost::system::error_code& error, std::size_t bytes_transferred) { + if (error) { + acks_received = expected_acks; + responses_received = expected_responses; + ADD_FAILURE() << __func__ << " error: " << error.message(); + return; + } + #if 0 + std::stringstream str; + for (size_t i = 0; i < bytes_transferred; i++) { + str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " "; + } + std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl; + #endif + + vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0); + vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN], + receive_buffer[VSOMEIP_SERVICE_POS_MAX]); + vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN], + receive_buffer[VSOMEIP_METHOD_POS_MAX]); + if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) { + vsomeip::sd::message_impl sd_msg; + EXPECT_TRUE(sd_msg.deserialize(&its_deserializer)); + EXPECT_EQ(1u, sd_msg.get_entries().size()); + for (auto e : sd_msg.get_entries()) { + EXPECT_TRUE(e->is_eventgroup_entry()); + EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type()); + EXPECT_EQ(16u, e->get_ttl()); + EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service()); + EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance()); + if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) { + std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry = + std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e); + EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id); + } + } + EXPECT_EQ(0u, sd_msg.get_options().size()); + acks_received++; + } else { // non-sd-message + vsomeip::message_impl msg; + EXPECT_TRUE(msg.deserialize(&its_deserializer)); + if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) { + EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type()); + EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service()); + EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method()); + EXPECT_EQ(0x2222, msg.get_client()); + responses_received++; + } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) { + its_received_events.push_back(msg.get_method()); + EXPECT_EQ(1u, its_received_events.size()); + EXPECT_EQ(1u, msg.get_payload()->get_length()); + EXPECT_EQ(0xDD, *msg.get_payload()->get_data()); + EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service()); + EXPECT_EQ(0x0, msg.get_client()); + notifications_received++; + } + } + + + if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received + trigger_notifications.set_value(true); + triggered_notifications = true; + } + + if (!error && (acks_received != expected_acks || + responses_received != expected_responses || + notifications_received != expected_notifications)) { + receive(); + } + }; + + receive = [&]() { + udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()), + receive_cbk); + }; + + receive(); + while(acks_received < expected_acks || + responses_received < expected_responses || + notifications_received < expected_notifications) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_EQ(expected_acks, acks_received); + EXPECT_EQ(expected_responses, responses_received); + EXPECT_EQ(expected_notifications, notifications_received); + }); + + std::thread send_thread([&]() { + if (std::future_status::timeout == tcp_connected.get_future().wait_for(std::chrono::seconds(10))) { + ADD_FAILURE() << "Didn't establish tcp connection within time"; + } + + try { + std::uint8_t its_subscribe_message[] = { + 0xff, 0xff, 0x81, 0x00, + 0x00, 0x00, 0x00, 0x50, // length + 0x00, 0x00, 0x00, 0x01, + 0x01, 0x01, 0x02, 0x00, + 0xc0, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x30, // length entries array + 0x06, 0x00, 0x00, 0x10, // subscribe Eventgroup entry + 0x11, 0x22, 0x00, 0x01, // service / instance + 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL + 0x00, 0x00, 0x10, 0x00, // eventgroup + 0x06, 0x00, 0x00, 0x10, // Stop subscribe Eventgroup entry + 0x11, 0x22, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x10, 0x00, + 0x06, 0x00, 0x00, 0x10, // subscribe Eventgroup entry + 0x11, 0x22, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x10, + 0x00, 0x00, 0x10, 0x00, + 0x00, 0x00, 0x00, 0x0c, // length options array + 0x00, 0x09, 0x04, 0x00, + 0xff, 0xff, 0xff, 0xff, // ip address + 0x00, 0x11, 0x77, 0x1a + }; + + boost::asio::ip::address its_local_address = + boost::asio::ip::address::from_string(std::string(local_address)); + std::memcpy(&its_subscribe_message[80], &its_local_address.to_v4().to_bytes()[0], 4); + + boost::asio::ip::udp::socket::endpoint_type target_sd( + boost::asio::ip::address::from_string(std::string(remote_address)), + 30490); + udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd); + + if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) { + ADD_FAILURE() << "Didn't receive all SubscribeAcks within time"; + } else { + // call notify method + std::uint8_t trigger_notifications_call[] = { + 0x11, 0x22, 0x42, 0x42, + 0x00, 0x00, 0x00, 0x08, + 0x22, 0x22, 0x00, 0x01, + 0x01, 0x00, 0x01, 0x00 }; + boost::asio::ip::udp::socket::endpoint_type target_service( + boost::asio::ip::address::from_string(std::string(remote_address)), + 30001); + udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service); + } + + // call shutdown method + std::uint8_t shutdown_call[] = { + 0x11, 0x22, 0x14, 0x04, + 0x00, 0x00, 0x00, 0x08, + 0x22, 0x22, 0x00, 0x01, + 0x01, 0x00, 0x00, 0x00 }; + boost::asio::ip::udp::socket::endpoint_type target_service( + boost::asio::ip::address::from_string(std::string(remote_address)), + 30001); + udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service); + } catch (...) { + ASSERT_FALSE(true); + } + + }); + send_thread.join(); + receive_thread.join(); + boost::system::error_code ec; + tcp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + tcp_socket.close(ec); + udp_socket.close(ec); } #ifndef _WIN32 @@ -1227,6 +1464,8 @@ int main(int argc, char** argv) { ::testing::GTEST_FLAG(filter) = "*send_alternating_subscribe_unsubscribe_same_port"; } else if (its_testmode == std::string("SUBSCRIBE_RESUBSCRIBE_MIXED")) { ::testing::GTEST_FLAG(filter) = "*subscribe_resubscribe_mixed"; + } else if (its_testmode == std::string("SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE")) { + ::testing::GTEST_FLAG(filter) = "*send_subscribe_stop_subscribe_subscribe"; } return RUN_ALL_TESTS(); } diff --git a/test/pending_subscription_tests/pending_subscription_test_service.cpp b/test/pending_subscription_tests/pending_subscription_test_service.cpp index 59e7aa9..2c33a44 100644 --- a/test/pending_subscription_tests/pending_subscription_test_service.cpp +++ b/test/pending_subscription_tests/pending_subscription_test_service.cpp @@ -148,6 +148,8 @@ public: ; } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED) { ; + } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE) { + ; } std::future<bool> itsFuture = notify_method_called_.get_future(); if (std::future_status::timeout == itsFuture.wait_for(std::chrono::seconds(10))) { @@ -202,7 +204,7 @@ public: std::this_thread::sleep_for(std::chrono::milliseconds(500)); } if (_subscribed) { - _cbk((count_subscribe % 2)); // nack every second subscription + _cbk(((count_subscribe + 1) % 2)); // nack every second subscription } else { _cbk(true); } @@ -227,6 +229,16 @@ public: EXPECT_TRUE(_subscribed); _cbk(true); subscription_accepted_asynchronous_ = true; + } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE) { + static int was_called = 0; + was_called++; + EXPECT_EQ(1, was_called); + EXPECT_TRUE(_subscribed); + subscription_accepted_asynchronous_ = true; + // this test doesn't subscribe to the second eventgroup which is handled by the asynchronous + // subscription handler, set it to true here: + subscription_accepted_synchronous_ = true; + _cbk(true); } } @@ -274,7 +286,7 @@ public: subscription_accepted_synchronous_ = true; } if (_subscribed) { - ret = (count_subscribed % 2); // nack every second subscription + ret = ((count_subscribed + 1) % 2); // nack every second subscription } else { ret = true; } @@ -348,6 +360,8 @@ int main(int argc, char** argv) its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_SAME_PORT; } else if (its_pased_testmode == std::string("SUBSCRIBE_RESUBSCRIBE_MIXED")) { its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED; + } else if (its_pased_testmode == std::string("SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE")) { + its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE; } return RUN_ALL_TESTS(); |