summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJuergen Gehring <juergen.gehring@bmw.de>2018-05-22 02:56:44 -0700
committerJuergen Gehring <juergen.gehring@bmw.de>2018-05-22 02:56:44 -0700
commit8b950ebd7d0d0ed349b7f59255cb1a157ceede3c (patch)
tree11c64da84e8edb13c81b6fa4b45c5ca4163fb79e
parent8826ddae04a88ca20bb8e571e6596ad9718a6077 (diff)
downloadvSomeIP-8b950ebd7d0d0ed349b7f59255cb1a157ceede3c.tar.gz
vsomeip 2.10.182.10.18
-rw-r--r--CHANGES10
-rw-r--r--CMakeLists.txt2
-rw-r--r--implementation/endpoints/include/client_endpoint_impl.hpp7
-rw-r--r--implementation/endpoints/include/endpoint.hpp2
-rw-r--r--implementation/endpoints/include/endpoint_impl.hpp4
-rw-r--r--implementation/endpoints/include/local_client_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/include/server_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/include/tcp_client_endpoint_impl.hpp4
-rw-r--r--implementation/endpoints/include/udp_client_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/include/virtual_server_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/src/client_endpoint_impl.cpp109
-rw-r--r--implementation/endpoints/src/local_client_endpoint_impl.cpp7
-rw-r--r--implementation/endpoints/src/server_endpoint_impl.cpp3
-rw-r--r--implementation/endpoints/src/tcp_client_endpoint_impl.cpp139
-rw-r--r--implementation/endpoints/src/udp_client_endpoint_impl.cpp7
-rw-r--r--implementation/endpoints/src/virtual_server_endpoint_impl.cpp4
-rw-r--r--implementation/routing/include/routing_manager_proxy.hpp1
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp8
-rw-r--r--implementation/routing/src/routing_manager_proxy.cpp14
-rw-r--r--implementation/runtime/src/application_impl.cpp6
-rwxr-xr-ximplementation/service_discovery/include/message_impl.hpp5
-rw-r--r--implementation/service_discovery/include/service_discovery_impl.hpp6
-rwxr-xr-ximplementation/service_discovery/src/message_impl.cpp11
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp71
-rw-r--r--test/CMakeLists.txt4
-rwxr-xr-xtest/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in2
-rw-r--r--test/pending_subscription_tests/pending_subscription_test_globals.hpp3
-rw-r--r--test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp281
-rw-r--r--test/pending_subscription_tests/pending_subscription_test_service.cpp18
29 files changed, 556 insertions, 180 deletions
diff --git a/CHANGES b/CHANGES
index b73ed27..c65c8ef 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,16 @@
Changes
=======
+v2.10.18
+- Fix restarting of TCP connection on connection reset by the server
+ and mark services reachable through it as unavailable until
+ connection is established again.
+- Fix bug which prevented restarting of TCP connections if the peer
+ instantly send a RST after the connection had been established.
+- Fix bug which could cause missing initial events in conjunction with
+ service discovery messages containing new subscriptions and
+ resubscriptions.
+
v2.10.17
- Speedup initial subscriptions to unreliable remote services
- Fix deadlock in conjunction with configured client ports
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 734930c..5cb9b56 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -8,7 +8,7 @@ project (vsomeip)
set (VSOMEIP_MAJOR_VERSION 2)
set (VSOMEIP_MINOR_VERSION 10)
-set (VSOMEIP_PATCH_VERSION 17)
+set (VSOMEIP_PATCH_VERSION 18)
set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION})
set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in
set (CMAKE_VERBOSE_MAKEFILE off)
diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp
index b72ea7e..2b235d6 100644
--- a/implementation/endpoints/include/client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/client_endpoint_impl.hpp
@@ -47,7 +47,7 @@ public:
bool flush();
virtual void stop();
- virtual void restart() = 0;
+ virtual void restart(bool _force = false) = 0;
bool is_client() const;
@@ -61,7 +61,8 @@ public:
public:
void connect_cbk(boost::system::error_code const &_error);
void wait_connect_cbk(boost::system::error_code const &_error);
- void send_cbk(boost::system::error_code const &_error, std::size_t _bytes);
+ virtual void send_cbk(boost::system::error_code const &_error,
+ std::size_t _bytes, message_buffer_ptr_t _sent_msg);
void flush_cbk(boost::system::error_code const &_error);
public:
@@ -98,7 +99,7 @@ protected:
std::mutex mutex_;
- bool was_not_connected_;
+ std::atomic<bool> was_not_connected_;
std::atomic<std::uint16_t> local_port_;
diff --git a/implementation/endpoints/include/endpoint.hpp b/implementation/endpoints/include/endpoint.hpp
index dc97ffe..8ee3f8f 100644
--- a/implementation/endpoints/include/endpoint.hpp
+++ b/implementation/endpoints/include/endpoint.hpp
@@ -47,7 +47,7 @@ public:
virtual void decrement_use_count() = 0;
virtual uint32_t get_use_count() = 0;
- virtual void restart() = 0;
+ virtual void restart(bool _force = false) = 0;
virtual void register_error_handler(error_handler_t _error) = 0;
diff --git a/implementation/endpoints/include/endpoint_impl.hpp b/implementation/endpoints/include/endpoint_impl.hpp
index 49b9f7f..05a6964 100644
--- a/implementation/endpoints/include/endpoint_impl.hpp
+++ b/implementation/endpoints/include/endpoint_impl.hpp
@@ -60,7 +60,7 @@ public:
// required
virtual bool is_client() const = 0;
virtual void receive() = 0;
- virtual void restart() = 0;
+ virtual void restart(bool _force) = 0;
protected:
uint32_t find_magic_cookie(byte_t *_buffer, size_t _size);
@@ -82,7 +82,7 @@ protected:
uint32_t use_count_;
- bool sending_blocked_;
+ std::atomic<bool> sending_blocked_;
std::mutex local_mutex_;
endpoint_type local_;
diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp
index 1f9620c..59a9eee 100644
--- a/implementation/endpoints/include/local_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp
@@ -47,7 +47,7 @@ public:
bool get_remote_address(boost::asio::ip::address &_address) const;
std::uint16_t get_remote_port() const;
- void restart();
+ void restart(bool _force);
void print_status();
private:
diff --git a/implementation/endpoints/include/server_endpoint_impl.hpp b/implementation/endpoints/include/server_endpoint_impl.hpp
index be2299f..2b0ad60 100644
--- a/implementation/endpoints/include/server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/server_endpoint_impl.hpp
@@ -37,7 +37,7 @@ public:
virtual ~server_endpoint_impl();
bool is_client() const;
- void restart();
+ void restart(bool _force);
bool is_connected() const;
void set_connected(bool _connected);
bool send(const uint8_t *_data, uint32_t _size, bool _flush);
diff --git a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
index be6cf24..cbadf5a 100644
--- a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
@@ -31,7 +31,7 @@ public:
virtual ~tcp_client_endpoint_impl();
void start();
- void restart();
+ void restart(bool _force);
bool get_remote_address(boost::asio::ip::address &_address) const;
std::uint16_t get_remote_port() const;
@@ -39,6 +39,8 @@ public:
bool is_local() const;
void print_status();
+ void send_cbk(boost::system::error_code const &_error, std::size_t _bytes,
+ message_buffer_ptr_t _sent_msg);
private:
void send_queued();
bool is_magic_cookie(const message_buffer_ptr_t& _recv_buffer,
diff --git a/implementation/endpoints/include/udp_client_endpoint_impl.hpp b/implementation/endpoints/include/udp_client_endpoint_impl.hpp
index 1805813..1866d05 100644
--- a/implementation/endpoints/include/udp_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/udp_client_endpoint_impl.hpp
@@ -34,7 +34,7 @@ public:
virtual ~udp_client_endpoint_impl();
void start();
- void restart();
+ void restart(bool _force);
void receive_cbk(boost::system::error_code const &_error,
std::size_t _bytes);
diff --git a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
index 303f984..4f4d6d4 100644
--- a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
@@ -50,7 +50,7 @@ public:
void decrement_use_count();
uint32_t get_use_count();
- void restart();
+ void restart(bool _force);
void register_error_handler(error_handler_t _handler);
void print_status();
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp
index 59d4dde..4423270 100644
--- a/implementation/endpoints/src/client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/client_endpoint_impl.cpp
@@ -61,7 +61,12 @@ bool client_endpoint_impl<Protocol>::is_connected() const {
template<typename Protocol>
void client_endpoint_impl<Protocol>::set_connected(bool _connected) {
if (_connected) {
- state_ = cei_state_e::ESTABLISHED;
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ if (socket_->is_open()) {
+ state_ = cei_state_e::ESTABLISHED;
+ } else {
+ state_ = cei_state_e::CLOSED;
+ }
} else {
state_ = cei_state_e::CLOSED;
} }
@@ -222,15 +227,15 @@ void client_endpoint_impl<Protocol>::connect_cbk(
if (its_host) {
if (_error && _error != boost::asio::error::already_connected) {
shutdown_and_close_socket(true);
- start_connect_timer();
- // Double the timeout as long as the maximum allowed is larger
- if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
- connect_timeout_ = (connect_timeout_ << 1);
if (state_ != cei_state_e::ESTABLISHED) {
state_ = cei_state_e::CLOSED;
its_host->on_disconnect(this->shared_from_this());
}
+ start_connect_timer();
+ // Double the timeout as long as the maximum allowed is larger
+ if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
+ connect_timeout_ = (connect_timeout_ << 1);
} else {
{
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
@@ -244,11 +249,13 @@ void client_endpoint_impl<Protocol>::connect_cbk(
receive();
- {
+ if (was_not_connected_) {
+ was_not_connected_ = false;
std::lock_guard<std::mutex> its_lock(mutex_);
- if (queue_.size() > 0 && was_not_connected_) {
- was_not_connected_ = false;
+ if (queue_.size() > 0) {
send_queued();
+ VSOMEIP_WARNING << __func__ << ": resume sending to: "
+ << get_remote_information();
}
}
}
@@ -258,14 +265,15 @@ void client_endpoint_impl<Protocol>::connect_cbk(
template<typename Protocol>
void client_endpoint_impl<Protocol>::wait_connect_cbk(
boost::system::error_code const &_error) {
- if (!_error) {
+ if (!_error && !client_endpoint_impl<Protocol>::sending_blocked_) {
connect();
}
}
template<typename Protocol>
void client_endpoint_impl<Protocol>::send_cbk(
- boost::system::error_code const &_error, std::size_t _bytes) {
+ boost::system::error_code const &_error, std::size_t _bytes,
+ message_buffer_ptr_t _sent_msg) {
(void)_bytes;
if (!_error) {
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -284,27 +292,23 @@ void client_endpoint_impl<Protocol>::send_cbk(
queue_.clear();
queue_size_ = 0;
} else {
- message_buffer_ptr_t its_buffer;
- if (queue_.size()) {
- its_buffer = queue_.front();
- }
service_t its_service(0);
method_t its_method(0);
client_t its_client(0);
session_t its_session(0);
- if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) {
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
its_service = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
- (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
its_method = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_METHOD_POS_MIN],
- (*its_buffer)[VSOMEIP_METHOD_POS_MAX]);
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
its_client = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_CLIENT_POS_MIN],
- (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]);
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
its_session = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SESSION_POS_MIN],
- (*its_buffer)[VSOMEIP_SESSION_POS_MAX]);
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
}
VSOMEIP_WARNING << "cei::send_cbk received error: "
<< _error.message() << " (" << std::dec
@@ -338,40 +342,33 @@ void client_endpoint_impl<Protocol>::send_cbk(
<< get_remote_information();
was_not_connected_ = true;
} else {
- {
- std::lock_guard<std::mutex> its_lock(mutex_);
- message_buffer_ptr_t its_buffer;
- if (queue_.size()) {
- its_buffer = queue_.front();
- }
- service_t its_service(0);
- method_t its_method(0);
- client_t its_client(0);
- session_t its_session(0);
- if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) {
- its_service = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
- (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
- its_method = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_METHOD_POS_MIN],
- (*its_buffer)[VSOMEIP_METHOD_POS_MAX]);
- its_client = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_CLIENT_POS_MIN],
- (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]);
- its_session = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SESSION_POS_MIN],
- (*its_buffer)[VSOMEIP_SESSION_POS_MAX]);
- }
- VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
- << " (" << std::dec << _error.value() << ") "
- << get_remote_information() << " "
- << " " << std::dec << queue_.size()
- << " " << std::dec << queue_size_ << " ("
- << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
- << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
+ its_service = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
}
+ VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information() << " "
+ << " " << std::dec << queue_.size()
+ << " " << std::dec << queue_size_ << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
print_status();
}
}
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp
index b84cb0a..4664fdb 100644
--- a/implementation/endpoints/src/local_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp
@@ -45,8 +45,8 @@ bool local_client_endpoint_impl::is_local() const {
return true;
}
-void local_client_endpoint_impl::restart() {
- if (state_ == cei_state_e::CONNECTING) {
+void local_client_endpoint_impl::restart(bool _force) {
+ if (!_force && state_ == cei_state_e::CONNECTING) {
return;
}
state_ = cei_state_e::CONNECTING;
@@ -206,7 +206,8 @@ VSOMEIP_INFO << msg.str();
local_client_endpoint_impl
>(shared_from_this()),
std::placeholders::_1,
- std::placeholders::_2
+ std::placeholders::_2,
+ its_buffer
)
);
}
diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp
index 83e3e34..005c203 100644
--- a/implementation/endpoints/src/server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/server_endpoint_impl.cpp
@@ -48,7 +48,8 @@ bool server_endpoint_impl<Protocol>::is_client() const {
}
template<typename Protocol>
-void server_endpoint_impl<Protocol>::restart() {
+void server_endpoint_impl<Protocol>::restart(bool _force) {
+ (void)_force;
// intentionally left blank
}
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
index 80bc95f..056e743 100644
--- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
@@ -60,8 +60,8 @@ void tcp_client_endpoint_impl::start() {
connect();
}
-void tcp_client_endpoint_impl::restart() {
- if (state_ == cei_state_e::CONNECTING) {
+void tcp_client_endpoint_impl::restart(bool _force) {
+ if (!_force && state_ == cei_state_e::CONNECTING) {
return;
}
state_ = cei_state_e::CONNECTING;
@@ -133,6 +133,12 @@ void tcp_client_endpoint_impl::connect() {
<< "SO_REUSEADDR: " << its_error.message()
<< " remote:" << get_address_port_remote();
}
+ socket_->set_option(boost::asio::socket_base::linger(true, 0), its_error);
+ if (its_error) {
+ VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
+ << "SO_LINGER: " << its_error.message()
+ << " remote:" << get_address_port_remote();
+ }
// In case a client endpoint port was configured,
// bind to it before connecting
if (local_.port() != ILLEGAL_PORT) {
@@ -265,23 +271,26 @@ void tcp_client_endpoint_impl::send_queued() {
#endif
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- boost::asio::async_write(
- *socket_,
- boost::asio::buffer(*its_buffer),
- std::bind(&tcp_client_endpoint_impl::write_completion_condition,
- std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
- std::placeholders::_1,
- std::placeholders::_2,
- its_buffer->size(),
- its_service, its_method, its_client, its_session,
- std::chrono::steady_clock::now()),
- std::bind(
- &tcp_client_endpoint_base_impl::send_cbk,
- shared_from_this(),
- std::placeholders::_1,
- std::placeholders::_2
- )
- );
+ if (socket_->is_open()) {
+ boost::asio::async_write(
+ *socket_,
+ boost::asio::buffer(*its_buffer),
+ std::bind(&tcp_client_endpoint_impl::write_completion_condition,
+ std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ its_buffer->size(),
+ its_service, its_method, its_client, its_session,
+ std::chrono::steady_clock::now()),
+ std::bind(
+ &tcp_client_endpoint_base_impl::send_cbk,
+ shared_from_this(),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ its_buffer
+ )
+ );
+ }
}
}
@@ -510,7 +519,7 @@ void tcp_client_endpoint_impl::receive_cbk(
<< " remote: " << get_address_port_remote()
<< ". Restarting connection due to missing/broken data TCP stream.";
its_lock.unlock();
- restart();
+ restart(true);
return;
}
} while (has_full_message && _recv_buffer_size);
@@ -528,16 +537,25 @@ void tcp_client_endpoint_impl::receive_cbk(
its_lock.unlock();
receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
} else {
- if (_error == boost::asio::error::connection_reset ||
- _error == boost::asio::error::eof ||
+ VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
+ << _error.message() << "(" << std::dec << _error.value()
+ << ") local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ if (_error == boost::asio::error::eof ||
_error == boost::asio::error::timed_out ||
- _error == boost::asio::error::bad_descriptor) {
- VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
- << _error.message() << "( " << std::dec << _error.value()
- << ") local: " << get_address_port_local()
- << " remote: " << get_address_port_remote();
- state_ = cei_state_e::CLOSED;
- shutdown_and_close_socket_unlocked(false);
+ _error == boost::asio::error::bad_descriptor ||
+ _error == boost::asio::error::connection_reset) {
+ if (state_ == cei_state_e::CONNECTING) {
+ VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk already"
+ " restarting" << get_remote_information();
+ } else {
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket_unlocked(false);
+ was_not_connected_ = true;
+ its_lock.unlock();
+ its_host->on_disconnect(shared_from_this());
+ restart(true);
+ }
} else {
VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
<< _error.message() << "( " << std::dec << _error.value()
@@ -661,4 +679,67 @@ std::string tcp_client_endpoint_impl::get_remote_information() const {
+ std::to_string(remote_.port());
}
+void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error,
+ std::size_t _bytes,
+ message_buffer_ptr_t _sent_msg) {
+ (void)_bytes;
+ if (!_error) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ if (queue_.size() > 0) {
+ queue_size_ -= queue_.front()->size();
+ queue_.pop_front();
+ send_queued();
+ }
+ } else if (_error == boost::system::errc::destination_address_required) {
+ VSOMEIP_WARNING << "tce::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information();
+ was_not_connected_ = true;
+ } else if (_error == boost::asio::error::operation_aborted) {
+ // endpoint was stopped
+ shutdown_and_close_socket(false);
+ } else {
+ if (state_ == cei_state_e::CONNECTING) {
+ VSOMEIP_WARNING << "tce::send_cbk endpoint is already restarting:"
+ << get_remote_information();
+ } else {
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket(false);
+ was_not_connected_ = true;
+ std::shared_ptr<endpoint_host> its_host = host_.lock();
+ if (its_host) {
+ its_host->on_disconnect(shared_from_this());
+ }
+ restart(true);
+ }
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
+ its_service = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_WARNING << "tce::send_cbk received error: "
+ << _error.message() << " (" << std::dec
+ << _error.value() << ") " << get_remote_information()
+ << " " << std::dec << queue_.size()
+ << " " << std::dec << queue_size_ << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
+ }
+}
+
} // namespace vsomeip
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
index 3c05357..7be55f4 100644
--- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
@@ -92,8 +92,8 @@ void udp_client_endpoint_impl::start() {
connect();
}
-void udp_client_endpoint_impl::restart() {
- if (state_ == cei_state_e::CONNECTING) {
+void udp_client_endpoint_impl::restart(bool _force) {
+ if (!_force && state_ == cei_state_e::CONNECTING) {
return;
}
state_ = cei_state_e::CONNECTING;
@@ -136,7 +136,8 @@ void udp_client_endpoint_impl::send_queued() {
&udp_client_endpoint_base_impl::send_cbk,
shared_from_this(),
std::placeholders::_1,
- std::placeholders::_2
+ std::placeholders::_2,
+ its_buffer
)
);
}
diff --git a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
index fe5aa13..5dbd69a 100644
--- a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
@@ -113,8 +113,8 @@ uint32_t virtual_server_endpoint_impl::get_use_count() {
return use_count_;
}
-void virtual_server_endpoint_impl::restart() {
-
+void virtual_server_endpoint_impl::restart(bool _force) {
+ (void)_force;
}
void virtual_server_endpoint_impl::register_error_handler(
diff --git a/implementation/routing/include/routing_manager_proxy.hpp b/implementation/routing/include/routing_manager_proxy.hpp
index 7992b50..94d290c 100644
--- a/implementation/routing/include/routing_manager_proxy.hpp
+++ b/implementation/routing/include/routing_manager_proxy.hpp
@@ -157,6 +157,7 @@ private:
uint32_t get_remote_subscriber_count(service_t _service, instance_t _instance,
eventgroup_t _eventgroup, bool _increment);
+ void clear_remote_subscriber_count(service_t _service, instance_t _instance);
void register_application_timeout_cbk(boost::system::error_code const &_error);
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index 08cfe95..9fd9258 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -1410,6 +1410,14 @@ void routing_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
}
on_availability(its_service.first, its_instance.first,
false, its_info->get_major(), its_info->get_minor());
+ stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT,
+ its_service.first, its_instance.first,
+ its_info->get_major(),
+ its_info->get_minor());
+ VSOMEIP_WARNING << __func__
+ << ": lost connection to remote service: "
+ << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance.first;
}
}
}
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp
index c1fd00b..6d1dbca 100644
--- a/implementation/routing/src/routing_manager_proxy.cpp
+++ b/implementation/routing/src/routing_manager_proxy.cpp
@@ -210,6 +210,7 @@ void routing_manager_proxy::stop_offer_service(client_t _client,
(void)_client;
routing_manager_base::stop_offer_service(_client, _service, _instance, _major, _minor);
+ clear_remote_subscriber_count(_service, _instance);
// Reliable/Unreliable unimportant as routing_proxy does not
// create server endpoints which needs to be freed
@@ -1894,6 +1895,19 @@ uint32_t routing_manager_proxy::get_remote_subscriber_count(service_t _service,
return count;
}
+void routing_manager_proxy::clear_remote_subscriber_count(
+ service_t _service, instance_t _instance) {
+ std::lock_guard<std::mutex> its_lock(remote_subscriber_count_mutex_);
+ auto found_service = remote_subscriber_count_.find(_service);
+ if (found_service != remote_subscriber_count_.end()) {
+ if (found_service->second.erase(_instance)) {
+ if (!found_service->second.size()) {
+ remote_subscriber_count_.erase(found_service);
+ }
+ }
+ }
+}
+
void routing_manager_proxy::register_application_timeout_cbk(
boost::system::error_code const &_error) {
if (!_error) {
diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp
index 918b993..065e6a3 100644
--- a/implementation/runtime/src/application_impl.cpp
+++ b/implementation/runtime/src/application_impl.cpp
@@ -361,8 +361,7 @@ void application_impl::start() {
#endif
} catch (const std::exception &e) {
VSOMEIP_ERROR << "application_impl::start() "
- "catched exception:" << e.what();
- throw;
+ "catched exception: " << e.what();
}
});
io_threads_.insert(its_thread);
@@ -397,8 +396,7 @@ void application_impl::start() {
boost::current_exception_diagnostic_information();
#endif
} catch (const std::exception &e) {
- VSOMEIP_ERROR << "application_impl::start() catched exception:" << e.what();
- throw;
+ VSOMEIP_ERROR << "application_impl::start() catched exception: " << e.what();
}
if (stop_thread_.joinable()) {
diff --git a/implementation/service_discovery/include/message_impl.hpp b/implementation/service_discovery/include/message_impl.hpp
index f679e9c..baca328 100755
--- a/implementation/service_discovery/include/message_impl.hpp
+++ b/implementation/service_discovery/include/message_impl.hpp
@@ -98,6 +98,9 @@ public:
void forced_initial_events_add(forced_initial_events_t _entry);
const std::vector<forced_initial_events_t> forced_initial_events_get();
+ void set_initial_events_required(bool _initial_events);
+ bool initial_events_required() const;
+
private:
entry_impl * deserialize_entry(vsomeip::deserializer *_from);
option_impl * deserialize_option(vsomeip::deserializer *_from);
@@ -114,6 +117,8 @@ private:
std::mutex forced_initial_events_mutex_;
std::vector<forced_initial_events_t> forced_initial_events_info_;
+
+ std::atomic<bool> initial_events_required_;
};
} // namespace sd
diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp
index 402bcc3..995cfbc 100644
--- a/implementation/service_discovery/include/service_discovery_impl.hpp
+++ b/implementation/service_discovery/include/service_discovery_impl.hpp
@@ -337,6 +337,12 @@ private:
const boost::asio::ip::address &_address);
void remove_remote_offer_type_by_ip(const boost::asio::ip::address &_address);
+ std::vector<std::tuple<service_t, instance_t, eventgroup_t,
+ std::shared_ptr<endpoint_definition>>>
+ get_eventgroups_requiring_initial_events(
+ const std::shared_ptr<message_impl>& _response) const;
+
+
private:
boost::asio::io_service &io_;
service_discovery_host *host_;
diff --git a/implementation/service_discovery/src/message_impl.cpp b/implementation/service_discovery/src/message_impl.cpp
index f4695a9..98ed9de 100755
--- a/implementation/service_discovery/src/message_impl.cpp
+++ b/implementation/service_discovery/src/message_impl.cpp
@@ -30,7 +30,8 @@ message_impl::message_impl() :
flags_(0x0),
options_length_(0x0),
number_required_acks_(0x0),
- number_contained_acks_(0x0) {
+ number_contained_acks_(0x0),
+ initial_events_required_(false) {
header_.service_ = 0xFFFF;
header_.method_ = 0x8100;
header_.protocol_version_ = 0x01;
@@ -435,5 +436,13 @@ message_impl::forced_initial_events_get() {
return forced_initial_events_info_;
}
+void message_impl::set_initial_events_required(bool _initial_events_required) {
+ initial_events_required_ = _initial_events_required;
+}
+
+bool message_impl::initial_events_required() const {
+ return initial_events_required_;
+}
+
} // namespace sd
} // namespace vsomeip
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index da8c9a8..084d8bf 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -1257,6 +1257,14 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
host_->send_initial_events(fie.service_, fie.instance_,
fie.eventgroup_, fie.target_);
}
+ if (its_message_response->initial_events_required()) {
+ for (const auto& ack_tuple :
+ get_eventgroups_requiring_initial_events(its_message_response)) {
+ host_->send_initial_events(std::get<0>(ack_tuple),
+ std::get<1>(ack_tuple), std::get<2>(ack_tuple),
+ std::get<3>(ack_tuple));
+ }
+ }
}
}
for (const auto& response : its_resubscribes) {
@@ -1616,7 +1624,7 @@ void service_discovery_impl::process_findservice_serviceentry(
host_->get_offered_service_instances(_service);
// send back all available instances
for (const auto &found_instance : offered_instances) {
- send_uni_or_multicast_offerservice(_service, _instance, _major,
+ send_uni_or_multicast_offerservice(_service, found_instance.first, _major,
_minor, found_instance.second, _unicast_flag);
}
}
@@ -3393,8 +3401,6 @@ void service_discovery_impl::remote_subscription_acknowledge_subscriber(
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
const std::shared_ptr<subscriber_t> &_subscriber, bool _acknowledged) {
std::shared_ptr<message_impl> its_response = _subscriber->response_message_id_->response_;
- std::vector<std::tuple<service_t, instance_t, eventgroup_t,
- std::shared_ptr<endpoint_definition>>> its_acks;
bool sent(false);
{
std::lock_guard<std::mutex> its_lock(response_mutex_);
@@ -3415,32 +3421,13 @@ void service_discovery_impl::remote_subscription_acknowledge_subscriber(
// set required acks to 0xFF to mark message as sent
its_response->set_number_required_acks((std::numeric_limits<uint8_t>::max)());
sent = true;
+ } else {
+ its_response->set_initial_events_required(true);
}
}
if (sent) {
- for (const auto &e : its_response->get_entries()) {
- if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK
- && e->get_ttl() > 0) {
- const std::shared_ptr<eventgroupentry_impl> casted_e =
- std::static_pointer_cast<eventgroupentry_impl>(e);
- const std::shared_ptr<endpoint_definition> its_reliable =
- casted_e->get_target(true);
- if (its_reliable) {
- its_acks.push_back(
- std::make_tuple(e->get_service(), e->get_instance(),
- casted_e->get_eventgroup(), its_reliable));
- }
- const std::shared_ptr<endpoint_definition> its_unreliable =
- casted_e->get_target(false);
- if (its_unreliable) {
- its_acks.push_back(
- std::make_tuple(e->get_service(), e->get_instance(),
- casted_e->get_eventgroup(),
- its_unreliable));
- }
- }
- }
- for (const auto& ack_tuple : its_acks) {
+ for (const auto& ack_tuple : get_eventgroups_requiring_initial_events(
+ its_response)) {
host_->send_initial_events(std::get<0>(ack_tuple),
std::get<1>(ack_tuple), std::get<2>(ack_tuple),
std::get<3>(ack_tuple));
@@ -3650,5 +3637,37 @@ void service_discovery_impl::remove_remote_offer_type_by_ip(
remote_offers_by_ip_.erase(_address);
}
+std::vector<std::tuple<service_t, instance_t, eventgroup_t,
+ std::shared_ptr<endpoint_definition>>>
+service_discovery_impl::get_eventgroups_requiring_initial_events(
+ const std::shared_ptr<message_impl>& _response) const {
+ std::vector<std::tuple<service_t, instance_t, eventgroup_t,
+ std::shared_ptr<endpoint_definition>>> its_acks;
+ for (const auto &e : _response->get_entries()) {
+ if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK
+ && e->get_ttl() > 0) {
+ const std::shared_ptr<eventgroupentry_impl> casted_e =
+ std::static_pointer_cast<eventgroupentry_impl>(e);
+ // only entries which require initial events have a target set
+ const std::shared_ptr<endpoint_definition> its_reliable =
+ casted_e->get_target(true);
+ if (its_reliable) {
+ its_acks.push_back(
+ std::make_tuple(e->get_service(), e->get_instance(),
+ casted_e->get_eventgroup(), its_reliable));
+ }
+ const std::shared_ptr<endpoint_definition> its_unreliable =
+ casted_e->get_target(false);
+ if (its_unreliable) {
+ its_acks.push_back(
+ std::make_tuple(e->get_service(), e->get_instance(),
+ casted_e->get_eventgroup(),
+ its_unreliable));
+ }
+ }
+ }
+ return its_acks;
+}
+
} // namespace sd
} // namespace vsomeip
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 98a4a5a..1ddd80d 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -2860,6 +2860,10 @@ if(NOT ${TESTS_BAT})
COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_PENDING_SUBSCRIPTION_MASTER_STARTER} SUBSCRIBE_UNSUBSCRIBE_SAME_PORT)
set_tests_properties(${TEST_PENDING_SUBSCRIPTION_NAME}_alternating_subscribe_unsubscribe_same_port PROPERTIES TIMEOUT 180)
+ add_test(NAME ${TEST_PENDING_SUBSCRIPTION_NAME}_subscribe_resubscribe_mixed
+ COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_PENDING_SUBSCRIPTION_MASTER_STARTER} SUBSCRIBE_RESUBSCRIBE_MIXED)
+ set_tests_properties(${TEST_PENDING_SUBSCRIPTION_NAME}_subscribe_resubscribe_mixed PROPERTIES TIMEOUT 180)
+
# malicious data test
add_test(NAME ${TEST_MALICIOUS_DATA_NAME}
COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_MALICIOUS_DATA_MASTER_STARTER})
diff --git a/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in b/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in
index 3a5001c..beb5db2 100755
--- a/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in
+++ b/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in
@@ -17,7 +17,7 @@ then
echo "Please pass a test mode to this script."
echo "For example: $0 SUSCRIBE"
echo "Valid subscription types include:"
- echo " [SUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, SUBSCRIBE_UNSUBSCRIBE_SAME_PORT]"
+ echo " [SUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, SUBSCRIBE_UNSUBSCRIBE_SAME_PORT, SUBSCRIBE_RESUBSCRIBE_MIXED]"
exit 1
fi
TESTMODE=$1
diff --git a/test/pending_subscription_tests/pending_subscription_test_globals.hpp b/test/pending_subscription_tests/pending_subscription_test_globals.hpp
index 2bc1322..ade1d6f 100644
--- a/test/pending_subscription_tests/pending_subscription_test_globals.hpp
+++ b/test/pending_subscription_tests/pending_subscription_test_globals.hpp
@@ -25,7 +25,8 @@ enum test_mode_e {
SUBSCRIBE_UNSUBSCRIBE,
UNSUBSCRIBE,
SUBSCRIBE_UNSUBSCRIBE_NACK,
- SUBSCRIBE_UNSUBSCRIBE_SAME_PORT
+ SUBSCRIBE_UNSUBSCRIBE_SAME_PORT,
+ SUBSCRIBE_RESUBSCRIBE_MIXED
};
}
diff --git a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
index c9e686d..dd7ed5a 100644
--- a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
+++ b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
@@ -147,7 +147,7 @@ TEST_F(pending_subscription, send_multiple_subscriptions)
std::thread send_thread([&]() {
try {
- std::uint8_t its_offer_service_message[] = {
+ std::uint8_t its_subscribe_message[] = {
0xff, 0xff, 0x81, 0x00,
0x00, 0x00, 0x00, 0x40, // length
0x00, 0x00, 0x00, 0x01,
@@ -169,14 +169,14 @@ TEST_F(pending_subscription, send_multiple_subscriptions)
};
boost::asio::ip::address its_local_address =
boost::asio::ip::address::from_string(std::string(local_address));
- std::memcpy(&its_offer_service_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+ std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
boost::asio::ip::udp::socket::endpoint_type target_sd(
boost::asio::ip::address::from_string(std::string(remote_address)),
30490);
for (int var = 0; var < 15; ++var) {
- udp_socket.send_to(boost::asio::buffer(its_offer_service_message), target_sd);
- ++its_offer_service_message[11];
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+ ++its_subscribe_message[11];
}
@@ -331,7 +331,7 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe)
std::thread send_thread([&]() {
try {
- std::uint8_t its_offer_service_message[] = {
+ std::uint8_t its_subscribe_message[] = {
0xff, 0xff, 0x81, 0x00,
0x00, 0x00, 0x00, 0x40, // length
0x00, 0x00, 0x00, 0x01,
@@ -354,20 +354,20 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe)
boost::asio::ip::address its_local_address =
boost::asio::ip::address::from_string(std::string(local_address));
- std::memcpy(&its_offer_service_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+ std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
boost::asio::ip::udp::socket::endpoint_type target_sd(
boost::asio::ip::address::from_string(std::string(remote_address)),
30490);
for (int var = 0; var < 15; ++var) {
- udp_socket.send_to(boost::asio::buffer(its_offer_service_message), target_sd);
- ++its_offer_service_message[11];
- if (its_offer_service_message[11] % 2) {
- its_offer_service_message[35] = 16;
- its_offer_service_message[51] = 16;
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+ ++its_subscribe_message[11];
+ if (its_subscribe_message[11] % 2) {
+ its_subscribe_message[35] = 16;
+ its_subscribe_message[51] = 16;
} else {
- its_offer_service_message[35] = 0;
- its_offer_service_message[51] = 0;
+ its_subscribe_message[35] = 0;
+ its_subscribe_message[51] = 0;
}
}
@@ -522,7 +522,7 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions)
std::thread send_thread([&]() {
try {
- std::uint8_t its_offer_service_message[] = {
+ std::uint8_t its_subscribe_message[] = {
0xff, 0xff, 0x81, 0x00,
0x00, 0x00, 0x00, 0x40, // length
0x00, 0x00, 0x00, 0x01,
@@ -545,21 +545,21 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions)
boost::asio::ip::address its_local_address =
boost::asio::ip::address::from_string(std::string(local_address));
- std::memcpy(&its_offer_service_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+ std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
boost::asio::ip::udp::socket::endpoint_type target_sd(
boost::asio::ip::address::from_string(std::string(remote_address)),
30490);
for (int var = 0; var < 15; ++var) {
- if (its_offer_service_message[11] == 15 || its_offer_service_message[11] == 0x1) {
- its_offer_service_message[35] = 16;
- its_offer_service_message[51] = 16;
+ if (its_subscribe_message[11] == 15 || its_subscribe_message[11] == 0x1) {
+ its_subscribe_message[35] = 16;
+ its_subscribe_message[51] = 16;
} else {
- its_offer_service_message[35] = 0;
- its_offer_service_message[51] = 0;
+ its_subscribe_message[35] = 0;
+ its_subscribe_message[51] = 0;
}
- udp_socket.send_to(boost::asio::buffer(its_offer_service_message), target_sd);
- ++its_offer_service_message[11];
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+ ++its_subscribe_message[11];
}
if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) {
@@ -724,7 +724,7 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe)
std::thread send_thread([&]() {
try {
- std::uint8_t its_offer_service_message[] = {
+ std::uint8_t its_subscribe_message[] = {
0xff, 0xff, 0x81, 0x00,
0x00, 0x00, 0x00, 0x40, // length
0x00, 0x00, 0x00, 0x01,
@@ -747,20 +747,20 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe)
boost::asio::ip::address its_local_address =
boost::asio::ip::address::from_string(std::string(local_address));
- std::memcpy(&its_offer_service_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+ std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
boost::asio::ip::udp::socket::endpoint_type target_sd(
boost::asio::ip::address::from_string(std::string(remote_address)),
30490);
for (int var = 0; var < 15; ++var) {
- udp_socket.send_to(boost::asio::buffer(its_offer_service_message), target_sd);
- ++its_offer_service_message[11];
- if (its_offer_service_message[11] % 2) {
- its_offer_service_message[35] = 16;
- its_offer_service_message[51] = 16;
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+ ++its_subscribe_message[11];
+ if (its_subscribe_message[11] % 2) {
+ its_subscribe_message[35] = 16;
+ its_subscribe_message[51] = 16;
} else {
- its_offer_service_message[35] = 0;
- its_offer_service_message[51] = 0;
+ its_subscribe_message[35] = 0;
+ its_subscribe_message[51] = 0;
}
}
@@ -927,7 +927,7 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port)
}
try {
- std::uint8_t its_offer_service_message[] = {
+ std::uint8_t its_subscribe_message[] = {
0xff, 0xff, 0x81, 0x00,
0x00, 0x00, 0x00, 0x4C, // length
0x00, 0x00, 0x00, 0x01,
@@ -953,21 +953,21 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port)
boost::asio::ip::address its_local_address =
boost::asio::ip::address::from_string(std::string(local_address));
- std::memcpy(&its_offer_service_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
- std::memcpy(&its_offer_service_message[76], &its_local_address.to_v4().to_bytes()[0], 4);
+ std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+ std::memcpy(&its_subscribe_message[76], &its_local_address.to_v4().to_bytes()[0], 4);
boost::asio::ip::udp::socket::endpoint_type target_sd(
boost::asio::ip::address::from_string(std::string(remote_address)),
30490);
for (int var = 0; var < 15; ++var) {
- udp_socket.send_to(boost::asio::buffer(its_offer_service_message), target_sd);
- ++its_offer_service_message[11];
- if (its_offer_service_message[11] % 2) {
- its_offer_service_message[35] = 16;
- its_offer_service_message[51] = 16;
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+ ++its_subscribe_message[11];
+ if (its_subscribe_message[11] % 2) {
+ its_subscribe_message[35] = 16;
+ its_subscribe_message[51] = 16;
} else {
- its_offer_service_message[35] = 0;
- its_offer_service_message[51] = 0;
+ its_subscribe_message[35] = 0;
+ its_subscribe_message[51] = 0;
}
}
@@ -1006,6 +1006,203 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port)
receive_thread.join();
}
+/*
+ * @test Send a subscription as single message and afterwards send a
+ * resubscription containing a new subscription in the same message and check
+ * to receive initial event
+ */
+TEST_F(pending_subscription, subscribe_resubscribe_mixed)
+{
+ std::promise<void> first_initial_event_received;
+ std::promise<void> second_initial_event_received;
+
+ boost::asio::ip::udp::socket udp_socket(io_,
+ boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ std::thread receive_thread([&](){
+ std::atomic<bool> keep_receiving(true);
+ std::function<void()> receive;
+ std::vector<std::uint8_t> receive_buffer(4096);
+ std::vector<vsomeip::event_t> its_received_events;
+
+ const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&](
+ const boost::system::error_code& error, std::size_t bytes_transferred) {
+ if (error) {
+ keep_receiving = false;
+ ADD_FAILURE() << __func__ << " error: " << error.message();
+ return;
+ }
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
+ }
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+
+ vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0);
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[VSOMEIP_METHOD_POS_MAX]);
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_GE(2u, sd_msg.get_entries().size());
+ for (auto e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(3u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]);
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[1]);
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ }
+ }
+
+ static int called = 0;
+ if (++called == 2) {
+ EXPECT_EQ(1u, its_received_events.size());
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]);
+ // all subscribeAcks and one initial event of first event received
+ first_initial_event_received.set_value();
+ }
+ if (called == 4) { // events were received as well
+ // all subscribeAcks and one initial event of second event received
+ EXPECT_EQ(2u, its_received_events.size());
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]);
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[1]);
+ keep_receiving = false;
+ second_initial_event_received.set_value();
+ }
+ if (!error && keep_receiving) {
+ receive();
+ }
+ };
+
+ receive = [&]() {
+ udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()),
+ receive_cbk);
+ };
+
+ receive();
+ while(keep_receiving) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ });
+
+ std::thread send_thread([&]() {
+ try {
+ // call notify method to ensure to receive initial events
+ std::uint8_t trigger_notifications_call[] = {
+ 0x11, 0x22, 0x42, 0x42,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x01, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service);
+
+ std::uint8_t its_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x30, // length
+ 0x00, 0x00, 0x00, 0x01,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x10, // length entries array
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x10, 0x01, // eventgroup
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+ boost::asio::ip::address its_local_address =
+ boost::asio::ip::address::from_string(std::string(local_address));
+ std::memcpy(&its_subscribe_message[48], &its_local_address.to_v4().to_bytes()[0], 4);
+
+ boost::asio::ip::udp::socket::endpoint_type target_sd(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30490);
+
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+
+
+
+ if (std::future_status::timeout == first_initial_event_received.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all SubscribeAck of first subscription within time";
+ }
+
+ // send second subscription with resubscription and new subscription
+ std::uint8_t its_subscribe_resubscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x40, // length
+ 0x00, 0x00, 0x00, 0x02,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x20, // length entries array
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x10, 0x01, // eventgroup 2
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+ std::memcpy(&its_subscribe_resubscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_resubscribe_message), target_sd);
+
+ if (std::future_status::timeout == second_initial_event_received.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all SubscribeAck of second subscription within time";
+ }
+ // call shutdown method
+ std::uint8_t shutdown_call[] = {
+ 0x11, 0x22, 0x14, 0x04,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x00, 0x00 };
+ udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service);
+ } catch (...) {
+ ASSERT_FALSE(true);
+ }
+
+ });
+
+ send_thread.join();
+ receive_thread.join();
+}
+
#ifndef _WIN32
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
@@ -1028,6 +1225,8 @@ int main(int argc, char** argv) {
::testing::GTEST_FLAG(filter) = "*send_alternating_subscribe_nack_unsubscribe";
} else if (its_testmode == std::string("SUBSCRIBE_UNSUBSCRIBE_SAME_PORT")) {
::testing::GTEST_FLAG(filter) = "*send_alternating_subscribe_unsubscribe_same_port";
+ } else if (its_testmode == std::string("SUBSCRIBE_RESUBSCRIBE_MIXED")) {
+ ::testing::GTEST_FLAG(filter) = "*subscribe_resubscribe_mixed";
}
return RUN_ALL_TESTS();
}
diff --git a/test/pending_subscription_tests/pending_subscription_test_service.cpp b/test/pending_subscription_tests/pending_subscription_test_service.cpp
index 0e4878a..59e7aa9 100644
--- a/test/pending_subscription_tests/pending_subscription_test_service.cpp
+++ b/test/pending_subscription_tests/pending_subscription_test_service.cpp
@@ -146,6 +146,8 @@ public:
;
} else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_SAME_PORT) {
;
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED) {
+ ;
}
std::future<bool> itsFuture = notify_method_called_.get_future();
if (std::future_status::timeout == itsFuture.wait_for(std::chrono::seconds(10))) {
@@ -218,6 +220,13 @@ public:
if (count_subscribe == 16 || count_unsubscribe == 14) {
subscription_accepted_asynchronous_ = true;
}
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED) {
+ static int was_called = 0;
+ was_called++;
+ EXPECT_EQ(1, was_called);
+ EXPECT_TRUE(_subscribed);
+ _cbk(true);
+ subscription_accepted_asynchronous_ = true;
}
}
@@ -280,6 +289,13 @@ public:
subscription_accepted_synchronous_ = true;
}
ret = true;
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED) {
+ static int was_called = 0;
+ was_called++;
+ EXPECT_EQ(1, was_called);
+ EXPECT_TRUE(_subscribed);
+ subscription_accepted_synchronous_ = true;
+ ret = true;
}
return ret;
}
@@ -330,6 +346,8 @@ int main(int argc, char** argv)
its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_NACK;
} else if (its_pased_testmode == std::string("SUBSCRIBE_UNSUBSCRIBE_SAME_PORT")) {
its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_SAME_PORT;
+ } else if (its_pased_testmode == std::string("SUBSCRIBE_RESUBSCRIBE_MIXED")) {
+ its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED;
}
return RUN_ALL_TESTS();