diff options
author | Juergen Gehring <juergen.gehring@bmw.de> | 2018-05-22 02:56:44 -0700 |
---|---|---|
committer | Juergen Gehring <juergen.gehring@bmw.de> | 2018-05-22 02:56:44 -0700 |
commit | 8b950ebd7d0d0ed349b7f59255cb1a157ceede3c (patch) | |
tree | 11c64da84e8edb13c81b6fa4b45c5ca4163fb79e | |
parent | 8826ddae04a88ca20bb8e571e6596ad9718a6077 (diff) | |
download | vSomeIP-8b950ebd7d0d0ed349b7f59255cb1a157ceede3c.tar.gz |
vsomeip 2.10.182.10.18
29 files changed, 556 insertions, 180 deletions
@@ -1,6 +1,16 @@ Changes ======= +v2.10.18 +- Fix restarting of TCP connection on connection reset by the server + and mark services reachable through it as unavailable until + connection is established again. +- Fix bug which prevented restarting of TCP connections if the peer + instantly send a RST after the connection had been established. +- Fix bug which could cause missing initial events in conjunction with + service discovery messages containing new subscriptions and + resubscriptions. + v2.10.17 - Speedup initial subscriptions to unreliable remote services - Fix deadlock in conjunction with configured client ports diff --git a/CMakeLists.txt b/CMakeLists.txt index 734930c..5cb9b56 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ project (vsomeip) set (VSOMEIP_MAJOR_VERSION 2) set (VSOMEIP_MINOR_VERSION 10) -set (VSOMEIP_PATCH_VERSION 17) +set (VSOMEIP_PATCH_VERSION 18) set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION}) set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in set (CMAKE_VERBOSE_MAKEFILE off) diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp index b72ea7e..2b235d6 100644 --- a/implementation/endpoints/include/client_endpoint_impl.hpp +++ b/implementation/endpoints/include/client_endpoint_impl.hpp @@ -47,7 +47,7 @@ public: bool flush();
virtual void stop();
- virtual void restart() = 0;
+ virtual void restart(bool _force = false) = 0;
bool is_client() const;
@@ -61,7 +61,8 @@ public: public:
void connect_cbk(boost::system::error_code const &_error);
void wait_connect_cbk(boost::system::error_code const &_error);
- void send_cbk(boost::system::error_code const &_error, std::size_t _bytes);
+ virtual void send_cbk(boost::system::error_code const &_error,
+ std::size_t _bytes, message_buffer_ptr_t _sent_msg);
void flush_cbk(boost::system::error_code const &_error);
public:
@@ -98,7 +99,7 @@ protected: std::mutex mutex_;
- bool was_not_connected_;
+ std::atomic<bool> was_not_connected_;
std::atomic<std::uint16_t> local_port_;
diff --git a/implementation/endpoints/include/endpoint.hpp b/implementation/endpoints/include/endpoint.hpp index dc97ffe..8ee3f8f 100644 --- a/implementation/endpoints/include/endpoint.hpp +++ b/implementation/endpoints/include/endpoint.hpp @@ -47,7 +47,7 @@ public: virtual void decrement_use_count() = 0;
virtual uint32_t get_use_count() = 0;
- virtual void restart() = 0;
+ virtual void restart(bool _force = false) = 0;
virtual void register_error_handler(error_handler_t _error) = 0;
diff --git a/implementation/endpoints/include/endpoint_impl.hpp b/implementation/endpoints/include/endpoint_impl.hpp index 49b9f7f..05a6964 100644 --- a/implementation/endpoints/include/endpoint_impl.hpp +++ b/implementation/endpoints/include/endpoint_impl.hpp @@ -60,7 +60,7 @@ public: // required
virtual bool is_client() const = 0;
virtual void receive() = 0;
- virtual void restart() = 0;
+ virtual void restart(bool _force) = 0;
protected:
uint32_t find_magic_cookie(byte_t *_buffer, size_t _size);
@@ -82,7 +82,7 @@ protected: uint32_t use_count_;
- bool sending_blocked_;
+ std::atomic<bool> sending_blocked_;
std::mutex local_mutex_;
endpoint_type local_;
diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp index 1f9620c..59a9eee 100644 --- a/implementation/endpoints/include/local_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp @@ -47,7 +47,7 @@ public: bool get_remote_address(boost::asio::ip::address &_address) const; std::uint16_t get_remote_port() const; - void restart(); + void restart(bool _force); void print_status(); private: diff --git a/implementation/endpoints/include/server_endpoint_impl.hpp b/implementation/endpoints/include/server_endpoint_impl.hpp index be2299f..2b0ad60 100644 --- a/implementation/endpoints/include/server_endpoint_impl.hpp +++ b/implementation/endpoints/include/server_endpoint_impl.hpp @@ -37,7 +37,7 @@ public: virtual ~server_endpoint_impl();
bool is_client() const;
- void restart();
+ void restart(bool _force);
bool is_connected() const;
void set_connected(bool _connected);
bool send(const uint8_t *_data, uint32_t _size, bool _flush);
diff --git a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp index be6cf24..cbadf5a 100644 --- a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp @@ -31,7 +31,7 @@ public: virtual ~tcp_client_endpoint_impl();
void start();
- void restart();
+ void restart(bool _force);
bool get_remote_address(boost::asio::ip::address &_address) const;
std::uint16_t get_remote_port() const;
@@ -39,6 +39,8 @@ public: bool is_local() const;
void print_status();
+ void send_cbk(boost::system::error_code const &_error, std::size_t _bytes,
+ message_buffer_ptr_t _sent_msg);
private:
void send_queued();
bool is_magic_cookie(const message_buffer_ptr_t& _recv_buffer,
diff --git a/implementation/endpoints/include/udp_client_endpoint_impl.hpp b/implementation/endpoints/include/udp_client_endpoint_impl.hpp index 1805813..1866d05 100644 --- a/implementation/endpoints/include/udp_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/udp_client_endpoint_impl.hpp @@ -34,7 +34,7 @@ public: virtual ~udp_client_endpoint_impl();
void start();
- void restart();
+ void restart(bool _force);
void receive_cbk(boost::system::error_code const &_error,
std::size_t _bytes);
diff --git a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp index 303f984..4f4d6d4 100644 --- a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp @@ -50,7 +50,7 @@ public: void decrement_use_count(); uint32_t get_use_count(); - void restart(); + void restart(bool _force); void register_error_handler(error_handler_t _handler); void print_status(); diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index 59d4dde..4423270 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -61,7 +61,12 @@ bool client_endpoint_impl<Protocol>::is_connected() const { template<typename Protocol>
void client_endpoint_impl<Protocol>::set_connected(bool _connected) {
if (_connected) {
- state_ = cei_state_e::ESTABLISHED;
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ if (socket_->is_open()) {
+ state_ = cei_state_e::ESTABLISHED;
+ } else {
+ state_ = cei_state_e::CLOSED;
+ }
} else {
state_ = cei_state_e::CLOSED;
}
}
@@ -222,15 +227,15 @@ void client_endpoint_impl<Protocol>::connect_cbk( if (its_host) {
if (_error && _error != boost::asio::error::already_connected) {
shutdown_and_close_socket(true);
- start_connect_timer();
- // Double the timeout as long as the maximum allowed is larger
- if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
- connect_timeout_ = (connect_timeout_ << 1);
if (state_ != cei_state_e::ESTABLISHED) {
state_ = cei_state_e::CLOSED;
its_host->on_disconnect(this->shared_from_this());
}
+ start_connect_timer();
+ // Double the timeout as long as the maximum allowed is larger
+ if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
+ connect_timeout_ = (connect_timeout_ << 1);
} else {
{
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
@@ -244,11 +249,13 @@ void client_endpoint_impl<Protocol>::connect_cbk( receive();
- {
+ if (was_not_connected_) {
+ was_not_connected_ = false;
std::lock_guard<std::mutex> its_lock(mutex_);
- if (queue_.size() > 0 && was_not_connected_) {
- was_not_connected_ = false;
+ if (queue_.size() > 0) {
send_queued();
+ VSOMEIP_WARNING << __func__ << ": resume sending to: "
+ << get_remote_information();
}
}
}
@@ -258,14 +265,15 @@ void client_endpoint_impl<Protocol>::connect_cbk( template<typename Protocol>
void client_endpoint_impl<Protocol>::wait_connect_cbk(
boost::system::error_code const &_error) {
- if (!_error) {
+ if (!_error && !client_endpoint_impl<Protocol>::sending_blocked_) {
connect();
}
}
template<typename Protocol>
void client_endpoint_impl<Protocol>::send_cbk(
- boost::system::error_code const &_error, std::size_t _bytes) {
+ boost::system::error_code const &_error, std::size_t _bytes,
+ message_buffer_ptr_t _sent_msg) {
(void)_bytes;
if (!_error) {
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -284,27 +292,23 @@ void client_endpoint_impl<Protocol>::send_cbk( queue_.clear();
queue_size_ = 0;
} else {
- message_buffer_ptr_t its_buffer;
- if (queue_.size()) {
- its_buffer = queue_.front();
- }
service_t its_service(0);
method_t its_method(0);
client_t its_client(0);
session_t its_session(0);
- if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) {
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
its_service = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
- (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
its_method = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_METHOD_POS_MIN],
- (*its_buffer)[VSOMEIP_METHOD_POS_MAX]);
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
its_client = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_CLIENT_POS_MIN],
- (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]);
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
its_session = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SESSION_POS_MIN],
- (*its_buffer)[VSOMEIP_SESSION_POS_MAX]);
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
}
VSOMEIP_WARNING << "cei::send_cbk received error: "
<< _error.message() << " (" << std::dec
@@ -338,40 +342,33 @@ void client_endpoint_impl<Protocol>::send_cbk( << get_remote_information();
was_not_connected_ = true;
} else {
- {
- std::lock_guard<std::mutex> its_lock(mutex_);
- message_buffer_ptr_t its_buffer;
- if (queue_.size()) {
- its_buffer = queue_.front();
- }
- service_t its_service(0);
- method_t its_method(0);
- client_t its_client(0);
- session_t its_session(0);
- if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) {
- its_service = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
- (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
- its_method = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_METHOD_POS_MIN],
- (*its_buffer)[VSOMEIP_METHOD_POS_MAX]);
- its_client = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_CLIENT_POS_MIN],
- (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]);
- its_session = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SESSION_POS_MIN],
- (*its_buffer)[VSOMEIP_SESSION_POS_MAX]);
- }
- VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
- << " (" << std::dec << _error.value() << ") "
- << get_remote_information() << " "
- << " " << std::dec << queue_.size()
- << " " << std::dec << queue_size_ << " ("
- << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
- << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
+ its_service = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
}
+ VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information() << " "
+ << " " << std::dec << queue_.size()
+ << " " << std::dec << queue_size_ << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
print_status();
}
}
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp index b84cb0a..4664fdb 100644 --- a/implementation/endpoints/src/local_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp @@ -45,8 +45,8 @@ bool local_client_endpoint_impl::is_local() const { return true;
}
-void local_client_endpoint_impl::restart() {
- if (state_ == cei_state_e::CONNECTING) {
+void local_client_endpoint_impl::restart(bool _force) {
+ if (!_force && state_ == cei_state_e::CONNECTING) {
return;
}
state_ = cei_state_e::CONNECTING;
@@ -206,7 +206,8 @@ VSOMEIP_INFO << msg.str(); local_client_endpoint_impl
>(shared_from_this()),
std::placeholders::_1,
- std::placeholders::_2
+ std::placeholders::_2,
+ its_buffer
)
);
}
diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp index 83e3e34..005c203 100644 --- a/implementation/endpoints/src/server_endpoint_impl.cpp +++ b/implementation/endpoints/src/server_endpoint_impl.cpp @@ -48,7 +48,8 @@ bool server_endpoint_impl<Protocol>::is_client() const { }
template<typename Protocol>
-void server_endpoint_impl<Protocol>::restart() {
+void server_endpoint_impl<Protocol>::restart(bool _force) {
+ (void)_force;
// intentionally left blank
}
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp index 80bc95f..056e743 100644 --- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp @@ -60,8 +60,8 @@ void tcp_client_endpoint_impl::start() { connect();
}
-void tcp_client_endpoint_impl::restart() {
- if (state_ == cei_state_e::CONNECTING) {
+void tcp_client_endpoint_impl::restart(bool _force) {
+ if (!_force && state_ == cei_state_e::CONNECTING) {
return;
}
state_ = cei_state_e::CONNECTING;
@@ -133,6 +133,12 @@ void tcp_client_endpoint_impl::connect() { << "SO_REUSEADDR: " << its_error.message()
<< " remote:" << get_address_port_remote();
}
+ socket_->set_option(boost::asio::socket_base::linger(true, 0), its_error);
+ if (its_error) {
+ VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
+ << "SO_LINGER: " << its_error.message()
+ << " remote:" << get_address_port_remote();
+ }
// In case a client endpoint port was configured,
// bind to it before connecting
if (local_.port() != ILLEGAL_PORT) {
@@ -265,23 +271,26 @@ void tcp_client_endpoint_impl::send_queued() { #endif
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- boost::asio::async_write(
- *socket_,
- boost::asio::buffer(*its_buffer),
- std::bind(&tcp_client_endpoint_impl::write_completion_condition,
- std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
- std::placeholders::_1,
- std::placeholders::_2,
- its_buffer->size(),
- its_service, its_method, its_client, its_session,
- std::chrono::steady_clock::now()),
- std::bind(
- &tcp_client_endpoint_base_impl::send_cbk,
- shared_from_this(),
- std::placeholders::_1,
- std::placeholders::_2
- )
- );
+ if (socket_->is_open()) {
+ boost::asio::async_write(
+ *socket_,
+ boost::asio::buffer(*its_buffer),
+ std::bind(&tcp_client_endpoint_impl::write_completion_condition,
+ std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ its_buffer->size(),
+ its_service, its_method, its_client, its_session,
+ std::chrono::steady_clock::now()),
+ std::bind(
+ &tcp_client_endpoint_base_impl::send_cbk,
+ shared_from_this(),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ its_buffer
+ )
+ );
+ }
}
}
@@ -510,7 +519,7 @@ void tcp_client_endpoint_impl::receive_cbk( << " remote: " << get_address_port_remote()
<< ". Restarting connection due to missing/broken data TCP stream.";
its_lock.unlock();
- restart();
+ restart(true);
return;
}
} while (has_full_message && _recv_buffer_size);
@@ -528,16 +537,25 @@ void tcp_client_endpoint_impl::receive_cbk( its_lock.unlock();
receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
} else {
- if (_error == boost::asio::error::connection_reset ||
- _error == boost::asio::error::eof ||
+ VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
+ << _error.message() << "(" << std::dec << _error.value()
+ << ") local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ if (_error == boost::asio::error::eof ||
_error == boost::asio::error::timed_out ||
- _error == boost::asio::error::bad_descriptor) {
- VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
- << _error.message() << "( " << std::dec << _error.value()
- << ") local: " << get_address_port_local()
- << " remote: " << get_address_port_remote();
- state_ = cei_state_e::CLOSED;
- shutdown_and_close_socket_unlocked(false);
+ _error == boost::asio::error::bad_descriptor ||
+ _error == boost::asio::error::connection_reset) {
+ if (state_ == cei_state_e::CONNECTING) {
+ VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk already"
+ " restarting" << get_remote_information();
+ } else {
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket_unlocked(false);
+ was_not_connected_ = true;
+ its_lock.unlock();
+ its_host->on_disconnect(shared_from_this());
+ restart(true);
+ }
} else {
VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
<< _error.message() << "( " << std::dec << _error.value()
@@ -661,4 +679,67 @@ std::string tcp_client_endpoint_impl::get_remote_information() const { + std::to_string(remote_.port());
}
+void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error,
+ std::size_t _bytes,
+ message_buffer_ptr_t _sent_msg) {
+ (void)_bytes;
+ if (!_error) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ if (queue_.size() > 0) {
+ queue_size_ -= queue_.front()->size();
+ queue_.pop_front();
+ send_queued();
+ }
+ } else if (_error == boost::system::errc::destination_address_required) {
+ VSOMEIP_WARNING << "tce::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information();
+ was_not_connected_ = true;
+ } else if (_error == boost::asio::error::operation_aborted) {
+ // endpoint was stopped
+ shutdown_and_close_socket(false);
+ } else {
+ if (state_ == cei_state_e::CONNECTING) {
+ VSOMEIP_WARNING << "tce::send_cbk endpoint is already restarting:"
+ << get_remote_information();
+ } else {
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket(false);
+ was_not_connected_ = true;
+ std::shared_ptr<endpoint_host> its_host = host_.lock();
+ if (its_host) {
+ its_host->on_disconnect(shared_from_this());
+ }
+ restart(true);
+ }
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
+ its_service = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_WARNING << "tce::send_cbk received error: "
+ << _error.message() << " (" << std::dec
+ << _error.value() << ") " << get_remote_information()
+ << " " << std::dec << queue_.size()
+ << " " << std::dec << queue_size_ << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
+ }
+}
+
} // namespace vsomeip
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp index 3c05357..7be55f4 100644 --- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp @@ -92,8 +92,8 @@ void udp_client_endpoint_impl::start() { connect();
}
-void udp_client_endpoint_impl::restart() {
- if (state_ == cei_state_e::CONNECTING) {
+void udp_client_endpoint_impl::restart(bool _force) {
+ if (!_force && state_ == cei_state_e::CONNECTING) {
return;
}
state_ = cei_state_e::CONNECTING;
@@ -136,7 +136,8 @@ void udp_client_endpoint_impl::send_queued() { &udp_client_endpoint_base_impl::send_cbk,
shared_from_this(),
std::placeholders::_1,
- std::placeholders::_2
+ std::placeholders::_2,
+ its_buffer
)
);
}
diff --git a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp index fe5aa13..5dbd69a 100644 --- a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp @@ -113,8 +113,8 @@ uint32_t virtual_server_endpoint_impl::get_use_count() { return use_count_; } -void virtual_server_endpoint_impl::restart() { - +void virtual_server_endpoint_impl::restart(bool _force) { + (void)_force; } void virtual_server_endpoint_impl::register_error_handler( diff --git a/implementation/routing/include/routing_manager_proxy.hpp b/implementation/routing/include/routing_manager_proxy.hpp index 7992b50..94d290c 100644 --- a/implementation/routing/include/routing_manager_proxy.hpp +++ b/implementation/routing/include/routing_manager_proxy.hpp @@ -157,6 +157,7 @@ private: uint32_t get_remote_subscriber_count(service_t _service, instance_t _instance, eventgroup_t _eventgroup, bool _increment); + void clear_remote_subscriber_count(service_t _service, instance_t _instance); void register_application_timeout_cbk(boost::system::error_code const &_error); diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 08cfe95..9fd9258 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -1410,6 +1410,14 @@ void routing_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) { } on_availability(its_service.first, its_instance.first, false, its_info->get_major(), its_info->get_minor()); + stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, + its_service.first, its_instance.first, + its_info->get_major(), + its_info->get_minor()); + VSOMEIP_WARNING << __func__ + << ": lost connection to remote service: " + << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "." + << std::hex << std::setw(4) << std::setfill('0') << its_instance.first; } } } diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index c1fd00b..6d1dbca 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -210,6 +210,7 @@ void routing_manager_proxy::stop_offer_service(client_t _client, (void)_client; routing_manager_base::stop_offer_service(_client, _service, _instance, _major, _minor); + clear_remote_subscriber_count(_service, _instance); // Reliable/Unreliable unimportant as routing_proxy does not // create server endpoints which needs to be freed @@ -1894,6 +1895,19 @@ uint32_t routing_manager_proxy::get_remote_subscriber_count(service_t _service, return count; } +void routing_manager_proxy::clear_remote_subscriber_count( + service_t _service, instance_t _instance) { + std::lock_guard<std::mutex> its_lock(remote_subscriber_count_mutex_); + auto found_service = remote_subscriber_count_.find(_service); + if (found_service != remote_subscriber_count_.end()) { + if (found_service->second.erase(_instance)) { + if (!found_service->second.size()) { + remote_subscriber_count_.erase(found_service); + } + } + } +} + void routing_manager_proxy::register_application_timeout_cbk( boost::system::error_code const &_error) { if (!_error) { diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp index 918b993..065e6a3 100644 --- a/implementation/runtime/src/application_impl.cpp +++ b/implementation/runtime/src/application_impl.cpp @@ -361,8 +361,7 @@ void application_impl::start() { #endif } catch (const std::exception &e) { VSOMEIP_ERROR << "application_impl::start() " - "catched exception:" << e.what(); - throw; + "catched exception: " << e.what(); } }); io_threads_.insert(its_thread); @@ -397,8 +396,7 @@ void application_impl::start() { boost::current_exception_diagnostic_information(); #endif } catch (const std::exception &e) { - VSOMEIP_ERROR << "application_impl::start() catched exception:" << e.what(); - throw; + VSOMEIP_ERROR << "application_impl::start() catched exception: " << e.what(); } if (stop_thread_.joinable()) { diff --git a/implementation/service_discovery/include/message_impl.hpp b/implementation/service_discovery/include/message_impl.hpp index f679e9c..baca328 100755 --- a/implementation/service_discovery/include/message_impl.hpp +++ b/implementation/service_discovery/include/message_impl.hpp @@ -98,6 +98,9 @@ public: void forced_initial_events_add(forced_initial_events_t _entry);
const std::vector<forced_initial_events_t> forced_initial_events_get();
+ void set_initial_events_required(bool _initial_events);
+ bool initial_events_required() const;
+
private:
entry_impl * deserialize_entry(vsomeip::deserializer *_from);
option_impl * deserialize_option(vsomeip::deserializer *_from);
@@ -114,6 +117,8 @@ private: std::mutex forced_initial_events_mutex_;
std::vector<forced_initial_events_t> forced_initial_events_info_;
+
+ std::atomic<bool> initial_events_required_;
};
} // namespace sd
diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp index 402bcc3..995cfbc 100644 --- a/implementation/service_discovery/include/service_discovery_impl.hpp +++ b/implementation/service_discovery/include/service_discovery_impl.hpp @@ -337,6 +337,12 @@ private: const boost::asio::ip::address &_address); void remove_remote_offer_type_by_ip(const boost::asio::ip::address &_address); + std::vector<std::tuple<service_t, instance_t, eventgroup_t, + std::shared_ptr<endpoint_definition>>> + get_eventgroups_requiring_initial_events( + const std::shared_ptr<message_impl>& _response) const; + + private: boost::asio::io_service &io_; service_discovery_host *host_; diff --git a/implementation/service_discovery/src/message_impl.cpp b/implementation/service_discovery/src/message_impl.cpp index f4695a9..98ed9de 100755 --- a/implementation/service_discovery/src/message_impl.cpp +++ b/implementation/service_discovery/src/message_impl.cpp @@ -30,7 +30,8 @@ message_impl::message_impl() : flags_(0x0),
options_length_(0x0),
number_required_acks_(0x0),
- number_contained_acks_(0x0) {
+ number_contained_acks_(0x0),
+ initial_events_required_(false) {
header_.service_ = 0xFFFF;
header_.method_ = 0x8100;
header_.protocol_version_ = 0x01;
@@ -435,5 +436,13 @@ message_impl::forced_initial_events_get() { return forced_initial_events_info_;
}
+void message_impl::set_initial_events_required(bool _initial_events_required) {
+ initial_events_required_ = _initial_events_required;
+}
+
+bool message_impl::initial_events_required() const {
+ return initial_events_required_;
+}
+
} // namespace sd
} // namespace vsomeip
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index da8c9a8..084d8bf 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -1257,6 +1257,14 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, host_->send_initial_events(fie.service_, fie.instance_, fie.eventgroup_, fie.target_); } + if (its_message_response->initial_events_required()) { + for (const auto& ack_tuple : + get_eventgroups_requiring_initial_events(its_message_response)) { + host_->send_initial_events(std::get<0>(ack_tuple), + std::get<1>(ack_tuple), std::get<2>(ack_tuple), + std::get<3>(ack_tuple)); + } + } } } for (const auto& response : its_resubscribes) { @@ -1616,7 +1624,7 @@ void service_discovery_impl::process_findservice_serviceentry( host_->get_offered_service_instances(_service); // send back all available instances for (const auto &found_instance : offered_instances) { - send_uni_or_multicast_offerservice(_service, _instance, _major, + send_uni_or_multicast_offerservice(_service, found_instance.first, _major, _minor, found_instance.second, _unicast_flag); } } @@ -3393,8 +3401,6 @@ void service_discovery_impl::remote_subscription_acknowledge_subscriber( service_t _service, instance_t _instance, eventgroup_t _eventgroup, const std::shared_ptr<subscriber_t> &_subscriber, bool _acknowledged) { std::shared_ptr<message_impl> its_response = _subscriber->response_message_id_->response_; - std::vector<std::tuple<service_t, instance_t, eventgroup_t, - std::shared_ptr<endpoint_definition>>> its_acks; bool sent(false); { std::lock_guard<std::mutex> its_lock(response_mutex_); @@ -3415,32 +3421,13 @@ void service_discovery_impl::remote_subscription_acknowledge_subscriber( // set required acks to 0xFF to mark message as sent its_response->set_number_required_acks((std::numeric_limits<uint8_t>::max)()); sent = true; + } else { + its_response->set_initial_events_required(true); } } if (sent) { - for (const auto &e : its_response->get_entries()) { - if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK - && e->get_ttl() > 0) { - const std::shared_ptr<eventgroupentry_impl> casted_e = - std::static_pointer_cast<eventgroupentry_impl>(e); - const std::shared_ptr<endpoint_definition> its_reliable = - casted_e->get_target(true); - if (its_reliable) { - its_acks.push_back( - std::make_tuple(e->get_service(), e->get_instance(), - casted_e->get_eventgroup(), its_reliable)); - } - const std::shared_ptr<endpoint_definition> its_unreliable = - casted_e->get_target(false); - if (its_unreliable) { - its_acks.push_back( - std::make_tuple(e->get_service(), e->get_instance(), - casted_e->get_eventgroup(), - its_unreliable)); - } - } - } - for (const auto& ack_tuple : its_acks) { + for (const auto& ack_tuple : get_eventgroups_requiring_initial_events( + its_response)) { host_->send_initial_events(std::get<0>(ack_tuple), std::get<1>(ack_tuple), std::get<2>(ack_tuple), std::get<3>(ack_tuple)); @@ -3650,5 +3637,37 @@ void service_discovery_impl::remove_remote_offer_type_by_ip( remote_offers_by_ip_.erase(_address); } +std::vector<std::tuple<service_t, instance_t, eventgroup_t, + std::shared_ptr<endpoint_definition>>> +service_discovery_impl::get_eventgroups_requiring_initial_events( + const std::shared_ptr<message_impl>& _response) const { + std::vector<std::tuple<service_t, instance_t, eventgroup_t, + std::shared_ptr<endpoint_definition>>> its_acks; + for (const auto &e : _response->get_entries()) { + if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK + && e->get_ttl() > 0) { + const std::shared_ptr<eventgroupentry_impl> casted_e = + std::static_pointer_cast<eventgroupentry_impl>(e); + // only entries which require initial events have a target set + const std::shared_ptr<endpoint_definition> its_reliable = + casted_e->get_target(true); + if (its_reliable) { + its_acks.push_back( + std::make_tuple(e->get_service(), e->get_instance(), + casted_e->get_eventgroup(), its_reliable)); + } + const std::shared_ptr<endpoint_definition> its_unreliable = + casted_e->get_target(false); + if (its_unreliable) { + its_acks.push_back( + std::make_tuple(e->get_service(), e->get_instance(), + casted_e->get_eventgroup(), + its_unreliable)); + } + } + } + return its_acks; +} + } // namespace sd } // namespace vsomeip diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 98a4a5a..1ddd80d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -2860,6 +2860,10 @@ if(NOT ${TESTS_BAT}) COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_PENDING_SUBSCRIPTION_MASTER_STARTER} SUBSCRIBE_UNSUBSCRIBE_SAME_PORT) set_tests_properties(${TEST_PENDING_SUBSCRIPTION_NAME}_alternating_subscribe_unsubscribe_same_port PROPERTIES TIMEOUT 180) + add_test(NAME ${TEST_PENDING_SUBSCRIPTION_NAME}_subscribe_resubscribe_mixed + COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_PENDING_SUBSCRIPTION_MASTER_STARTER} SUBSCRIBE_RESUBSCRIBE_MIXED) + set_tests_properties(${TEST_PENDING_SUBSCRIPTION_NAME}_subscribe_resubscribe_mixed PROPERTIES TIMEOUT 180) + # malicious data test add_test(NAME ${TEST_MALICIOUS_DATA_NAME} COMMAND ${PROJECT_BINARY_DIR}/test/${TEST_MALICIOUS_DATA_MASTER_STARTER}) diff --git a/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in b/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in index 3a5001c..beb5db2 100755 --- a/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in +++ b/test/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in @@ -17,7 +17,7 @@ then echo "Please pass a test mode to this script." echo "For example: $0 SUSCRIBE" echo "Valid subscription types include:" - echo " [SUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, SUBSCRIBE_UNSUBSCRIBE_SAME_PORT]" + echo " [SUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, SUBSCRIBE_UNSUBSCRIBE_SAME_PORT, SUBSCRIBE_RESUBSCRIBE_MIXED]" exit 1 fi TESTMODE=$1 diff --git a/test/pending_subscription_tests/pending_subscription_test_globals.hpp b/test/pending_subscription_tests/pending_subscription_test_globals.hpp index 2bc1322..ade1d6f 100644 --- a/test/pending_subscription_tests/pending_subscription_test_globals.hpp +++ b/test/pending_subscription_tests/pending_subscription_test_globals.hpp @@ -25,7 +25,8 @@ enum test_mode_e { SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, - SUBSCRIBE_UNSUBSCRIBE_SAME_PORT + SUBSCRIBE_UNSUBSCRIBE_SAME_PORT, + SUBSCRIBE_RESUBSCRIBE_MIXED }; } diff --git a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp index c9e686d..dd7ed5a 100644 --- a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp +++ b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp @@ -147,7 +147,7 @@ TEST_F(pending_subscription, send_multiple_subscriptions) std::thread send_thread([&]() { try { - std::uint8_t its_offer_service_message[] = { + std::uint8_t its_subscribe_message[] = { 0xff, 0xff, 0x81, 0x00, 0x00, 0x00, 0x00, 0x40, // length 0x00, 0x00, 0x00, 0x01, @@ -169,14 +169,14 @@ TEST_F(pending_subscription, send_multiple_subscriptions) }; boost::asio::ip::address its_local_address = boost::asio::ip::address::from_string(std::string(local_address)); - std::memcpy(&its_offer_service_message[64], &its_local_address.to_v4().to_bytes()[0], 4); + std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4); boost::asio::ip::udp::socket::endpoint_type target_sd( boost::asio::ip::address::from_string(std::string(remote_address)), 30490); for (int var = 0; var < 15; ++var) { - udp_socket.send_to(boost::asio::buffer(its_offer_service_message), target_sd); - ++its_offer_service_message[11]; + udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd); + ++its_subscribe_message[11]; } @@ -331,7 +331,7 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe) std::thread send_thread([&]() { try { - std::uint8_t its_offer_service_message[] = { + std::uint8_t its_subscribe_message[] = { 0xff, 0xff, 0x81, 0x00, 0x00, 0x00, 0x00, 0x40, // length 0x00, 0x00, 0x00, 0x01, @@ -354,20 +354,20 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe) boost::asio::ip::address its_local_address = boost::asio::ip::address::from_string(std::string(local_address)); - std::memcpy(&its_offer_service_message[64], &its_local_address.to_v4().to_bytes()[0], 4); + std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4); boost::asio::ip::udp::socket::endpoint_type target_sd( boost::asio::ip::address::from_string(std::string(remote_address)), 30490); for (int var = 0; var < 15; ++var) { - udp_socket.send_to(boost::asio::buffer(its_offer_service_message), target_sd); - ++its_offer_service_message[11]; - if (its_offer_service_message[11] % 2) { - its_offer_service_message[35] = 16; - its_offer_service_message[51] = 16; + udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd); + ++its_subscribe_message[11]; + if (its_subscribe_message[11] % 2) { + its_subscribe_message[35] = 16; + its_subscribe_message[51] = 16; } else { - its_offer_service_message[35] = 0; - its_offer_service_message[51] = 0; + its_subscribe_message[35] = 0; + its_subscribe_message[51] = 0; } } @@ -522,7 +522,7 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions) std::thread send_thread([&]() { try { - std::uint8_t its_offer_service_message[] = { + std::uint8_t its_subscribe_message[] = { 0xff, 0xff, 0x81, 0x00, 0x00, 0x00, 0x00, 0x40, // length 0x00, 0x00, 0x00, 0x01, @@ -545,21 +545,21 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions) boost::asio::ip::address its_local_address = boost::asio::ip::address::from_string(std::string(local_address)); - std::memcpy(&its_offer_service_message[64], &its_local_address.to_v4().to_bytes()[0], 4); + std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4); boost::asio::ip::udp::socket::endpoint_type target_sd( boost::asio::ip::address::from_string(std::string(remote_address)), 30490); for (int var = 0; var < 15; ++var) { - if (its_offer_service_message[11] == 15 || its_offer_service_message[11] == 0x1) { - its_offer_service_message[35] = 16; - its_offer_service_message[51] = 16; + if (its_subscribe_message[11] == 15 || its_subscribe_message[11] == 0x1) { + its_subscribe_message[35] = 16; + its_subscribe_message[51] = 16; } else { - its_offer_service_message[35] = 0; - its_offer_service_message[51] = 0; + its_subscribe_message[35] = 0; + its_subscribe_message[51] = 0; } - udp_socket.send_to(boost::asio::buffer(its_offer_service_message), target_sd); - ++its_offer_service_message[11]; + udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd); + ++its_subscribe_message[11]; } if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) { @@ -724,7 +724,7 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe) std::thread send_thread([&]() { try { - std::uint8_t its_offer_service_message[] = { + std::uint8_t its_subscribe_message[] = { 0xff, 0xff, 0x81, 0x00, 0x00, 0x00, 0x00, 0x40, // length 0x00, 0x00, 0x00, 0x01, @@ -747,20 +747,20 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe) boost::asio::ip::address its_local_address = boost::asio::ip::address::from_string(std::string(local_address)); - std::memcpy(&its_offer_service_message[64], &its_local_address.to_v4().to_bytes()[0], 4); + std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4); boost::asio::ip::udp::socket::endpoint_type target_sd( boost::asio::ip::address::from_string(std::string(remote_address)), 30490); for (int var = 0; var < 15; ++var) { - udp_socket.send_to(boost::asio::buffer(its_offer_service_message), target_sd); - ++its_offer_service_message[11]; - if (its_offer_service_message[11] % 2) { - its_offer_service_message[35] = 16; - its_offer_service_message[51] = 16; + udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd); + ++its_subscribe_message[11]; + if (its_subscribe_message[11] % 2) { + its_subscribe_message[35] = 16; + its_subscribe_message[51] = 16; } else { - its_offer_service_message[35] = 0; - its_offer_service_message[51] = 0; + its_subscribe_message[35] = 0; + its_subscribe_message[51] = 0; } } @@ -927,7 +927,7 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port) } try { - std::uint8_t its_offer_service_message[] = { + std::uint8_t its_subscribe_message[] = { 0xff, 0xff, 0x81, 0x00, 0x00, 0x00, 0x00, 0x4C, // length 0x00, 0x00, 0x00, 0x01, @@ -953,21 +953,21 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port) boost::asio::ip::address its_local_address = boost::asio::ip::address::from_string(std::string(local_address)); - std::memcpy(&its_offer_service_message[64], &its_local_address.to_v4().to_bytes()[0], 4); - std::memcpy(&its_offer_service_message[76], &its_local_address.to_v4().to_bytes()[0], 4); + std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4); + std::memcpy(&its_subscribe_message[76], &its_local_address.to_v4().to_bytes()[0], 4); boost::asio::ip::udp::socket::endpoint_type target_sd( boost::asio::ip::address::from_string(std::string(remote_address)), 30490); for (int var = 0; var < 15; ++var) { - udp_socket.send_to(boost::asio::buffer(its_offer_service_message), target_sd); - ++its_offer_service_message[11]; - if (its_offer_service_message[11] % 2) { - its_offer_service_message[35] = 16; - its_offer_service_message[51] = 16; + udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd); + ++its_subscribe_message[11]; + if (its_subscribe_message[11] % 2) { + its_subscribe_message[35] = 16; + its_subscribe_message[51] = 16; } else { - its_offer_service_message[35] = 0; - its_offer_service_message[51] = 0; + its_subscribe_message[35] = 0; + its_subscribe_message[51] = 0; } } @@ -1006,6 +1006,203 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port) receive_thread.join(); } +/* + * @test Send a subscription as single message and afterwards send a + * resubscription containing a new subscription in the same message and check + * to receive initial event + */ +TEST_F(pending_subscription, subscribe_resubscribe_mixed) +{ + std::promise<void> first_initial_event_received; + std::promise<void> second_initial_event_received; + + boost::asio::ip::udp::socket udp_socket(io_, + boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + std::thread receive_thread([&](){ + std::atomic<bool> keep_receiving(true); + std::function<void()> receive; + std::vector<std::uint8_t> receive_buffer(4096); + std::vector<vsomeip::event_t> its_received_events; + + const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&]( + const boost::system::error_code& error, std::size_t bytes_transferred) { + if (error) { + keep_receiving = false; + ADD_FAILURE() << __func__ << " error: " << error.message(); + return; + } + #if 0 + std::stringstream str; + for (size_t i = 0; i < bytes_transferred; i++) { + str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " "; + } + std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl; + #endif + + vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0); + vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN], + receive_buffer[VSOMEIP_SERVICE_POS_MAX]); + vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN], + receive_buffer[VSOMEIP_METHOD_POS_MAX]); + if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) { + vsomeip::sd::message_impl sd_msg; + EXPECT_TRUE(sd_msg.deserialize(&its_deserializer)); + EXPECT_GE(2u, sd_msg.get_entries().size()); + for (auto e : sd_msg.get_entries()) { + EXPECT_TRUE(e->is_eventgroup_entry()); + EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type()); + EXPECT_EQ(3u, e->get_ttl()); + EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service()); + EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance()); + if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) { + std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry = + std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e); + EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id || + its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1); + } + } + EXPECT_EQ(0u, sd_msg.get_options().size()); + } else { // non-sd-message + vsomeip::message_impl msg; + EXPECT_TRUE(msg.deserialize(&its_deserializer)); + if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) { + EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type()); + EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service()); + EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method()); + EXPECT_EQ(0x2222, msg.get_client()); + } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) { + its_received_events.push_back(msg.get_method()); + if (its_received_events.size() == 2) { + EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]); + EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[1]); + } + EXPECT_EQ(1u, msg.get_payload()->get_length()); + EXPECT_EQ(0xDD, *msg.get_payload()->get_data()); + EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service()); + EXPECT_EQ(0x0, msg.get_client()); + } + } + + static int called = 0; + if (++called == 2) { + EXPECT_EQ(1u, its_received_events.size()); + EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]); + // all subscribeAcks and one initial event of first event received + first_initial_event_received.set_value(); + } + if (called == 4) { // events were received as well + // all subscribeAcks and one initial event of second event received + EXPECT_EQ(2u, its_received_events.size()); + EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]); + EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[1]); + keep_receiving = false; + second_initial_event_received.set_value(); + } + if (!error && keep_receiving) { + receive(); + } + }; + + receive = [&]() { + udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()), + receive_cbk); + }; + + receive(); + while(keep_receiving) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + }); + + std::thread send_thread([&]() { + try { + // call notify method to ensure to receive initial events + std::uint8_t trigger_notifications_call[] = { + 0x11, 0x22, 0x42, 0x42, + 0x00, 0x00, 0x00, 0x08, + 0x22, 0x22, 0x00, 0x01, + 0x01, 0x00, 0x01, 0x00 }; + boost::asio::ip::udp::socket::endpoint_type target_service( + boost::asio::ip::address::from_string(std::string(remote_address)), + 30001); + udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service); + + std::uint8_t its_subscribe_message[] = { + 0xff, 0xff, 0x81, 0x00, + 0x00, 0x00, 0x00, 0x30, // length + 0x00, 0x00, 0x00, 0x01, + 0x01, 0x01, 0x02, 0x00, + 0xc0, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x10, // length entries array + 0x06, 0x00, 0x00, 0x10, + 0x11, 0x22, 0x00, 0x01, // service / instance + 0x00, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x10, 0x01, // eventgroup + 0x00, 0x00, 0x00, 0x0c, // length options array + 0x00, 0x09, 0x04, 0x00, + 0xff, 0xff, 0xff, 0xff, // ip address + 0x00, 0x11, 0x77, 0x1a + }; + boost::asio::ip::address its_local_address = + boost::asio::ip::address::from_string(std::string(local_address)); + std::memcpy(&its_subscribe_message[48], &its_local_address.to_v4().to_bytes()[0], 4); + + boost::asio::ip::udp::socket::endpoint_type target_sd( + boost::asio::ip::address::from_string(std::string(remote_address)), + 30490); + + udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd); + + + + if (std::future_status::timeout == first_initial_event_received.get_future().wait_for(std::chrono::seconds(10))) { + ADD_FAILURE() << "Didn't receive all SubscribeAck of first subscription within time"; + } + + // send second subscription with resubscription and new subscription + std::uint8_t its_subscribe_resubscribe_message[] = { + 0xff, 0xff, 0x81, 0x00, + 0x00, 0x00, 0x00, 0x40, // length + 0x00, 0x00, 0x00, 0x02, + 0x01, 0x01, 0x02, 0x00, + 0xc0, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x20, // length entries array + 0x06, 0x00, 0x00, 0x10, + 0x11, 0x22, 0x00, 0x01, // service / instance + 0x00, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x10, 0x00, // eventgroup + 0x06, 0x00, 0x00, 0x10, + 0x11, 0x22, 0x00, 0x01, // service / instance + 0x00, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x10, 0x01, // eventgroup 2 + 0x00, 0x00, 0x00, 0x0c, // length options array + 0x00, 0x09, 0x04, 0x00, + 0xff, 0xff, 0xff, 0xff, // ip address + 0x00, 0x11, 0x77, 0x1a + }; + std::memcpy(&its_subscribe_resubscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4); + udp_socket.send_to(boost::asio::buffer(its_subscribe_resubscribe_message), target_sd); + + if (std::future_status::timeout == second_initial_event_received.get_future().wait_for(std::chrono::seconds(10))) { + ADD_FAILURE() << "Didn't receive all SubscribeAck of second subscription within time"; + } + // call shutdown method + std::uint8_t shutdown_call[] = { + 0x11, 0x22, 0x14, 0x04, + 0x00, 0x00, 0x00, 0x08, + 0x22, 0x22, 0x00, 0x01, + 0x01, 0x00, 0x00, 0x00 }; + udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service); + } catch (...) { + ASSERT_FALSE(true); + } + + }); + + send_thread.join(); + receive_thread.join(); +} + #ifndef _WIN32 int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); @@ -1028,6 +1225,8 @@ int main(int argc, char** argv) { ::testing::GTEST_FLAG(filter) = "*send_alternating_subscribe_nack_unsubscribe"; } else if (its_testmode == std::string("SUBSCRIBE_UNSUBSCRIBE_SAME_PORT")) { ::testing::GTEST_FLAG(filter) = "*send_alternating_subscribe_unsubscribe_same_port"; + } else if (its_testmode == std::string("SUBSCRIBE_RESUBSCRIBE_MIXED")) { + ::testing::GTEST_FLAG(filter) = "*subscribe_resubscribe_mixed"; } return RUN_ALL_TESTS(); } diff --git a/test/pending_subscription_tests/pending_subscription_test_service.cpp b/test/pending_subscription_tests/pending_subscription_test_service.cpp index 0e4878a..59e7aa9 100644 --- a/test/pending_subscription_tests/pending_subscription_test_service.cpp +++ b/test/pending_subscription_tests/pending_subscription_test_service.cpp @@ -146,6 +146,8 @@ public: ; } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_SAME_PORT) { ; + } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED) { + ; } std::future<bool> itsFuture = notify_method_called_.get_future(); if (std::future_status::timeout == itsFuture.wait_for(std::chrono::seconds(10))) { @@ -218,6 +220,13 @@ public: if (count_subscribe == 16 || count_unsubscribe == 14) { subscription_accepted_asynchronous_ = true; } + } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED) { + static int was_called = 0; + was_called++; + EXPECT_EQ(1, was_called); + EXPECT_TRUE(_subscribed); + _cbk(true); + subscription_accepted_asynchronous_ = true; } } @@ -280,6 +289,13 @@ public: subscription_accepted_synchronous_ = true; } ret = true; + } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED) { + static int was_called = 0; + was_called++; + EXPECT_EQ(1, was_called); + EXPECT_TRUE(_subscribed); + subscription_accepted_synchronous_ = true; + ret = true; } return ret; } @@ -330,6 +346,8 @@ int main(int argc, char** argv) its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_NACK; } else if (its_pased_testmode == std::string("SUBSCRIBE_UNSUBSCRIBE_SAME_PORT")) { its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_SAME_PORT; + } else if (its_pased_testmode == std::string("SUBSCRIBE_RESUBSCRIBE_MIXED")) { + its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED; } return RUN_ALL_TESTS(); |