summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJuergen Gehring <juergen.gehring@bmw.de>2018-01-25 00:40:08 -0800
committerJuergen Gehring <juergen.gehring@bmw.de>2018-01-25 00:40:08 -0800
commit8936891b5db1a0c894a3ec0af52c081b52cca46c (patch)
tree5934534a14878754a5a8a275a8fe5cd5a7dc9f18
parenta89a645014e17f383e07b6dc6899a4a8925cc324 (diff)
downloadvSomeIP-8936891b5db1a0c894a3ec0af52c081b52cca46c.tar.gz
vsomeip 2.10.72.10.7
-rw-r--r--CHANGES5
-rw-r--r--CMakeLists.txt2
-rw-r--r--implementation/endpoints/include/tcp_client_endpoint_impl.hpp21
-rw-r--r--implementation/endpoints/src/tcp_client_endpoint_impl.cpp179
-rw-r--r--implementation/endpoints/src/udp_client_endpoint_impl.cpp7
-rw-r--r--implementation/routing/src/routing_manager_base.cpp52
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp251
-rw-r--r--implementation/routing/src/routing_manager_proxy.cpp7
-rw-r--r--implementation/runtime/src/application_impl.cpp8
9 files changed, 313 insertions, 219 deletions
diff --git a/CHANGES b/CHANGES
index d4b9b4f..8f31b84 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,11 @@
Changes
=======
+v2.10.7
+- Fix potential deadlock when expiring remote subscriptions
+- Rework restarting of tcp client endpoints to prevent heap corruption
+ under high load situations
+
v2.10.6
- Fix concurrency issue leading to a crash when asynchronous
subscription handlers were used.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 251784d..cce0a68 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -8,7 +8,7 @@ project (vsomeip)
set (VSOMEIP_MAJOR_VERSION 2)
set (VSOMEIP_MINOR_VERSION 10)
-set (VSOMEIP_PATCH_VERSION 6)
+set (VSOMEIP_PATCH_VERSION 7)
set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION})
set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in
set (CMAKE_VERBOSE_MAKEFILE off)
diff --git a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
index 9738afd..be6cf24 100644
--- a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
@@ -41,18 +41,27 @@ public:
private:
void send_queued();
- bool is_magic_cookie(size_t _offset) const;
+ bool is_magic_cookie(const message_buffer_ptr_t& _recv_buffer,
+ size_t _offset) const;
void send_magic_cookie(message_buffer_ptr_t &_buffer);
void receive_cbk(boost::system::error_code const &_error,
- std::size_t _bytes);
+ std::size_t _bytes,
+ message_buffer_ptr_t _recv_buffer,
+ std::size_t _recv_buffer_size);
void connect();
void receive();
- void calculate_shrink_count();
+ void receive(message_buffer_ptr_t _recv_buffer,
+ std::size_t _recv_buffer_size,
+ std::size_t _missing_capacity);
+ void calculate_shrink_count(const message_buffer_ptr_t& _recv_buffer,
+ std::size_t _recv_buffer_size);
const std::string get_address_port_remote() const;
const std::string get_address_port_local() const;
- void handle_recv_buffer_exception(const std::exception &_e);
+ void handle_recv_buffer_exception(const std::exception &_e,
+ const message_buffer_ptr_t& _recv_buffer,
+ std::size_t _recv_buffer_size);
void set_local_port();
std::size_t write_completion_condition(
const boost::system::error_code& _error,
@@ -62,9 +71,7 @@ private:
std::string get_remote_information() const;
const std::uint32_t recv_buffer_size_initial_;
- message_buffer_t recv_buffer_;
- size_t recv_buffer_size_;
- std::uint32_t missing_capacity_;
+ message_buffer_ptr_t recv_buffer_;
std::uint32_t shrink_count_;
const std::uint32_t buffer_shrink_threshold_;
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
index 7921e42..143b95a 100644
--- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
@@ -34,9 +34,7 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl(
: tcp_client_endpoint_base_impl(_host, _local, _remote, _io,
_max_message_size, _queue_limit),
recv_buffer_size_initial_(VSOMEIP_SOMEIP_HEADER_SIZE),
- recv_buffer_(recv_buffer_size_initial_, 0),
- recv_buffer_size_(0),
- missing_capacity_(0),
+ recv_buffer_(std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0)),
shrink_count_(0),
buffer_shrink_threshold_(_buffer_shrink_threshold),
remote_address_(_remote.address()),
@@ -64,12 +62,12 @@ void tcp_client_endpoint_impl::start() {
void tcp_client_endpoint_impl::restart() {
is_connected_ = false;
+ std::string address_port_local;
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ address_port_local = get_address_port_local();
shutdown_and_close_socket_unlocked(true);
- recv_buffer_size_ = 0;
- recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
- recv_buffer_.shrink_to_fit();
+ recv_buffer_ = std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0);
}
{
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -97,6 +95,8 @@ void tcp_client_endpoint_impl::restart() {
queue_.clear();
queue_size_ = 0;
}
+ VSOMEIP_WARNING << "tce::restart: local: " << address_port_local
+ << " remote: " << get_address_port_remote();
start_connect_timer();
}
@@ -153,43 +153,56 @@ void tcp_client_endpoint_impl::connect() {
}
void tcp_client_endpoint_impl::receive() {
+ message_buffer_ptr_t its_recv_buffer;
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ its_recv_buffer = recv_buffer_;
+ }
+ receive(its_recv_buffer, 0, 0);
+}
+
+void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
+ std::size_t _recv_buffer_size,
+ std::size_t _missing_capacity) {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
if(socket_->is_open()) {
- const std::size_t its_capacity(recv_buffer_.capacity());
- size_t buffer_size = its_capacity - recv_buffer_size_;
+ const std::size_t its_capacity(_recv_buffer->capacity());
+ size_t buffer_size = its_capacity - _recv_buffer_size;
try {
- if (missing_capacity_) {
- if (missing_capacity_ > MESSAGE_SIZE_UNLIMITED) {
+ if (_missing_capacity) {
+ if (_missing_capacity > MESSAGE_SIZE_UNLIMITED) {
VSOMEIP_ERROR << "Missing receive buffer capacity exceeds allowed maximum!";
return;
}
- const std::size_t its_required_capacity(recv_buffer_size_ + missing_capacity_);
+ const std::size_t its_required_capacity(_recv_buffer_size + _missing_capacity);
if (its_capacity < its_required_capacity) {
- recv_buffer_.reserve(its_required_capacity);
- recv_buffer_.resize(its_required_capacity, 0x0);
+ _recv_buffer->reserve(its_required_capacity);
+ _recv_buffer->resize(its_required_capacity, 0x0);
}
- buffer_size = missing_capacity_;
- missing_capacity_ = 0;
+ buffer_size = _missing_capacity;
+ _missing_capacity = 0;
} else if (buffer_shrink_threshold_
&& shrink_count_ > buffer_shrink_threshold_
- && recv_buffer_size_ == 0) {
- recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
- recv_buffer_.shrink_to_fit();
+ && _recv_buffer_size == 0) {
+ _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
+ _recv_buffer->shrink_to_fit();
buffer_size = recv_buffer_size_initial_;
shrink_count_ = 0;
}
} catch (const std::exception &e) {
- handle_recv_buffer_exception(e);
+ handle_recv_buffer_exception(e, _recv_buffer, _recv_buffer_size);
// don't start receiving again
return;
}
socket_->async_receive(
- boost::asio::buffer(&recv_buffer_[recv_buffer_size_], buffer_size),
+ boost::asio::buffer(&(*_recv_buffer)[_recv_buffer_size], buffer_size),
std::bind(
&tcp_client_endpoint_impl::receive_cbk,
std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()),
std::placeholders::_1,
- std::placeholders::_2
+ std::placeholders::_2,
+ _recv_buffer,
+ _recv_buffer_size
)
);
}
@@ -337,8 +350,9 @@ bool tcp_client_endpoint_impl::is_reliable() const {
return true;
}
-bool tcp_client_endpoint_impl::is_magic_cookie(size_t _offset) const {
- return (0 == std::memcmp(SERVICE_COOKIE, &recv_buffer_[_offset], sizeof(SERVICE_COOKIE)));
+bool tcp_client_endpoint_impl::is_magic_cookie(const message_buffer_ptr_t& _recv_buffer,
+ size_t _offset) const {
+ return (0 == std::memcmp(SERVICE_COOKIE, &(*_recv_buffer)[_offset], sizeof(SERVICE_COOKIE)));
}
void tcp_client_endpoint_impl::send_magic_cookie(message_buffer_ptr_t &_buffer) {
@@ -357,7 +371,8 @@ void tcp_client_endpoint_impl::send_magic_cookie(message_buffer_ptr_t &_buffer)
}
void tcp_client_endpoint_impl::receive_cbk(
- boost::system::error_code const &_error, std::size_t _bytes) {
+ boost::system::error_code const &_error, std::size_t _bytes,
+ message_buffer_ptr_t _recv_buffer, std::size_t _recv_buffer_size) {
if (_error == boost::asio::error::operation_aborted) {
// endpoint was stopped
return;
@@ -365,42 +380,43 @@ void tcp_client_endpoint_impl::receive_cbk(
#if 0
std::stringstream msg;
msg << "cei::rcb (" << _error.message() << "): ";
- for (std::size_t i = 0; i < _bytes + recv_buffer_size_; ++i)
+ for (std::size_t i = 0; i < _bytes + _recv_buffer_size; ++i)
msg << std::hex << std::setw(2) << std::setfill('0')
- << (int) recv_buffer_[i] << " ";
+ << (int) (_recv_buffer)[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
std::unique_lock<std::mutex> its_lock(socket_mutex_);
std::shared_ptr<endpoint_host> its_host = host_.lock();
if (its_host) {
+ std::uint32_t its_missing_capacity(0);
if (!_error && 0 < _bytes) {
- if (recv_buffer_size_ + _bytes < recv_buffer_size_) {
+ if (_recv_buffer_size + _bytes < _recv_buffer_size) {
VSOMEIP_ERROR << "receive buffer overflow in tcp client endpoint ~> abort!";
return;
}
- recv_buffer_size_ += _bytes;
+ _recv_buffer_size += _bytes;
size_t its_iteration_gap = 0;
- bool has_full_message;
+ bool has_full_message(false);
do {
uint64_t read_message_size
- = utility::get_message_size(&recv_buffer_[its_iteration_gap],
- recv_buffer_size_);
+ = utility::get_message_size(&(*_recv_buffer)[its_iteration_gap],
+ _recv_buffer_size);
if (read_message_size > MESSAGE_SIZE_UNLIMITED) {
VSOMEIP_ERROR << "Message size exceeds allowed maximum!";
return;
}
uint32_t current_message_size = static_cast<uint32_t>(read_message_size);
has_full_message = (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE
- && current_message_size <= recv_buffer_size_);
+ && current_message_size <= _recv_buffer_size);
if (has_full_message) {
bool needs_forwarding(true);
- if (is_magic_cookie(its_iteration_gap)) {
+ if (is_magic_cookie(_recv_buffer, its_iteration_gap)) {
has_enabled_magic_cookies_ = true;
} else {
if (has_enabled_magic_cookies_) {
- uint32_t its_offset = find_magic_cookie(&recv_buffer_[its_iteration_gap],
- (uint32_t) recv_buffer_size_);
+ uint32_t its_offset = find_magic_cookie(&(*_recv_buffer)[its_iteration_gap],
+ (uint32_t) _recv_buffer_size);
if (its_offset < current_message_size) {
VSOMEIP_ERROR << "Message includes Magic Cookie. Ignoring it.";
current_message_size = its_offset;
@@ -410,7 +426,7 @@ void tcp_client_endpoint_impl::receive_cbk(
}
if (needs_forwarding) {
if (!has_enabled_magic_cookies_) {
- its_host->on_message(&recv_buffer_[its_iteration_gap],
+ its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
current_message_size, this,
boost::asio::ip::address(),
VSOMEIP_ROUTING_CLIENT,
@@ -418,8 +434,8 @@ void tcp_client_endpoint_impl::receive_cbk(
remote_port_);
} else {
// Only call on_message without a magic cookie in front of the buffer!
- if (!is_magic_cookie(its_iteration_gap)) {
- its_host->on_message(&recv_buffer_[its_iteration_gap],
+ if (!is_magic_cookie(_recv_buffer, its_iteration_gap)) {
+ its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
current_message_size, this,
boost::asio::ip::address(),
VSOMEIP_ROUTING_CLIENT,
@@ -428,15 +444,15 @@ void tcp_client_endpoint_impl::receive_cbk(
}
}
}
- calculate_shrink_count();
- recv_buffer_size_ -= current_message_size;
+ calculate_shrink_count(_recv_buffer, _recv_buffer_size);
+ _recv_buffer_size -= current_message_size;
its_iteration_gap += current_message_size;
- missing_capacity_ = 0;
+ its_missing_capacity = 0;
} else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED &&
current_message_size > max_message_size_) {
- recv_buffer_size_ = 0;
- recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
- recv_buffer_.shrink_to_fit();
+ _recv_buffer_size = 0;
+ _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
+ _recv_buffer->shrink_to_fit();
if (has_enabled_magic_cookies_) {
VSOMEIP_ERROR << "Received a TCP message which exceeds "
<< "maximum message size ("
@@ -455,64 +471,71 @@ void tcp_client_endpoint_impl::receive_cbk(
<< get_address_port_remote();
return;
}
- } else if (current_message_size > recv_buffer_size_) {
- missing_capacity_ = current_message_size
- - static_cast<std::uint32_t>(recv_buffer_size_);
- } else if (VSOMEIP_SOMEIP_HEADER_SIZE > recv_buffer_size_) {
- missing_capacity_ = VSOMEIP_SOMEIP_HEADER_SIZE
- - static_cast<std::uint32_t>(recv_buffer_size_);
- } else if (has_enabled_magic_cookies_ && recv_buffer_size_ > 0) {
- uint32_t its_offset = find_magic_cookie(&recv_buffer_[its_iteration_gap], recv_buffer_size_);
- if (its_offset < recv_buffer_size_) {
- recv_buffer_size_ -= its_offset;
+ } else if (current_message_size > _recv_buffer_size) {
+ its_missing_capacity = current_message_size
+ - static_cast<std::uint32_t>(_recv_buffer_size);
+ } else if (VSOMEIP_SOMEIP_HEADER_SIZE > _recv_buffer_size) {
+ its_missing_capacity = VSOMEIP_SOMEIP_HEADER_SIZE
+ - static_cast<std::uint32_t>(_recv_buffer_size);
+ } else if (has_enabled_magic_cookies_ && _recv_buffer_size > 0) {
+ uint32_t its_offset = find_magic_cookie(&(*_recv_buffer)[its_iteration_gap], _recv_buffer_size);
+ if (its_offset < _recv_buffer_size) {
+ _recv_buffer_size -= its_offset;
its_iteration_gap += its_offset;
has_full_message = true; // trigger next loop
}
} else {
VSOMEIP_ERROR << "tce::c<" << this
<< ">rcb: recv_buffer_size is: " << std::dec
- << recv_buffer_size_ << " but couldn't read "
+ << _recv_buffer_size << " but couldn't read "
"out message_size. recv_buffer_capacity: "
- << recv_buffer_.capacity()
+ << _recv_buffer->capacity()
<< " its_iteration_gap: " << its_iteration_gap
<< "local: " << get_address_port_local()
<< " remote: " << get_address_port_remote();
}
- } while (has_full_message && recv_buffer_size_);
+ } while (has_full_message && _recv_buffer_size);
if (its_iteration_gap) {
// Copy incomplete message to front for next receive_cbk iteration
- for (size_t i = 0; i < recv_buffer_size_; ++i) {
- recv_buffer_[i] = recv_buffer_[i + its_iteration_gap];
+ for (size_t i = 0; i < _recv_buffer_size; ++i) {
+ (*_recv_buffer)[i] = (*_recv_buffer)[i + its_iteration_gap];
}
// Still more capacity needed after shifting everything to front?
- if (missing_capacity_ &&
- missing_capacity_ <= recv_buffer_.capacity() - recv_buffer_size_) {
- missing_capacity_ = 0;
+ if (its_missing_capacity &&
+ its_missing_capacity <= _recv_buffer->capacity() - _recv_buffer_size) {
+ its_missing_capacity = 0;
}
}
its_lock.unlock();
- receive();
+ receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
} else {
if (_error == boost::asio::error::connection_reset ||
_error == boost::asio::error::eof ||
- _error == boost::asio::error::timed_out) {
- VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: " << _error.message()
- << " local: " << get_address_port_local()
+ _error == boost::asio::error::timed_out ||
+ _error == boost::asio::error::bad_descriptor) {
+ VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
+ << _error.message() << "( " << std::dec << _error.value()
+ << ") local: " << get_address_port_local()
<< " remote: " << get_address_port_remote();
is_connected_ = false;
shutdown_and_close_socket_unlocked(false);
} else {
+ VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
+ << _error.message() << "( " << std::dec << _error.value()
+ << ") local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
its_lock.unlock();
- receive();
+ receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
}
}
}
}
-void tcp_client_endpoint_impl::calculate_shrink_count() {
+void tcp_client_endpoint_impl::calculate_shrink_count(const message_buffer_ptr_t& _recv_buffer,
+ std::size_t _recv_buffer_size) {
if (buffer_shrink_threshold_) {
- if (recv_buffer_.capacity() != recv_buffer_size_initial_) {
- if (recv_buffer_size_ < (recv_buffer_.capacity() >> 1)) {
+ if (_recv_buffer->capacity() != recv_buffer_size_initial_) {
+ if (_recv_buffer_size < (_recv_buffer->capacity() >> 1)) {
shrink_count_++;
} else {
shrink_count_ = 0;
@@ -551,7 +574,9 @@ const std::string tcp_client_endpoint_impl::get_address_port_local() const {
}
void tcp_client_endpoint_impl::handle_recv_buffer_exception(
- const std::exception &_e) {
+ const std::exception &_e,
+ const message_buffer_ptr_t& _recv_buffer,
+ std::size_t _recv_buffer_size) {
boost::system::error_code ec;
std::stringstream its_message;
@@ -560,18 +585,18 @@ void tcp_client_endpoint_impl::handle_recv_buffer_exception(
<< " remote: " << get_address_port_remote()
<< " shutting down connection. Start of buffer: ";
- for (std::size_t i = 0; i < recv_buffer_size_ && i < 16; i++) {
+ for (std::size_t i = 0; i < _recv_buffer_size && i < 16; i++) {
its_message << std::setw(2) << std::setfill('0') << std::hex
- << (int) (recv_buffer_[i]) << " ";
+ << (int) ((*_recv_buffer)[i]) << " ";
}
its_message << " Last 16 Bytes captured: ";
- for (int i = 15; recv_buffer_size_ > 15 && i >= 0; i--) {
+ for (int i = 15; _recv_buffer_size > 15 && i >= 0; i--) {
its_message << std::setw(2) << std::setfill('0') << std::hex
- << (int) (recv_buffer_[i]) << " ";
+ << (int) ((*_recv_buffer)[i]) << " ";
}
VSOMEIP_ERROR << its_message.str();
- recv_buffer_.clear();
+ _recv_buffer->clear();
{
std::lock_guard<std::mutex> its_lock(mutex_);
sending_blocked_ = true;
@@ -601,7 +626,7 @@ void tcp_client_endpoint_impl::print_status() {
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
local = get_address_port_local();
- its_receive_buffer_capacity = recv_buffer_.capacity();
+ its_receive_buffer_capacity = recv_buffer_->capacity();
}
VSOMEIP_INFO << "status tce: " << local << " -> "
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
index a478e69..6bc0bf5 100644
--- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
@@ -82,7 +82,14 @@ void udp_client_endpoint_impl::restart() {
std::lock_guard<std::mutex> its_lock(mutex_);
queue_.clear();
}
+ std::string local;
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ local = get_address_port_local();
+ }
shutdown_and_close_socket(false);
+ VSOMEIP_WARNING << "uce::restart: local: " << local
+ << " remote: " << get_address_port_remote();
start_connect_timer();
}
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp
index 0e8ce5d..baf6660 100644
--- a/implementation/routing/src/routing_manager_base.cpp
+++ b/implementation/routing/src/routing_manager_base.cpp
@@ -384,30 +384,36 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
void routing_manager_base::unregister_event(client_t _client, service_t _service, instance_t _instance,
event_t _event, bool _is_provided) {
(void)_client;
- std::lock_guard<std::mutex> its_lock(events_mutex_);
- auto found_service = events_.find(_service);
- if (found_service != events_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_event = found_instance->second.find(_event);
- if (found_event != found_instance->second.end()) {
- auto its_event = found_event->second;
- its_event->remove_ref(_client, _is_provided);
- if (!its_event->has_ref()) {
- auto its_eventgroups = its_event->get_eventgroups();
- for (auto eg : its_eventgroups) {
- std::shared_ptr<eventgroupinfo> its_eventgroup_info
- = find_eventgroup(_service, _instance, eg);
- if (its_eventgroup_info) {
- its_eventgroup_info->remove_event(its_event);
- if (0 == its_eventgroup_info->get_events().size()) {
- remove_eventgroup_info(_service, _instance, eg);
- }
- }
+ std::shared_ptr<event> its_unrefed_event;
+ {
+ std::lock_guard<std::mutex> its_lock(events_mutex_);
+ auto found_service = events_.find(_service);
+ if (found_service != events_.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ auto found_event = found_instance->second.find(_event);
+ if (found_event != found_instance->second.end()) {
+ auto its_event = found_event->second;
+ its_event->remove_ref(_client, _is_provided);
+ if (!its_event->has_ref()) {
+ its_unrefed_event = its_event;
+ found_instance->second.erase(found_event);
+ } else if (_is_provided) {
+ its_event->set_provided(false);
}
- found_instance->second.erase(_event);
- } else if (_is_provided) {
- its_event->set_provided(false);
+ }
+ }
+ }
+ }
+ if (its_unrefed_event) {
+ auto its_eventgroups = its_unrefed_event->get_eventgroups();
+ for (auto eg : its_eventgroups) {
+ std::shared_ptr<eventgroupinfo> its_eventgroup_info
+ = find_eventgroup(_service, _instance, eg);
+ if (its_eventgroup_info) {
+ its_eventgroup_info->remove_event(its_unrefed_event);
+ if (0 == its_eventgroup_info->get_events().size()) {
+ remove_eventgroup_info(_service, _instance, eg);
}
}
}
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index 15f1201..0c443fd 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -2418,59 +2418,80 @@ void routing_manager_impl::expire_services(const boost::asio::ip::address &_addr
}
void routing_manager_impl::expire_subscriptions(const boost::asio::ip::address &_address) {
- std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- for (auto &its_service : eventgroups_) {
- for (auto &its_instance : its_service.second) {
- const client_t its_hosting_client = find_local_client(
- its_service.first, its_instance.first);
- const bool service_offered_by_host = (its_hosting_client
- == host_->get_client());
- auto target = find_local(its_hosting_client);
-
- for (auto &its_eventgroup : its_instance.second) {
- std::set<std::shared_ptr<endpoint_definition>> its_invalid_endpoints;
- for (auto &its_target : its_eventgroup.second->get_targets()) {
- if (its_target.endpoint_->get_address() == _address)
- its_invalid_endpoints.insert(its_target.endpoint_);
- }
-
- for (auto &its_endpoint : its_invalid_endpoints) {
- its_eventgroup.second->remove_target(its_endpoint);
- client_t its_client = find_client(its_service.first,
- its_instance.first, its_eventgroup.second,
- its_endpoint);
- clear_remote_subscriber(its_service.first,
- its_instance.first, its_client, its_endpoint);
-
- if (its_eventgroup.second->get_targets().size() == 0) {
- std::set<std::shared_ptr<event> > its_events
- = its_eventgroup.second->get_events();
- for (auto e : its_events) {
- if (e->is_shadow()) {
- e->unset_payload();
- }
- }
+ struct subscriptions_info {
+ service_t service_id_;
+ instance_t instance_id_;
+ eventgroup_t eventgroup_id_;
+ std::shared_ptr<endpoint_definition> invalid_endpoint_;
+ client_t client_;
+ std::set<std::shared_ptr<event>> events_;
+ };
+ std::vector<struct subscriptions_info> subscriptions_to_expire_;
+ {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ for (auto &its_service : eventgroups_) {
+ for (auto &its_instance : its_service.second) {
+ for (auto &its_eventgroup : its_instance.second) {
+ std::set<std::shared_ptr<endpoint_definition>> its_invalid_endpoints;
+ for (auto &its_target : its_eventgroup.second->get_targets()) {
+ if (its_target.endpoint_->get_address() == _address)
+ its_invalid_endpoints.insert(its_target.endpoint_);
}
- if (target) {
- stub_->send_unsubscribe(target, its_client, its_service.first,
- its_instance.first, its_eventgroup.first, ANY_EVENT, true);
+ for (auto &its_endpoint : its_invalid_endpoints) {
+ its_eventgroup.second->remove_target(its_endpoint);
+ client_t its_client = find_client(its_service.first,
+ its_instance.first, its_eventgroup.second,
+ its_endpoint);
+ clear_remote_subscriber(its_service.first,
+ its_instance.first, its_client, its_endpoint);
+
+ std::set<std::shared_ptr<event> > its_events;
+ if (its_eventgroup.second->get_targets().size() == 0) {
+ its_events = its_eventgroup.second->get_events();
+ }
+ subscriptions_to_expire_.push_back({its_service.first,
+ its_instance.first,
+ its_eventgroup.first,
+ its_endpoint,
+ its_client,
+ its_events});
}
- if (service_offered_by_host) {
- host_->on_subscription(its_service.first,
- its_instance.first, its_eventgroup.first,
- its_client, false,
- [](const bool _subscription_accepted){
- (void)_subscription_accepted;
- });
+ if(its_eventgroup.second->is_multicast() && its_invalid_endpoints.size() &&
+ 0 == its_eventgroup.second->get_unreliable_target_count() ) {
+ //clear multicast targets if no subscriber is left for multicast eventgroup
+ its_eventgroup.second->clear_multicast_targets();
}
}
- if(its_eventgroup.second->is_multicast() && its_invalid_endpoints.size() &&
- 0 == its_eventgroup.second->get_unreliable_target_count() ) {
- //clear multicast targets if no subscriber is left for multicast eventgroup
- its_eventgroup.second->clear_multicast_targets();
+ }
+ }
+ }
+
+ for (const auto &s : subscriptions_to_expire_) {
+ if (s.invalid_endpoint_) {
+ for (const auto e: s.events_) {
+ if (e->is_shadow()) {
+ e->unset_payload();
}
}
+ const client_t its_hosting_client = find_local_client(
+ s.service_id_, s.instance_id_);
+ const bool service_offered_by_host = (its_hosting_client
+ == host_->get_client());
+ std::shared_ptr<endpoint> target = find_local(its_hosting_client);
+
+ if (target) {
+ stub_->send_unsubscribe(target, s.client_, s.service_id_,
+ s.instance_id_, s.eventgroup_id_, ANY_EVENT, true);
+ }
+ if (service_offered_by_host) {
+ host_->on_subscription(s.service_id_,
+ s.instance_id_, s.eventgroup_id_,
+ s.client_, false,
+ [](const bool _subscription_accepted){
+ (void)_subscription_accepted;
+ });
+ }
}
}
}
@@ -3330,81 +3351,99 @@ void routing_manager_impl::clear_remote_subscriber(
std::chrono::steady_clock::time_point
routing_manager_impl::expire_subscriptions() {
- std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ struct subscriptions_info {
+ service_t service_id_;
+ instance_t instance_id_;
+ eventgroup_t eventgroup_id_;
+ std::shared_ptr<endpoint_definition> invalid_endpoint_;
+ client_t client_;
+ std::set<std::shared_ptr<event>> events_;
+ };
+ std::vector<struct subscriptions_info> subscriptions_to_expire_;
std::chrono::steady_clock::time_point now
= std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point next_expiration
= std::chrono::steady_clock::now() + std::chrono::hours(24);
+ {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- for (auto &its_service : eventgroups_) {
- for (auto &its_instance : its_service.second) {
- const client_t its_hosting_client = find_local_client(
- its_service.first, its_instance.first);
- for (auto &its_eventgroup : its_instance.second) {
- std::set<std::shared_ptr<endpoint_definition>> its_expired_endpoints;
- for (auto &its_target : its_eventgroup.second->get_targets()) {
- if (its_target.expiration_ < now) {
- its_expired_endpoints.insert(its_target.endpoint_);
- } else if (its_target.expiration_ < next_expiration) {
- next_expiration = its_target.expiration_;
- }
- }
- std::shared_ptr<endpoint> target;
- bool service_offered_by_host(false);
- if (its_expired_endpoints.size()) {
- target = find_local(its_hosting_client);
- service_offered_by_host = (its_hosting_client == host_->get_client());
- }
-
- for (auto its_endpoint : its_expired_endpoints) {
- its_eventgroup.second->remove_target(its_endpoint);
-
- client_t its_client
- = find_client(its_service.first, its_instance.first,
- its_eventgroup.second, its_endpoint);
- clear_remote_subscriber(its_service.first, its_instance.first,
- its_client, its_endpoint);
-
- if (its_eventgroup.second->get_targets().size() == 0) {
- std::set<std::shared_ptr<event> > its_events
- = its_eventgroup.second->get_events();
- for (auto e : its_events) {
- if (e->is_shadow()) {
- e->unset_payload();
- }
+ for (auto &its_service : eventgroups_) {
+ for (auto &its_instance : its_service.second) {
+ for (auto &its_eventgroup : its_instance.second) {
+ std::set<std::shared_ptr<endpoint_definition>> its_expired_endpoints;
+ for (auto &its_target : its_eventgroup.second->get_targets()) {
+ if (its_target.expiration_ < now) {
+ its_expired_endpoints.insert(its_target.endpoint_);
+ } else if (its_target.expiration_ < next_expiration) {
+ next_expiration = its_target.expiration_;
}
}
- if (target) {
- stub_->send_unsubscribe(target, its_client, its_service.first,
- its_instance.first, its_eventgroup.first, ANY_EVENT, true);
+ for (auto its_endpoint : its_expired_endpoints) {
+ its_eventgroup.second->remove_target(its_endpoint);
+
+ client_t its_client
+ = find_client(its_service.first, its_instance.first,
+ its_eventgroup.second, its_endpoint);
+ clear_remote_subscriber(its_service.first, its_instance.first,
+ its_client, its_endpoint);
+
+ std::set<std::shared_ptr<event> > its_events;
+ if (its_eventgroup.second->get_targets().size() == 0) {
+ its_events = its_eventgroup.second->get_events();
+ }
+ subscriptions_to_expire_.push_back({its_service.first,
+ its_instance.first,
+ its_eventgroup.first,
+ its_endpoint,
+ its_client,
+ its_events});
}
- if (service_offered_by_host) {
- host_->on_subscription(its_service.first,
- its_instance.first, its_eventgroup.first,
- its_client, false,
- [](const bool _subscription_accepted){
- (void)_subscription_accepted;
- });
+ if(its_eventgroup.second->is_multicast() && its_expired_endpoints.size() &&
+ 0 == its_eventgroup.second->get_unreliable_target_count() ) {
+ //clear multicast targets if no unreliable subscriber is left for multicast eventgroup
+ its_eventgroup.second->clear_multicast_targets();
}
-
- VSOMEIP_INFO << "Expired subscription ("
- << std::hex << its_service.first << "."
- << its_instance .first << "."
- << its_eventgroup.first << " from "
- << its_endpoint->get_address() << ":"
- << std::dec << its_endpoint->get_port()
- << "(" << std::hex << its_client << ")";
- }
- if(its_eventgroup.second->is_multicast() && its_expired_endpoints.size() &&
- 0 == its_eventgroup.second->get_unreliable_target_count() ) {
- //clear multicast targets if no unreliable subscriber is left for multicast eventgroup
- its_eventgroup.second->clear_multicast_targets();
}
}
}
}
+ for (const auto &s : subscriptions_to_expire_) {
+ if (s.invalid_endpoint_) {
+ for (const auto e: s.events_) {
+ if (e->is_shadow()) {
+ e->unset_payload();
+ }
+ }
+ const client_t its_hosting_client = find_local_client(s.service_id_,
+ s.instance_id_);
+ const bool service_offered_by_host = (its_hosting_client
+ == host_->get_client());
+ std::shared_ptr<endpoint> target = find_local(its_hosting_client);
+
+ if (target) {
+ stub_->send_unsubscribe(target, s.client_, s.service_id_,
+ s.instance_id_, s.eventgroup_id_, ANY_EVENT, true);
+ }
+ if (service_offered_by_host) {
+ host_->on_subscription(s.service_id_,
+ s.instance_id_, s.eventgroup_id_,
+ s.client_, false,
+ [](const bool _subscription_accepted){
+ (void)_subscription_accepted;
+ });
+ }
+
+ VSOMEIP_INFO << "Expired subscription ("
+ << std::hex << s.service_id_ << "."
+ << s.instance_id_ << "."
+ << s.eventgroup_id_ << " from "
+ << s.invalid_endpoint_->get_address() << ":"
+ << std::dec << s.invalid_endpoint_->get_port()
+ << "(" << std::hex << s.client_ << ")";
+ }
+ }
return next_expiration;
}
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp
index 8dd8515..a15ae2d 100644
--- a/implementation/routing/src/routing_manager_proxy.cpp
+++ b/implementation/routing/src/routing_manager_proxy.cpp
@@ -1998,7 +1998,12 @@ void routing_manager_proxy::handle_client_error(client_t _client) {
should_reconnect = is_started_;
}
if (should_reconnect) {
- reconnect(known_clients_);
+ std::unordered_set<client_t> its_known_clients;
+ {
+ std::lock_guard<std::mutex> its_lock(known_clients_mutex_);
+ its_known_clients = known_clients_;
+ }
+ reconnect(its_known_clients);
}
}
}
diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp
index 8d22c7c..d97be8e 100644
--- a/implementation/runtime/src/application_impl.cpp
+++ b/implementation/runtime/src/application_impl.cpp
@@ -1008,7 +1008,7 @@ void application_impl::deliver_subscription_state(service_t _service, instance_t
} else {
auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
if (its_any_event != found_eventgroup->second.end()) {
- if (!_error || (_error && found_event->second.second)) {
+ if (!_error || (_error && its_any_event->second.second)) {
handlers.push_back(its_any_event->second.first);
}
}
@@ -1027,7 +1027,7 @@ void application_impl::deliver_subscription_state(service_t _service, instance_t
} else {
auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
if (its_any_event != found_eventgroup->second.end()) {
- if (!_error || (_error && found_event->second.second)) {
+ if (!_error || (_error && its_any_event->second.second)) {
handlers.push_back(its_any_event->second.first);
}
}
@@ -1049,7 +1049,7 @@ void application_impl::deliver_subscription_state(service_t _service, instance_t
} else {
auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
if (its_any_event != found_eventgroup->second.end()) {
- if (!_error || (_error && found_event->second.second)) {
+ if (!_error || (_error && its_any_event->second.second)) {
handlers.push_back(its_any_event->second.first);
}
}
@@ -1068,7 +1068,7 @@ void application_impl::deliver_subscription_state(service_t _service, instance_t
} else {
auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
if (its_any_event != found_eventgroup->second.end()) {
- if (!_error || (_error && found_event->second.second)) {
+ if (!_error || (_error && its_any_event->second.second)) {
handlers.push_back(its_any_event->second.first);
}
}