summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJuergen Gehring <juergen.gehring@bmw.de>2017-06-20 02:53:03 -0700
committerJuergen Gehring <juergen.gehring@bmw.de>2017-06-20 02:53:03 -0700
commit27698301f8bb528c2f618af5995865523de7e0d6 (patch)
treefb91dddc521348bb99c2a5046cbe95efa3c111e1
parentcf67875117ef7b1b9a25fe1f23e8b7ba1197c934 (diff)
downloadvSomeIP-27698301f8bb528c2f618af5995865523de7e0d6.tar.gz
vSomeIP 2.6.42.6.4
-rw-r--r--CHANGES4
-rw-r--r--CMakeLists.txt2
-rw-r--r--implementation/endpoints/include/client_endpoint_impl.hpp6
-rw-r--r--implementation/endpoints/include/tcp_client_endpoint_impl.hpp1
-rw-r--r--implementation/endpoints/include/udp_client_endpoint_impl.hpp1
-rw-r--r--implementation/endpoints/src/client_endpoint_impl.cpp52
-rw-r--r--implementation/endpoints/src/local_client_endpoint_impl.cpp23
-rw-r--r--implementation/endpoints/src/tcp_client_endpoint_impl.cpp57
-rw-r--r--implementation/endpoints/src/udp_client_endpoint_impl.cpp26
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp105
10 files changed, 167 insertions, 110 deletions
diff --git a/CHANGES b/CHANGES
index 1d8200a..1837480 100644
--- a/CHANGES
+++ b/CHANGES
@@ -302,3 +302,7 @@ v2.6.3
- Introduce 'max-payload-size-reliable' json file parameter which can be used to
globally limit the maximum allowed payload size for TCP communication
- Added CRC checksum calculation for bit optimized messages
+
+v2.6.4
+- Fix bug in reboot detection of other nodes
+- Improve restarting of TCP connections
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a4b84e7..15a7d59 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -8,7 +8,7 @@ project (vsomeip)
set (VSOMEIP_MAJOR_VERSION 2)
set (VSOMEIP_MINOR_VERSION 6)
-set (VSOMEIP_PATCH_VERSION 3)
+set (VSOMEIP_PATCH_VERSION 4)
set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION})
set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in
set (CMAKE_VERBOSE_MAKEFILE off)
diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp
index ef52067..141e95f 100644
--- a/implementation/endpoints/include/client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/client_endpoint_impl.hpp
@@ -46,7 +46,7 @@ public:
bool flush();
void stop();
- void restart();
+ virtual void restart() = 0;
bool is_client() const;
@@ -68,9 +68,11 @@ public:
protected:
virtual void send_queued() = 0;
void shutdown_and_close_socket();
+ void shutdown_and_close_socket_unlocked();
+ void start_connect_timer();
mutable std::mutex socket_mutex_;
- socket_type socket_;
+ std::unique_ptr<socket_type> socket_;
endpoint_type remote_;
boost::asio::steady_timer flush_timer_;
diff --git a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
index 20eae38..cd8420a 100644
--- a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
@@ -28,6 +28,7 @@ public:
virtual ~tcp_client_endpoint_impl();
void start();
+ void restart();
bool get_remote_address(boost::asio::ip::address &_address) const;
unsigned short get_local_port() const;
diff --git a/implementation/endpoints/include/udp_client_endpoint_impl.hpp b/implementation/endpoints/include/udp_client_endpoint_impl.hpp
index 0357303..0734d47 100644
--- a/implementation/endpoints/include/udp_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/udp_client_endpoint_impl.hpp
@@ -33,6 +33,7 @@ public:
virtual ~udp_client_endpoint_impl();
void start();
+ void restart();
void receive_cbk(boost::system::error_code const &_error,
std::size_t _bytes);
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp
index b5d6c7e..c051194 100644
--- a/implementation/endpoints/src/client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/client_endpoint_impl.cpp
@@ -32,7 +32,7 @@ client_endpoint_impl<Protocol>::client_endpoint_impl(
boost::asio::io_service &_io,
std::uint32_t _max_message_size)
: endpoint_impl<Protocol>(_host, _local, _io, _max_message_size),
- socket_(_io), remote_(_remote),
+ socket_(new socket_type(_io)), remote_(_remote),
flush_timer_(_io), connect_timer_(_io),
connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable
is_connected_(false),
@@ -70,7 +70,7 @@ void client_endpoint_impl<Protocol>::stop() {
bool is_open(false);
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- is_open = socket_.is_open();
+ is_open = socket_->is_open();
}
if (is_open) {
bool send_queue_empty(false);
@@ -92,24 +92,6 @@ void client_endpoint_impl<Protocol>::stop() {
}
template<typename Protocol>
-void client_endpoint_impl<Protocol>::restart() {
- {
- std::lock_guard<std::mutex> its_lock(mutex_);
- queue_.clear();
- }
- shutdown_and_close_socket();
- is_connected_ = false;
- {
- std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
- connect_timer_.expires_from_now(
- std::chrono::milliseconds(connect_timeout_));
- connect_timer_.async_wait(
- std::bind(&client_endpoint_impl<Protocol>::wait_connect_cbk,
- this->shared_from_this(), std::placeholders::_1));
- }
-}
-
-template<typename Protocol>
bool client_endpoint_impl<Protocol>::send_to(
const std::shared_ptr<endpoint_definition> _target, const byte_t *_data,
uint32_t _size, bool _flush) {
@@ -212,14 +194,7 @@ void client_endpoint_impl<Protocol>::connect_cbk(
if (its_host) {
if (_error && _error != boost::asio::error::already_connected) {
shutdown_and_close_socket();
- {
- std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
- connect_timer_.expires_from_now(
- std::chrono::milliseconds(connect_timeout_));
- connect_timer_.async_wait(
- std::bind(&client_endpoint_impl<Protocol>::wait_connect_cbk,
- this->shared_from_this(), std::placeholders::_1));
- }
+ start_connect_timer();
// Double the timeout as long as the maximum allowed is larger
if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
connect_timeout_ = (connect_timeout_ << 1);
@@ -302,10 +277,15 @@ void client_endpoint_impl<Protocol>::flush_cbk(
template<typename Protocol>
void client_endpoint_impl<Protocol>::shutdown_and_close_socket() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- if (socket_.is_open()) {
+ shutdown_and_close_socket_unlocked();
+}
+
+template<typename Protocol>
+void client_endpoint_impl<Protocol>::shutdown_and_close_socket_unlocked() {
+ if (socket_->is_open()) {
boost::system::error_code its_error;
- socket_.shutdown(Protocol::socket::shutdown_both, its_error);
- socket_.close(its_error);
+ socket_->shutdown(Protocol::socket::shutdown_both, its_error);
+ socket_->close(its_error);
}
}
@@ -321,6 +301,16 @@ unsigned short client_endpoint_impl<Protocol>::get_remote_port() const {
return 0;
}
+template<typename Protocol>
+void client_endpoint_impl<Protocol>::start_connect_timer() {
+ std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
+ connect_timer_.expires_from_now(
+ std::chrono::milliseconds(connect_timeout_));
+ connect_timer_.async_wait(
+ std::bind(&client_endpoint_impl<Protocol>::wait_connect_cbk,
+ this->shared_from_this(), std::placeholders::_1));
+}
+
// Instantiate template
#ifndef _WIN32
template class client_endpoint_impl<boost::asio::local::stream_protocol>;
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp
index 4c79047..e9de8f7 100644
--- a/implementation/endpoints/src/local_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp
@@ -44,11 +44,18 @@ bool local_client_endpoint_impl::is_local() const {
}
void local_client_endpoint_impl::restart() {
+ is_connected_ = false;
{
std::lock_guard<std::mutex> its_lock(mutex_);
sending_blocked_ = false;
+ queue_.clear();
}
- client_endpoint_impl::restart();
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ shutdown_and_close_socket_unlocked();
+ socket_.reset(new socket_type(service_));
+ }
+ start_connect_timer();
}
void local_client_endpoint_impl::start() {
@@ -60,15 +67,15 @@ void local_client_endpoint_impl::connect() {
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
- socket_.open(remote_.protocol(), its_error);
+ socket_->open(remote_.protocol(), its_error);
if (!its_error || its_error == boost::asio::error::already_open) {
- socket_.set_option(boost::asio::socket_base::reuse_address(true), its_error);
+ socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "local_client_endpoint_impl::connect: "
<< "couldn't enable SO_REUSEADDR: " << its_error.message();
}
- socket_.connect(remote_, its_connect_error);
+ socket_->connect(remote_, its_connect_error);
// Credentials
#ifndef _WIN32
@@ -76,7 +83,7 @@ void local_client_endpoint_impl::connect() {
auto its_host = host_.lock();
if (its_host) {
if (its_host->get_configuration()->is_security_enabled()) {
- credentials::send_credentials(socket_.native(),
+ credentials::send_credentials(socket_->native(),
its_host->get_client());
}
}
@@ -102,8 +109,8 @@ void local_client_endpoint_impl::connect() {
void local_client_endpoint_impl::receive() {
#ifndef _WIN32
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- if (socket_.is_open()) {
- socket_.async_receive(
+ if (socket_->is_open()) {
+ socket_->async_receive(
boost::asio::buffer(recv_buffer_),
std::bind(
&local_client_endpoint_impl::receive_cbk,
@@ -146,7 +153,7 @@ VSOMEIP_INFO << msg.str();
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::asio::async_write(
- socket_,
+ *socket_,
bufs,
std::bind(
&client_endpoint_impl::send_cbk,
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
index e89bc5c..cee27ae 100644
--- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
@@ -54,20 +54,37 @@ void tcp_client_endpoint_impl::start() {
connect();
}
+void tcp_client_endpoint_impl::restart() {
+ is_connected_ = false;
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ shutdown_and_close_socket_unlocked();
+ recv_buffer_size_ = 0;
+ recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
+ recv_buffer_.shrink_to_fit();
+ socket_.reset(new socket_type(service_));
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ queue_.clear();
+ }
+ start_connect_timer();
+}
+
void tcp_client_endpoint_impl::connect() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
- socket_.open(remote_.protocol(), its_error);
+ socket_->open(remote_.protocol(), its_error);
if (!its_error || its_error == boost::asio::error::already_open) {
// Nagle algorithm off
- socket_.set_option(ip::tcp::no_delay(true), its_error);
+ socket_->set_option(ip::tcp::no_delay(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't disable "
<< "Nagle algorithm: " << its_error.message();
}
- socket_.set_option(boost::asio::socket_base::keep_alive(true), its_error);
+ socket_->set_option(boost::asio::socket_base::keep_alive(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
<< "keep_alive: " << its_error.message();
@@ -76,7 +93,7 @@ void tcp_client_endpoint_impl::connect() {
// Enable SO_REUSEADDR to avoid bind problems with services going offline
// and coming online again and the user has specified only a small number
// of ports in the clients section for one service instance
- socket_.set_option(boost::asio::socket_base::reuse_address(true), its_error);
+ socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
<< "SO_REUSEADDR: " << its_error.message();
@@ -85,14 +102,14 @@ void tcp_client_endpoint_impl::connect() {
// bind to it before connecting
if (local_.port() != ILLEGAL_PORT) {
boost::system::error_code its_bind_error;
- socket_.bind(local_, its_bind_error);
+ socket_->bind(local_, its_bind_error);
if(its_bind_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
"Error binding socket: " << its_bind_error.message();
}
}
- socket_.async_connect(
+ socket_->async_connect(
remote_,
std::bind(
&tcp_client_endpoint_base_impl::connect_cbk,
@@ -108,7 +125,7 @@ void tcp_client_endpoint_impl::connect() {
void tcp_client_endpoint_impl::receive() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- if(socket_.is_open()) {
+ if(socket_->is_open()) {
const std::size_t its_capacity(recv_buffer_.capacity());
size_t buffer_size = its_capacity - recv_buffer_size_;
try {
@@ -137,7 +154,7 @@ void tcp_client_endpoint_impl::receive() {
// don't start receiving again
return;
}
- socket_.async_receive(
+ socket_->async_receive(
boost::asio::buffer(&recv_buffer_[recv_buffer_size_], buffer_size),
std::bind(
&tcp_client_endpoint_impl::receive_cbk,
@@ -172,7 +189,7 @@ void tcp_client_endpoint_impl::send_queued() {
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::asio::async_write(
- socket_,
+ *socket_,
boost::asio::buffer(*its_buffer),
std::bind(
&tcp_client_endpoint_base_impl::send_cbk,
@@ -196,8 +213,8 @@ bool tcp_client_endpoint_impl::get_remote_address(
unsigned short tcp_client_endpoint_impl::get_local_port() const {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
- if (socket_.is_open()) {
- endpoint_type its_endpoint = socket_.local_endpoint(its_error);
+ if (socket_->is_open()) {
+ endpoint_type its_endpoint = socket_->local_endpoint(its_error);
if (!its_error) {
return its_endpoint.port();
} else {
@@ -248,6 +265,10 @@ void tcp_client_endpoint_impl::receive_cbk(
<< (int) recv_buffer_[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
+ std::unique_lock<std::mutex> its_lock(socket_mutex_);
+ if (!is_connected_) {
+ return;
+ }
std::shared_ptr<endpoint_host> its_host = host_.lock();
if (its_host) {
if (!_error && 0 < _bytes) {
@@ -369,14 +390,16 @@ void tcp_client_endpoint_impl::receive_cbk(
missing_capacity_ = 0;
}
}
+ its_lock.unlock();
receive();
} else {
if (_error == boost::asio::error::connection_reset ||
_error == boost::asio::error::eof ||
_error == boost::asio::error::timed_out) {
VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk error detected: " << _error.message();
- shutdown_and_close_socket();
+ shutdown_and_close_socket_unlocked();
} else {
+ its_lock.unlock();
receive();
}
}
@@ -413,8 +436,8 @@ const std::string tcp_client_endpoint_impl::get_address_port_local() const {
std::string its_address_port;
its_address_port.reserve(21);
boost::system::error_code ec;
- if (socket_.is_open()) {
- endpoint_type its_local_endpoint = socket_.local_endpoint(ec);
+ if (socket_->is_open()) {
+ endpoint_type its_local_endpoint = socket_->local_endpoint(ec);
if (!ec) {
its_address_port += its_local_endpoint.address().to_string(ec);
its_address_port += ":";
@@ -455,10 +478,10 @@ void tcp_client_endpoint_impl::handle_recv_buffer_exception(
boost::system::error_code ec;
connect_timer_.cancel(ec);
}
- if (socket_.is_open()) {
+ if (socket_->is_open()) {
boost::system::error_code its_error;
- socket_.shutdown(socket_type::shutdown_both, its_error);
- socket_.close(its_error);
+ socket_->shutdown(socket_type::shutdown_both, its_error);
+ socket_->close(its_error);
}
}
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
index 66aab17..cd51f47 100644
--- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
@@ -44,14 +44,14 @@ void udp_client_endpoint_impl::connect() {
// bind to it before connecting
if (local_.port() != ILLEGAL_PORT) {
boost::system::error_code its_bind_error;
- socket_.bind(local_, its_bind_error);
+ socket_->bind(local_, its_bind_error);
if(its_bind_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
"Error binding socket: " << its_bind_error.message();
}
}
- socket_.async_connect(
+ socket_->async_connect(
remote_,
std::bind(
&udp_client_endpoint_base_impl::connect_cbk,
@@ -65,7 +65,7 @@ void udp_client_endpoint_impl::start() {
boost::system::error_code its_error;
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- socket_.open(remote_.protocol(), its_error);
+ socket_->open(remote_.protocol(), its_error);
}
if (!its_error || its_error == boost::asio::error::already_open) {
connect();
@@ -75,6 +75,16 @@ void udp_client_endpoint_impl::start() {
}
}
+void udp_client_endpoint_impl::restart() {
+ is_connected_ = false;
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ queue_.clear();
+ }
+ shutdown_and_close_socket();
+ start_connect_timer();
+}
+
void udp_client_endpoint_impl::send_queued() {
message_buffer_ptr_t its_buffer;
if(queue_.size()) {
@@ -93,7 +103,7 @@ void udp_client_endpoint_impl::send_queued() {
#endif
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- socket_.async_send(
+ socket_->async_send(
boost::asio::buffer(*its_buffer),
std::bind(
&udp_client_endpoint_base_impl::send_cbk,
@@ -107,10 +117,10 @@ void udp_client_endpoint_impl::send_queued() {
void udp_client_endpoint_impl::receive() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
- if (!socket_.is_open()) {
+ if (!socket_->is_open()) {
return;
}
- socket_.async_receive_from(
+ socket_->async_receive_from(
boost::asio::buffer(&recv_buffer_[0], max_message_size_),
remote_,
std::bind(
@@ -136,8 +146,8 @@ bool udp_client_endpoint_impl::get_remote_address(
unsigned short udp_client_endpoint_impl::get_local_port() const {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
- if (socket_.is_open()) {
- endpoint_type its_endpoint = socket_.local_endpoint(its_error);
+ if (socket_->is_open()) {
+ endpoint_type its_endpoint = socket_->local_endpoint(its_error);
if (!its_error) {
return its_endpoint.port();
} else {
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index e8d29ef..79e60bd 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -306,7 +306,8 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
}
}
}
- if(has_address && 0 < its_message->get_entries().size()) {
+ if (has_address && its_message->get_entries().size()
+ && its_message->get_options().size()) {
serialize_and_send(its_message, its_address);
}
}
@@ -471,7 +472,8 @@ void service_discovery_impl::unsubscribe(service_t _service,
}
}
}
- if (has_address && 0 < its_message->get_entries().size()) {
+ if (has_address && its_message->get_entries().size()
+ && its_message->get_options().size()) {
serialize_and_send(its_message, its_address);
}
}
@@ -552,7 +554,8 @@ void service_discovery_impl::unsubscribe_client(service_t _service,
}
}
}
- if (has_address && 0 < its_message->get_entries().size()) {
+ if (has_address && its_message->get_entries().size()
+ && its_message->get_options().size()) {
serialize_and_send(its_message, its_address);
}
}
@@ -589,7 +592,13 @@ bool service_discovery_impl::is_reboot(
auto its_received = sessions_received_.find(_sender);
bool is_multicast = _destination.is_multicast();
+ // Initialize both sessions with 0. Thus, the session identifier
+ // for the session not being received from the network is stored
+ // as 0 and will never trigger the reboot detection.
session_t its_multicast_session(0), its_unicast_session(0);
+
+ // Initialize both flags with true. Thus, the flag not being
+ // received from the network will never trigger the reboot detection.
bool its_multicast_reboot_flag(true), its_unicast_reboot_flag(true);
if (is_multicast) {
@@ -603,30 +612,31 @@ bool service_discovery_impl::is_reboot(
if (its_received == sessions_received_.end()) {
sessions_received_[_sender]
= std::make_tuple(its_multicast_session, its_unicast_session,
- its_multicast_reboot_flag, its_unicast_reboot_flag);
+ its_multicast_reboot_flag, its_unicast_reboot_flag);
} else {
// Reboot detection: Either the flag has changed from false to true,
- // or the session identifier overrun while the flag is true
- if ((is_multicast && !std::get<2>(its_received->second) && _reboot_flag)
- || (!is_multicast && !std::get<3>(its_received->second) && _reboot_flag)) {
- result = true;
- } else {
- session_t its_old_session;
- bool its_old_reboot_flag;
-
- if (is_multicast) {
- its_old_session = std::get<0>(its_received->second);
- its_old_reboot_flag = std::get<2>(its_received->second);
- } else {
- its_old_session = std::get<1>(its_received->second);
- its_old_reboot_flag = std::get<3>(its_received->second);
- }
-
- if (its_old_reboot_flag && _reboot_flag
- && its_old_session >= _session) {
- result = true;
- }
- }
+ // or the session identifier overrun while the flag is true.
+ if (_reboot_flag
+ && ((is_multicast && !std::get<2>(its_received->second))
+ || (!is_multicast && !std::get<3>(its_received->second)))) {
+ result = true;
+ } else {
+ session_t its_old_session;
+ bool its_old_reboot_flag;
+
+ if (is_multicast) {
+ its_old_session = std::get<0>(its_received->second);
+ its_old_reboot_flag = std::get<2>(its_received->second);
+ } else {
+ its_old_session = std::get<1>(its_received->second);
+ its_old_reboot_flag = std::get<3>(its_received->second);
+ }
+
+ if (its_old_reboot_flag && _reboot_flag
+ && its_old_session >= _session) {
+ result = true;
+ }
+ }
if (result == false) {
// no reboot -> update session/flag
@@ -635,10 +645,10 @@ bool service_discovery_impl::is_reboot(
std::get<2>(its_received->second) = its_multicast_reboot_flag;
} else {
std::get<1>(its_received->second) = its_unicast_session;
- std::get<3>(its_received->second) = its_multicast_reboot_flag;
+ std::get<3>(its_received->second) = its_unicast_reboot_flag;
}
} else {
- // reboot -> reset the session
+ // reboot -> reset the sender data
sessions_received_.erase(_sender);
}
}
@@ -882,16 +892,20 @@ void service_discovery_impl::insert_subscription(
std::shared_ptr < endpoint > its_endpoint;
if (_insert_reliable) {
its_endpoint = _subscription->get_endpoint(true);
- if (its_endpoint && its_endpoint->get_local_port()) {
- insert_option(_message, its_entry, unicast_,
- its_endpoint->get_local_port(), true);
+ if (its_endpoint) {
+ const std::uint16_t its_port = its_endpoint->get_local_port();
+ if (its_port) {
+ insert_option(_message, its_entry, unicast_, its_port, true);
+ }
}
}
if (_insert_unreliable) {
its_endpoint = _subscription->get_endpoint(false);
if (its_endpoint && its_endpoint->get_local_port()) {
- insert_option(_message, its_entry, unicast_,
- its_endpoint->get_local_port(), false);
+ const std::uint16_t its_port = its_endpoint->get_local_port();
+ if (its_port) {
+ insert_option(_message, its_entry, unicast_, its_port, false);
+ }
}
}
}
@@ -928,17 +942,19 @@ void service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared
std::shared_ptr < endpoint > its_endpoint;
its_endpoint = _subscription->get_endpoint(true);
if (its_endpoint && its_endpoint->is_connected()) {
- insert_option(_message, its_stop_entry, unicast_,
- its_endpoint->get_local_port(), true);
- insert_option(_message, its_entry, unicast_,
- its_endpoint->get_local_port(), true);
+ const std::uint16_t its_port = its_endpoint->get_local_port();
+ if (its_port) {
+ insert_option(_message, its_stop_entry, unicast_, its_port, true);
+ insert_option(_message, its_entry, unicast_, its_port, true);
+ }
}
its_endpoint = _subscription->get_endpoint(false);
if (its_endpoint) {
- insert_option(_message, its_stop_entry, unicast_,
- its_endpoint->get_local_port(), false);
- insert_option(_message, its_entry, unicast_,
- its_endpoint->get_local_port(), false);
+ const std::uint16_t its_port = its_endpoint->get_local_port();
+ if (its_port) {
+ insert_option(_message, its_stop_entry, unicast_, its_port, false);
+ insert_option(_message, its_entry, unicast_, its_port, false);
+ }
}
}
@@ -1302,7 +1318,8 @@ void service_discovery_impl::process_offerservice_serviceentry(
}
}
- if (0 < its_message->get_entries().size()) {
+ if (its_message->get_entries().size()
+ && its_message->get_options().size()) {
std::shared_ptr<endpoint_definition> its_target;
std::pair<session_t, bool> its_session;
std::lock_guard<std::mutex> its_lock(serialize_mutex_);
@@ -1480,7 +1497,8 @@ void service_discovery_impl::on_reliable_endpoint_connected(
}
}
}
- if (has_address && 0 < its_message->get_entries().size()) {
+ if (has_address && its_message->get_entries().size()
+ && its_message->get_options().size()) {
serialize_and_send(its_message, its_address);
}
}
@@ -2239,7 +2257,8 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
_instance, found_eventgroup.first,
found_client->second, _reliable, !_reliable);
}
- if(0 < its_message->get_entries().size()) {
+ if (its_message->get_entries().size()
+ && its_message->get_options().size()) {
subscription_messages.push_front({its_message, its_address});
found_client->second->set_acknowledged(false);
}