diff options
author | Juergen Gehring <juergen.gehring@bmw.de> | 2017-06-20 02:53:03 -0700 |
---|---|---|
committer | Juergen Gehring <juergen.gehring@bmw.de> | 2017-06-20 02:53:03 -0700 |
commit | 27698301f8bb528c2f618af5995865523de7e0d6 (patch) | |
tree | fb91dddc521348bb99c2a5046cbe95efa3c111e1 | |
parent | cf67875117ef7b1b9a25fe1f23e8b7ba1197c934 (diff) | |
download | vSomeIP-27698301f8bb528c2f618af5995865523de7e0d6.tar.gz |
vSomeIP 2.6.42.6.4
-rw-r--r-- | CHANGES | 4 | ||||
-rw-r--r-- | CMakeLists.txt | 2 | ||||
-rw-r--r-- | implementation/endpoints/include/client_endpoint_impl.hpp | 6 | ||||
-rw-r--r-- | implementation/endpoints/include/tcp_client_endpoint_impl.hpp | 1 | ||||
-rw-r--r-- | implementation/endpoints/include/udp_client_endpoint_impl.hpp | 1 | ||||
-rw-r--r-- | implementation/endpoints/src/client_endpoint_impl.cpp | 52 | ||||
-rw-r--r-- | implementation/endpoints/src/local_client_endpoint_impl.cpp | 23 | ||||
-rw-r--r-- | implementation/endpoints/src/tcp_client_endpoint_impl.cpp | 57 | ||||
-rw-r--r-- | implementation/endpoints/src/udp_client_endpoint_impl.cpp | 26 | ||||
-rw-r--r-- | implementation/service_discovery/src/service_discovery_impl.cpp | 105 |
10 files changed, 167 insertions, 110 deletions
@@ -302,3 +302,7 @@ v2.6.3 - Introduce 'max-payload-size-reliable' json file parameter which can be used to globally limit the maximum allowed payload size for TCP communication - Added CRC checksum calculation for bit optimized messages + +v2.6.4 +- Fix bug in reboot detection of other nodes +- Improve restarting of TCP connections diff --git a/CMakeLists.txt b/CMakeLists.txt index a4b84e7..15a7d59 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ project (vsomeip) set (VSOMEIP_MAJOR_VERSION 2) set (VSOMEIP_MINOR_VERSION 6) -set (VSOMEIP_PATCH_VERSION 3) +set (VSOMEIP_PATCH_VERSION 4) set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION}) set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in set (CMAKE_VERBOSE_MAKEFILE off) diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp index ef52067..141e95f 100644 --- a/implementation/endpoints/include/client_endpoint_impl.hpp +++ b/implementation/endpoints/include/client_endpoint_impl.hpp @@ -46,7 +46,7 @@ public: bool flush();
void stop();
- void restart();
+ virtual void restart() = 0;
bool is_client() const;
@@ -68,9 +68,11 @@ public: protected:
virtual void send_queued() = 0;
void shutdown_and_close_socket();
+ void shutdown_and_close_socket_unlocked();
+ void start_connect_timer();
mutable std::mutex socket_mutex_;
- socket_type socket_;
+ std::unique_ptr<socket_type> socket_;
endpoint_type remote_;
boost::asio::steady_timer flush_timer_;
diff --git a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp index 20eae38..cd8420a 100644 --- a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp @@ -28,6 +28,7 @@ public: virtual ~tcp_client_endpoint_impl();
void start();
+ void restart();
bool get_remote_address(boost::asio::ip::address &_address) const;
unsigned short get_local_port() const;
diff --git a/implementation/endpoints/include/udp_client_endpoint_impl.hpp b/implementation/endpoints/include/udp_client_endpoint_impl.hpp index 0357303..0734d47 100644 --- a/implementation/endpoints/include/udp_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/udp_client_endpoint_impl.hpp @@ -33,6 +33,7 @@ public: virtual ~udp_client_endpoint_impl();
void start();
+ void restart();
void receive_cbk(boost::system::error_code const &_error,
std::size_t _bytes);
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index b5d6c7e..c051194 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -32,7 +32,7 @@ client_endpoint_impl<Protocol>::client_endpoint_impl( boost::asio::io_service &_io,
std::uint32_t _max_message_size)
: endpoint_impl<Protocol>(_host, _local, _io, _max_message_size),
- socket_(_io), remote_(_remote),
+ socket_(new socket_type(_io)), remote_(_remote),
flush_timer_(_io), connect_timer_(_io),
connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable
is_connected_(false),
@@ -70,7 +70,7 @@ void client_endpoint_impl<Protocol>::stop() { bool is_open(false);
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- is_open = socket_.is_open();
+ is_open = socket_->is_open();
}
if (is_open) {
bool send_queue_empty(false);
@@ -92,24 +92,6 @@ void client_endpoint_impl<Protocol>::stop() { }
template<typename Protocol>
-void client_endpoint_impl<Protocol>::restart() {
- {
- std::lock_guard<std::mutex> its_lock(mutex_);
- queue_.clear();
- }
- shutdown_and_close_socket();
- is_connected_ = false;
- {
- std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
- connect_timer_.expires_from_now(
- std::chrono::milliseconds(connect_timeout_));
- connect_timer_.async_wait(
- std::bind(&client_endpoint_impl<Protocol>::wait_connect_cbk,
- this->shared_from_this(), std::placeholders::_1));
- }
-}
-
-template<typename Protocol>
bool client_endpoint_impl<Protocol>::send_to(
const std::shared_ptr<endpoint_definition> _target, const byte_t *_data,
uint32_t _size, bool _flush) {
@@ -212,14 +194,7 @@ void client_endpoint_impl<Protocol>::connect_cbk( if (its_host) {
if (_error && _error != boost::asio::error::already_connected) {
shutdown_and_close_socket();
- {
- std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
- connect_timer_.expires_from_now(
- std::chrono::milliseconds(connect_timeout_));
- connect_timer_.async_wait(
- std::bind(&client_endpoint_impl<Protocol>::wait_connect_cbk,
- this->shared_from_this(), std::placeholders::_1));
- }
+ start_connect_timer();
// Double the timeout as long as the maximum allowed is larger
if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
connect_timeout_ = (connect_timeout_ << 1);
@@ -302,10 +277,15 @@ void client_endpoint_impl<Protocol>::flush_cbk( template<typename Protocol>
void client_endpoint_impl<Protocol>::shutdown_and_close_socket() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- if (socket_.is_open()) {
+ shutdown_and_close_socket_unlocked();
+}
+
+template<typename Protocol>
+void client_endpoint_impl<Protocol>::shutdown_and_close_socket_unlocked() {
+ if (socket_->is_open()) {
boost::system::error_code its_error;
- socket_.shutdown(Protocol::socket::shutdown_both, its_error);
- socket_.close(its_error);
+ socket_->shutdown(Protocol::socket::shutdown_both, its_error);
+ socket_->close(its_error);
}
}
@@ -321,6 +301,16 @@ unsigned short client_endpoint_impl<Protocol>::get_remote_port() const { return 0;
}
+template<typename Protocol>
+void client_endpoint_impl<Protocol>::start_connect_timer() {
+ std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
+ connect_timer_.expires_from_now(
+ std::chrono::milliseconds(connect_timeout_));
+ connect_timer_.async_wait(
+ std::bind(&client_endpoint_impl<Protocol>::wait_connect_cbk,
+ this->shared_from_this(), std::placeholders::_1));
+}
+
// Instantiate template
#ifndef _WIN32
template class client_endpoint_impl<boost::asio::local::stream_protocol>;
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp index 4c79047..e9de8f7 100644 --- a/implementation/endpoints/src/local_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp @@ -44,11 +44,18 @@ bool local_client_endpoint_impl::is_local() const { }
void local_client_endpoint_impl::restart() {
+ is_connected_ = false;
{
std::lock_guard<std::mutex> its_lock(mutex_);
sending_blocked_ = false;
+ queue_.clear();
}
- client_endpoint_impl::restart();
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ shutdown_and_close_socket_unlocked();
+ socket_.reset(new socket_type(service_));
+ }
+ start_connect_timer();
}
void local_client_endpoint_impl::start() {
@@ -60,15 +67,15 @@ void local_client_endpoint_impl::connect() { {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
- socket_.open(remote_.protocol(), its_error);
+ socket_->open(remote_.protocol(), its_error);
if (!its_error || its_error == boost::asio::error::already_open) {
- socket_.set_option(boost::asio::socket_base::reuse_address(true), its_error);
+ socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "local_client_endpoint_impl::connect: "
<< "couldn't enable SO_REUSEADDR: " << its_error.message();
}
- socket_.connect(remote_, its_connect_error);
+ socket_->connect(remote_, its_connect_error);
// Credentials
#ifndef _WIN32
@@ -76,7 +83,7 @@ void local_client_endpoint_impl::connect() { auto its_host = host_.lock();
if (its_host) {
if (its_host->get_configuration()->is_security_enabled()) {
- credentials::send_credentials(socket_.native(),
+ credentials::send_credentials(socket_->native(),
its_host->get_client());
}
}
@@ -102,8 +109,8 @@ void local_client_endpoint_impl::connect() { void local_client_endpoint_impl::receive() {
#ifndef _WIN32
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- if (socket_.is_open()) {
- socket_.async_receive(
+ if (socket_->is_open()) {
+ socket_->async_receive(
boost::asio::buffer(recv_buffer_),
std::bind(
&local_client_endpoint_impl::receive_cbk,
@@ -146,7 +153,7 @@ VSOMEIP_INFO << msg.str(); {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::asio::async_write(
- socket_,
+ *socket_,
bufs,
std::bind(
&client_endpoint_impl::send_cbk,
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp index e89bc5c..cee27ae 100644 --- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp @@ -54,20 +54,37 @@ void tcp_client_endpoint_impl::start() { connect();
}
+void tcp_client_endpoint_impl::restart() {
+ is_connected_ = false;
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ shutdown_and_close_socket_unlocked();
+ recv_buffer_size_ = 0;
+ recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
+ recv_buffer_.shrink_to_fit();
+ socket_.reset(new socket_type(service_));
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ queue_.clear();
+ }
+ start_connect_timer();
+}
+
void tcp_client_endpoint_impl::connect() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
- socket_.open(remote_.protocol(), its_error);
+ socket_->open(remote_.protocol(), its_error);
if (!its_error || its_error == boost::asio::error::already_open) {
// Nagle algorithm off
- socket_.set_option(ip::tcp::no_delay(true), its_error);
+ socket_->set_option(ip::tcp::no_delay(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't disable "
<< "Nagle algorithm: " << its_error.message();
}
- socket_.set_option(boost::asio::socket_base::keep_alive(true), its_error);
+ socket_->set_option(boost::asio::socket_base::keep_alive(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
<< "keep_alive: " << its_error.message();
@@ -76,7 +93,7 @@ void tcp_client_endpoint_impl::connect() { // Enable SO_REUSEADDR to avoid bind problems with services going offline
// and coming online again and the user has specified only a small number
// of ports in the clients section for one service instance
- socket_.set_option(boost::asio::socket_base::reuse_address(true), its_error);
+ socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
<< "SO_REUSEADDR: " << its_error.message();
@@ -85,14 +102,14 @@ void tcp_client_endpoint_impl::connect() { // bind to it before connecting
if (local_.port() != ILLEGAL_PORT) {
boost::system::error_code its_bind_error;
- socket_.bind(local_, its_bind_error);
+ socket_->bind(local_, its_bind_error);
if(its_bind_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
"Error binding socket: " << its_bind_error.message();
}
}
- socket_.async_connect(
+ socket_->async_connect(
remote_,
std::bind(
&tcp_client_endpoint_base_impl::connect_cbk,
@@ -108,7 +125,7 @@ void tcp_client_endpoint_impl::connect() { void tcp_client_endpoint_impl::receive() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- if(socket_.is_open()) {
+ if(socket_->is_open()) {
const std::size_t its_capacity(recv_buffer_.capacity());
size_t buffer_size = its_capacity - recv_buffer_size_;
try {
@@ -137,7 +154,7 @@ void tcp_client_endpoint_impl::receive() { // don't start receiving again
return;
}
- socket_.async_receive(
+ socket_->async_receive(
boost::asio::buffer(&recv_buffer_[recv_buffer_size_], buffer_size),
std::bind(
&tcp_client_endpoint_impl::receive_cbk,
@@ -172,7 +189,7 @@ void tcp_client_endpoint_impl::send_queued() { {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::asio::async_write(
- socket_,
+ *socket_,
boost::asio::buffer(*its_buffer),
std::bind(
&tcp_client_endpoint_base_impl::send_cbk,
@@ -196,8 +213,8 @@ bool tcp_client_endpoint_impl::get_remote_address( unsigned short tcp_client_endpoint_impl::get_local_port() const {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
- if (socket_.is_open()) {
- endpoint_type its_endpoint = socket_.local_endpoint(its_error);
+ if (socket_->is_open()) {
+ endpoint_type its_endpoint = socket_->local_endpoint(its_error);
if (!its_error) {
return its_endpoint.port();
} else {
@@ -248,6 +265,10 @@ void tcp_client_endpoint_impl::receive_cbk( << (int) recv_buffer_[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
+ std::unique_lock<std::mutex> its_lock(socket_mutex_);
+ if (!is_connected_) {
+ return;
+ }
std::shared_ptr<endpoint_host> its_host = host_.lock();
if (its_host) {
if (!_error && 0 < _bytes) {
@@ -369,14 +390,16 @@ void tcp_client_endpoint_impl::receive_cbk( missing_capacity_ = 0;
}
}
+ its_lock.unlock();
receive();
} else {
if (_error == boost::asio::error::connection_reset ||
_error == boost::asio::error::eof ||
_error == boost::asio::error::timed_out) {
VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk error detected: " << _error.message();
- shutdown_and_close_socket();
+ shutdown_and_close_socket_unlocked();
} else {
+ its_lock.unlock();
receive();
}
}
@@ -413,8 +436,8 @@ const std::string tcp_client_endpoint_impl::get_address_port_local() const { std::string its_address_port;
its_address_port.reserve(21);
boost::system::error_code ec;
- if (socket_.is_open()) {
- endpoint_type its_local_endpoint = socket_.local_endpoint(ec);
+ if (socket_->is_open()) {
+ endpoint_type its_local_endpoint = socket_->local_endpoint(ec);
if (!ec) {
its_address_port += its_local_endpoint.address().to_string(ec);
its_address_port += ":";
@@ -455,10 +478,10 @@ void tcp_client_endpoint_impl::handle_recv_buffer_exception( boost::system::error_code ec;
connect_timer_.cancel(ec);
}
- if (socket_.is_open()) {
+ if (socket_->is_open()) {
boost::system::error_code its_error;
- socket_.shutdown(socket_type::shutdown_both, its_error);
- socket_.close(its_error);
+ socket_->shutdown(socket_type::shutdown_both, its_error);
+ socket_->close(its_error);
}
}
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp index 66aab17..cd51f47 100644 --- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp @@ -44,14 +44,14 @@ void udp_client_endpoint_impl::connect() { // bind to it before connecting
if (local_.port() != ILLEGAL_PORT) {
boost::system::error_code its_bind_error;
- socket_.bind(local_, its_bind_error);
+ socket_->bind(local_, its_bind_error);
if(its_bind_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
"Error binding socket: " << its_bind_error.message();
}
}
- socket_.async_connect(
+ socket_->async_connect(
remote_,
std::bind(
&udp_client_endpoint_base_impl::connect_cbk,
@@ -65,7 +65,7 @@ void udp_client_endpoint_impl::start() { boost::system::error_code its_error;
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- socket_.open(remote_.protocol(), its_error);
+ socket_->open(remote_.protocol(), its_error);
}
if (!its_error || its_error == boost::asio::error::already_open) {
connect();
@@ -75,6 +75,16 @@ void udp_client_endpoint_impl::start() { }
}
+void udp_client_endpoint_impl::restart() {
+ is_connected_ = false;
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ queue_.clear();
+ }
+ shutdown_and_close_socket();
+ start_connect_timer();
+}
+
void udp_client_endpoint_impl::send_queued() {
message_buffer_ptr_t its_buffer;
if(queue_.size()) {
@@ -93,7 +103,7 @@ void udp_client_endpoint_impl::send_queued() { #endif
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- socket_.async_send(
+ socket_->async_send(
boost::asio::buffer(*its_buffer),
std::bind(
&udp_client_endpoint_base_impl::send_cbk,
@@ -107,10 +117,10 @@ void udp_client_endpoint_impl::send_queued() { void udp_client_endpoint_impl::receive() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- if (!socket_.is_open()) {
+ if (!socket_->is_open()) {
return;
}
- socket_.async_receive_from(
+ socket_->async_receive_from(
boost::asio::buffer(&recv_buffer_[0], max_message_size_),
remote_,
std::bind(
@@ -136,8 +146,8 @@ bool udp_client_endpoint_impl::get_remote_address( unsigned short udp_client_endpoint_impl::get_local_port() const {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
- if (socket_.is_open()) {
- endpoint_type its_endpoint = socket_.local_endpoint(its_error);
+ if (socket_->is_open()) {
+ endpoint_type its_endpoint = socket_->local_endpoint(its_error);
if (!its_error) {
return its_endpoint.port();
} else {
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index e8d29ef..79e60bd 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -306,7 +306,8 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, } } } - if(has_address && 0 < its_message->get_entries().size()) { + if (has_address && its_message->get_entries().size() + && its_message->get_options().size()) { serialize_and_send(its_message, its_address); } } @@ -471,7 +472,8 @@ void service_discovery_impl::unsubscribe(service_t _service, } } } - if (has_address && 0 < its_message->get_entries().size()) { + if (has_address && its_message->get_entries().size() + && its_message->get_options().size()) { serialize_and_send(its_message, its_address); } } @@ -552,7 +554,8 @@ void service_discovery_impl::unsubscribe_client(service_t _service, } } } - if (has_address && 0 < its_message->get_entries().size()) { + if (has_address && its_message->get_entries().size() + && its_message->get_options().size()) { serialize_and_send(its_message, its_address); } } @@ -589,7 +592,13 @@ bool service_discovery_impl::is_reboot( auto its_received = sessions_received_.find(_sender); bool is_multicast = _destination.is_multicast(); + // Initialize both sessions with 0. Thus, the session identifier + // for the session not being received from the network is stored + // as 0 and will never trigger the reboot detection. session_t its_multicast_session(0), its_unicast_session(0); + + // Initialize both flags with true. Thus, the flag not being + // received from the network will never trigger the reboot detection. bool its_multicast_reboot_flag(true), its_unicast_reboot_flag(true); if (is_multicast) { @@ -603,30 +612,31 @@ bool service_discovery_impl::is_reboot( if (its_received == sessions_received_.end()) { sessions_received_[_sender] = std::make_tuple(its_multicast_session, its_unicast_session, - its_multicast_reboot_flag, its_unicast_reboot_flag); + its_multicast_reboot_flag, its_unicast_reboot_flag); } else { // Reboot detection: Either the flag has changed from false to true, - // or the session identifier overrun while the flag is true - if ((is_multicast && !std::get<2>(its_received->second) && _reboot_flag) - || (!is_multicast && !std::get<3>(its_received->second) && _reboot_flag)) { - result = true; - } else { - session_t its_old_session; - bool its_old_reboot_flag; - - if (is_multicast) { - its_old_session = std::get<0>(its_received->second); - its_old_reboot_flag = std::get<2>(its_received->second); - } else { - its_old_session = std::get<1>(its_received->second); - its_old_reboot_flag = std::get<3>(its_received->second); - } - - if (its_old_reboot_flag && _reboot_flag - && its_old_session >= _session) { - result = true; - } - } + // or the session identifier overrun while the flag is true. + if (_reboot_flag + && ((is_multicast && !std::get<2>(its_received->second)) + || (!is_multicast && !std::get<3>(its_received->second)))) { + result = true; + } else { + session_t its_old_session; + bool its_old_reboot_flag; + + if (is_multicast) { + its_old_session = std::get<0>(its_received->second); + its_old_reboot_flag = std::get<2>(its_received->second); + } else { + its_old_session = std::get<1>(its_received->second); + its_old_reboot_flag = std::get<3>(its_received->second); + } + + if (its_old_reboot_flag && _reboot_flag + && its_old_session >= _session) { + result = true; + } + } if (result == false) { // no reboot -> update session/flag @@ -635,10 +645,10 @@ bool service_discovery_impl::is_reboot( std::get<2>(its_received->second) = its_multicast_reboot_flag; } else { std::get<1>(its_received->second) = its_unicast_session; - std::get<3>(its_received->second) = its_multicast_reboot_flag; + std::get<3>(its_received->second) = its_unicast_reboot_flag; } } else { - // reboot -> reset the session + // reboot -> reset the sender data sessions_received_.erase(_sender); } } @@ -882,16 +892,20 @@ void service_discovery_impl::insert_subscription( std::shared_ptr < endpoint > its_endpoint; if (_insert_reliable) { its_endpoint = _subscription->get_endpoint(true); - if (its_endpoint && its_endpoint->get_local_port()) { - insert_option(_message, its_entry, unicast_, - its_endpoint->get_local_port(), true); + if (its_endpoint) { + const std::uint16_t its_port = its_endpoint->get_local_port(); + if (its_port) { + insert_option(_message, its_entry, unicast_, its_port, true); + } } } if (_insert_unreliable) { its_endpoint = _subscription->get_endpoint(false); if (its_endpoint && its_endpoint->get_local_port()) { - insert_option(_message, its_entry, unicast_, - its_endpoint->get_local_port(), false); + const std::uint16_t its_port = its_endpoint->get_local_port(); + if (its_port) { + insert_option(_message, its_entry, unicast_, its_port, false); + } } } } @@ -928,17 +942,19 @@ void service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared std::shared_ptr < endpoint > its_endpoint; its_endpoint = _subscription->get_endpoint(true); if (its_endpoint && its_endpoint->is_connected()) { - insert_option(_message, its_stop_entry, unicast_, - its_endpoint->get_local_port(), true); - insert_option(_message, its_entry, unicast_, - its_endpoint->get_local_port(), true); + const std::uint16_t its_port = its_endpoint->get_local_port(); + if (its_port) { + insert_option(_message, its_stop_entry, unicast_, its_port, true); + insert_option(_message, its_entry, unicast_, its_port, true); + } } its_endpoint = _subscription->get_endpoint(false); if (its_endpoint) { - insert_option(_message, its_stop_entry, unicast_, - its_endpoint->get_local_port(), false); - insert_option(_message, its_entry, unicast_, - its_endpoint->get_local_port(), false); + const std::uint16_t its_port = its_endpoint->get_local_port(); + if (its_port) { + insert_option(_message, its_stop_entry, unicast_, its_port, false); + insert_option(_message, its_entry, unicast_, its_port, false); + } } } @@ -1302,7 +1318,8 @@ void service_discovery_impl::process_offerservice_serviceentry( } } - if (0 < its_message->get_entries().size()) { + if (its_message->get_entries().size() + && its_message->get_options().size()) { std::shared_ptr<endpoint_definition> its_target; std::pair<session_t, bool> its_session; std::lock_guard<std::mutex> its_lock(serialize_mutex_); @@ -1480,7 +1497,8 @@ void service_discovery_impl::on_reliable_endpoint_connected( } } } - if (has_address && 0 < its_message->get_entries().size()) { + if (has_address && its_message->get_entries().size() + && its_message->get_options().size()) { serialize_and_send(its_message, its_address); } } @@ -2239,7 +2257,8 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ _instance, found_eventgroup.first, found_client->second, _reliable, !_reliable); } - if(0 < its_message->get_entries().size()) { + if (its_message->get_entries().size() + && its_message->get_options().size()) { subscription_messages.push_front({its_message, its_address}); found_client->second->set_acknowledged(false); } |