diff options
author | Juergen Gehring <juergen.gehring@bmw.de> | 2018-01-25 00:40:08 -0800 |
---|---|---|
committer | Juergen Gehring <juergen.gehring@bmw.de> | 2018-01-25 00:40:08 -0800 |
commit | 8936891b5db1a0c894a3ec0af52c081b52cca46c (patch) | |
tree | 5934534a14878754a5a8a275a8fe5cd5a7dc9f18 | |
parent | a89a645014e17f383e07b6dc6899a4a8925cc324 (diff) | |
download | vSomeIP-8936891b5db1a0c894a3ec0af52c081b52cca46c.tar.gz |
vsomeip 2.10.72.10.7
-rw-r--r-- | CHANGES | 5 | ||||
-rw-r--r-- | CMakeLists.txt | 2 | ||||
-rw-r--r-- | implementation/endpoints/include/tcp_client_endpoint_impl.hpp | 21 | ||||
-rw-r--r-- | implementation/endpoints/src/tcp_client_endpoint_impl.cpp | 179 | ||||
-rw-r--r-- | implementation/endpoints/src/udp_client_endpoint_impl.cpp | 7 | ||||
-rw-r--r-- | implementation/routing/src/routing_manager_base.cpp | 52 | ||||
-rw-r--r-- | implementation/routing/src/routing_manager_impl.cpp | 251 | ||||
-rw-r--r-- | implementation/routing/src/routing_manager_proxy.cpp | 7 | ||||
-rw-r--r-- | implementation/runtime/src/application_impl.cpp | 8 |
9 files changed, 313 insertions, 219 deletions
@@ -1,6 +1,11 @@ Changes ======= +v2.10.7 +- Fix potential deadlock when expiring remote subscriptions +- Rework restarting of tcp client endpoints to prevent heap corruption + under high load situations + v2.10.6 - Fix concurrency issue leading to a crash when asynchronous subscription handlers were used. diff --git a/CMakeLists.txt b/CMakeLists.txt index 251784d..cce0a68 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ project (vsomeip) set (VSOMEIP_MAJOR_VERSION 2) set (VSOMEIP_MINOR_VERSION 10) -set (VSOMEIP_PATCH_VERSION 6) +set (VSOMEIP_PATCH_VERSION 7) set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION}) set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in set (CMAKE_VERBOSE_MAKEFILE off) diff --git a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp index 9738afd..be6cf24 100644 --- a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp @@ -41,18 +41,27 @@ public: private:
void send_queued();
- bool is_magic_cookie(size_t _offset) const;
+ bool is_magic_cookie(const message_buffer_ptr_t& _recv_buffer,
+ size_t _offset) const;
void send_magic_cookie(message_buffer_ptr_t &_buffer);
void receive_cbk(boost::system::error_code const &_error,
- std::size_t _bytes);
+ std::size_t _bytes,
+ message_buffer_ptr_t _recv_buffer,
+ std::size_t _recv_buffer_size);
void connect();
void receive();
- void calculate_shrink_count();
+ void receive(message_buffer_ptr_t _recv_buffer,
+ std::size_t _recv_buffer_size,
+ std::size_t _missing_capacity);
+ void calculate_shrink_count(const message_buffer_ptr_t& _recv_buffer,
+ std::size_t _recv_buffer_size);
const std::string get_address_port_remote() const;
const std::string get_address_port_local() const;
- void handle_recv_buffer_exception(const std::exception &_e);
+ void handle_recv_buffer_exception(const std::exception &_e,
+ const message_buffer_ptr_t& _recv_buffer,
+ std::size_t _recv_buffer_size);
void set_local_port();
std::size_t write_completion_condition(
const boost::system::error_code& _error,
@@ -62,9 +71,7 @@ private: std::string get_remote_information() const;
const std::uint32_t recv_buffer_size_initial_;
- message_buffer_t recv_buffer_;
- size_t recv_buffer_size_;
- std::uint32_t missing_capacity_;
+ message_buffer_ptr_t recv_buffer_;
std::uint32_t shrink_count_;
const std::uint32_t buffer_shrink_threshold_;
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp index 7921e42..143b95a 100644 --- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp @@ -34,9 +34,7 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl( : tcp_client_endpoint_base_impl(_host, _local, _remote, _io,
_max_message_size, _queue_limit),
recv_buffer_size_initial_(VSOMEIP_SOMEIP_HEADER_SIZE),
- recv_buffer_(recv_buffer_size_initial_, 0),
- recv_buffer_size_(0),
- missing_capacity_(0),
+ recv_buffer_(std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0)),
shrink_count_(0),
buffer_shrink_threshold_(_buffer_shrink_threshold),
remote_address_(_remote.address()),
@@ -64,12 +62,12 @@ void tcp_client_endpoint_impl::start() { void tcp_client_endpoint_impl::restart() {
is_connected_ = false;
+ std::string address_port_local;
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ address_port_local = get_address_port_local();
shutdown_and_close_socket_unlocked(true);
- recv_buffer_size_ = 0;
- recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
- recv_buffer_.shrink_to_fit();
+ recv_buffer_ = std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0);
}
{
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -97,6 +95,8 @@ void tcp_client_endpoint_impl::restart() { queue_.clear();
queue_size_ = 0;
}
+ VSOMEIP_WARNING << "tce::restart: local: " << address_port_local
+ << " remote: " << get_address_port_remote();
start_connect_timer();
}
@@ -153,43 +153,56 @@ void tcp_client_endpoint_impl::connect() { }
void tcp_client_endpoint_impl::receive() {
+ message_buffer_ptr_t its_recv_buffer;
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ its_recv_buffer = recv_buffer_;
+ }
+ receive(its_recv_buffer, 0, 0);
+}
+
+void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
+ std::size_t _recv_buffer_size,
+ std::size_t _missing_capacity) {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
if(socket_->is_open()) {
- const std::size_t its_capacity(recv_buffer_.capacity());
- size_t buffer_size = its_capacity - recv_buffer_size_;
+ const std::size_t its_capacity(_recv_buffer->capacity());
+ size_t buffer_size = its_capacity - _recv_buffer_size;
try {
- if (missing_capacity_) {
- if (missing_capacity_ > MESSAGE_SIZE_UNLIMITED) {
+ if (_missing_capacity) {
+ if (_missing_capacity > MESSAGE_SIZE_UNLIMITED) {
VSOMEIP_ERROR << "Missing receive buffer capacity exceeds allowed maximum!";
return;
}
- const std::size_t its_required_capacity(recv_buffer_size_ + missing_capacity_);
+ const std::size_t its_required_capacity(_recv_buffer_size + _missing_capacity);
if (its_capacity < its_required_capacity) {
- recv_buffer_.reserve(its_required_capacity);
- recv_buffer_.resize(its_required_capacity, 0x0);
+ _recv_buffer->reserve(its_required_capacity);
+ _recv_buffer->resize(its_required_capacity, 0x0);
}
- buffer_size = missing_capacity_;
- missing_capacity_ = 0;
+ buffer_size = _missing_capacity;
+ _missing_capacity = 0;
} else if (buffer_shrink_threshold_
&& shrink_count_ > buffer_shrink_threshold_
- && recv_buffer_size_ == 0) {
- recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
- recv_buffer_.shrink_to_fit();
+ && _recv_buffer_size == 0) {
+ _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
+ _recv_buffer->shrink_to_fit();
buffer_size = recv_buffer_size_initial_;
shrink_count_ = 0;
}
} catch (const std::exception &e) {
- handle_recv_buffer_exception(e);
+ handle_recv_buffer_exception(e, _recv_buffer, _recv_buffer_size);
// don't start receiving again
return;
}
socket_->async_receive(
- boost::asio::buffer(&recv_buffer_[recv_buffer_size_], buffer_size),
+ boost::asio::buffer(&(*_recv_buffer)[_recv_buffer_size], buffer_size),
std::bind(
&tcp_client_endpoint_impl::receive_cbk,
std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()),
std::placeholders::_1,
- std::placeholders::_2
+ std::placeholders::_2,
+ _recv_buffer,
+ _recv_buffer_size
)
);
}
@@ -337,8 +350,9 @@ bool tcp_client_endpoint_impl::is_reliable() const { return true;
}
-bool tcp_client_endpoint_impl::is_magic_cookie(size_t _offset) const {
- return (0 == std::memcmp(SERVICE_COOKIE, &recv_buffer_[_offset], sizeof(SERVICE_COOKIE)));
+bool tcp_client_endpoint_impl::is_magic_cookie(const message_buffer_ptr_t& _recv_buffer,
+ size_t _offset) const {
+ return (0 == std::memcmp(SERVICE_COOKIE, &(*_recv_buffer)[_offset], sizeof(SERVICE_COOKIE)));
}
void tcp_client_endpoint_impl::send_magic_cookie(message_buffer_ptr_t &_buffer) {
@@ -357,7 +371,8 @@ void tcp_client_endpoint_impl::send_magic_cookie(message_buffer_ptr_t &_buffer) }
void tcp_client_endpoint_impl::receive_cbk(
- boost::system::error_code const &_error, std::size_t _bytes) {
+ boost::system::error_code const &_error, std::size_t _bytes,
+ message_buffer_ptr_t _recv_buffer, std::size_t _recv_buffer_size) {
if (_error == boost::asio::error::operation_aborted) {
// endpoint was stopped
return;
@@ -365,42 +380,43 @@ void tcp_client_endpoint_impl::receive_cbk( #if 0
std::stringstream msg;
msg << "cei::rcb (" << _error.message() << "): ";
- for (std::size_t i = 0; i < _bytes + recv_buffer_size_; ++i)
+ for (std::size_t i = 0; i < _bytes + _recv_buffer_size; ++i)
msg << std::hex << std::setw(2) << std::setfill('0')
- << (int) recv_buffer_[i] << " ";
+ << (int) (_recv_buffer)[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
std::unique_lock<std::mutex> its_lock(socket_mutex_);
std::shared_ptr<endpoint_host> its_host = host_.lock();
if (its_host) {
+ std::uint32_t its_missing_capacity(0);
if (!_error && 0 < _bytes) {
- if (recv_buffer_size_ + _bytes < recv_buffer_size_) {
+ if (_recv_buffer_size + _bytes < _recv_buffer_size) {
VSOMEIP_ERROR << "receive buffer overflow in tcp client endpoint ~> abort!";
return;
}
- recv_buffer_size_ += _bytes;
+ _recv_buffer_size += _bytes;
size_t its_iteration_gap = 0;
- bool has_full_message;
+ bool has_full_message(false);
do {
uint64_t read_message_size
- = utility::get_message_size(&recv_buffer_[its_iteration_gap],
- recv_buffer_size_);
+ = utility::get_message_size(&(*_recv_buffer)[its_iteration_gap],
+ _recv_buffer_size);
if (read_message_size > MESSAGE_SIZE_UNLIMITED) {
VSOMEIP_ERROR << "Message size exceeds allowed maximum!";
return;
}
uint32_t current_message_size = static_cast<uint32_t>(read_message_size);
has_full_message = (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE
- && current_message_size <= recv_buffer_size_);
+ && current_message_size <= _recv_buffer_size);
if (has_full_message) {
bool needs_forwarding(true);
- if (is_magic_cookie(its_iteration_gap)) {
+ if (is_magic_cookie(_recv_buffer, its_iteration_gap)) {
has_enabled_magic_cookies_ = true;
} else {
if (has_enabled_magic_cookies_) {
- uint32_t its_offset = find_magic_cookie(&recv_buffer_[its_iteration_gap],
- (uint32_t) recv_buffer_size_);
+ uint32_t its_offset = find_magic_cookie(&(*_recv_buffer)[its_iteration_gap],
+ (uint32_t) _recv_buffer_size);
if (its_offset < current_message_size) {
VSOMEIP_ERROR << "Message includes Magic Cookie. Ignoring it.";
current_message_size = its_offset;
@@ -410,7 +426,7 @@ void tcp_client_endpoint_impl::receive_cbk( }
if (needs_forwarding) {
if (!has_enabled_magic_cookies_) {
- its_host->on_message(&recv_buffer_[its_iteration_gap],
+ its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
current_message_size, this,
boost::asio::ip::address(),
VSOMEIP_ROUTING_CLIENT,
@@ -418,8 +434,8 @@ void tcp_client_endpoint_impl::receive_cbk( remote_port_);
} else {
// Only call on_message without a magic cookie in front of the buffer!
- if (!is_magic_cookie(its_iteration_gap)) {
- its_host->on_message(&recv_buffer_[its_iteration_gap],
+ if (!is_magic_cookie(_recv_buffer, its_iteration_gap)) {
+ its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
current_message_size, this,
boost::asio::ip::address(),
VSOMEIP_ROUTING_CLIENT,
@@ -428,15 +444,15 @@ void tcp_client_endpoint_impl::receive_cbk( }
}
}
- calculate_shrink_count();
- recv_buffer_size_ -= current_message_size;
+ calculate_shrink_count(_recv_buffer, _recv_buffer_size);
+ _recv_buffer_size -= current_message_size;
its_iteration_gap += current_message_size;
- missing_capacity_ = 0;
+ its_missing_capacity = 0;
} else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED &&
current_message_size > max_message_size_) {
- recv_buffer_size_ = 0;
- recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
- recv_buffer_.shrink_to_fit();
+ _recv_buffer_size = 0;
+ _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
+ _recv_buffer->shrink_to_fit();
if (has_enabled_magic_cookies_) {
VSOMEIP_ERROR << "Received a TCP message which exceeds "
<< "maximum message size ("
@@ -455,64 +471,71 @@ void tcp_client_endpoint_impl::receive_cbk( << get_address_port_remote();
return;
}
- } else if (current_message_size > recv_buffer_size_) {
- missing_capacity_ = current_message_size
- - static_cast<std::uint32_t>(recv_buffer_size_);
- } else if (VSOMEIP_SOMEIP_HEADER_SIZE > recv_buffer_size_) {
- missing_capacity_ = VSOMEIP_SOMEIP_HEADER_SIZE
- - static_cast<std::uint32_t>(recv_buffer_size_);
- } else if (has_enabled_magic_cookies_ && recv_buffer_size_ > 0) {
- uint32_t its_offset = find_magic_cookie(&recv_buffer_[its_iteration_gap], recv_buffer_size_);
- if (its_offset < recv_buffer_size_) {
- recv_buffer_size_ -= its_offset;
+ } else if (current_message_size > _recv_buffer_size) {
+ its_missing_capacity = current_message_size
+ - static_cast<std::uint32_t>(_recv_buffer_size);
+ } else if (VSOMEIP_SOMEIP_HEADER_SIZE > _recv_buffer_size) {
+ its_missing_capacity = VSOMEIP_SOMEIP_HEADER_SIZE
+ - static_cast<std::uint32_t>(_recv_buffer_size);
+ } else if (has_enabled_magic_cookies_ && _recv_buffer_size > 0) {
+ uint32_t its_offset = find_magic_cookie(&(*_recv_buffer)[its_iteration_gap], _recv_buffer_size);
+ if (its_offset < _recv_buffer_size) {
+ _recv_buffer_size -= its_offset;
its_iteration_gap += its_offset;
has_full_message = true; // trigger next loop
}
} else {
VSOMEIP_ERROR << "tce::c<" << this
<< ">rcb: recv_buffer_size is: " << std::dec
- << recv_buffer_size_ << " but couldn't read "
+ << _recv_buffer_size << " but couldn't read "
"out message_size. recv_buffer_capacity: "
- << recv_buffer_.capacity()
+ << _recv_buffer->capacity()
<< " its_iteration_gap: " << its_iteration_gap
<< "local: " << get_address_port_local()
<< " remote: " << get_address_port_remote();
}
- } while (has_full_message && recv_buffer_size_);
+ } while (has_full_message && _recv_buffer_size);
if (its_iteration_gap) {
// Copy incomplete message to front for next receive_cbk iteration
- for (size_t i = 0; i < recv_buffer_size_; ++i) {
- recv_buffer_[i] = recv_buffer_[i + its_iteration_gap];
+ for (size_t i = 0; i < _recv_buffer_size; ++i) {
+ (*_recv_buffer)[i] = (*_recv_buffer)[i + its_iteration_gap];
}
// Still more capacity needed after shifting everything to front?
- if (missing_capacity_ &&
- missing_capacity_ <= recv_buffer_.capacity() - recv_buffer_size_) {
- missing_capacity_ = 0;
+ if (its_missing_capacity &&
+ its_missing_capacity <= _recv_buffer->capacity() - _recv_buffer_size) {
+ its_missing_capacity = 0;
}
}
its_lock.unlock();
- receive();
+ receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
} else {
if (_error == boost::asio::error::connection_reset ||
_error == boost::asio::error::eof ||
- _error == boost::asio::error::timed_out) {
- VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: " << _error.message()
- << " local: " << get_address_port_local()
+ _error == boost::asio::error::timed_out ||
+ _error == boost::asio::error::bad_descriptor) {
+ VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
+ << _error.message() << "( " << std::dec << _error.value()
+ << ") local: " << get_address_port_local()
<< " remote: " << get_address_port_remote();
is_connected_ = false;
shutdown_and_close_socket_unlocked(false);
} else {
+ VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
+ << _error.message() << "( " << std::dec << _error.value()
+ << ") local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
its_lock.unlock();
- receive();
+ receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
}
}
}
}
-void tcp_client_endpoint_impl::calculate_shrink_count() {
+void tcp_client_endpoint_impl::calculate_shrink_count(const message_buffer_ptr_t& _recv_buffer,
+ std::size_t _recv_buffer_size) {
if (buffer_shrink_threshold_) {
- if (recv_buffer_.capacity() != recv_buffer_size_initial_) {
- if (recv_buffer_size_ < (recv_buffer_.capacity() >> 1)) {
+ if (_recv_buffer->capacity() != recv_buffer_size_initial_) {
+ if (_recv_buffer_size < (_recv_buffer->capacity() >> 1)) {
shrink_count_++;
} else {
shrink_count_ = 0;
@@ -551,7 +574,9 @@ const std::string tcp_client_endpoint_impl::get_address_port_local() const { }
void tcp_client_endpoint_impl::handle_recv_buffer_exception(
- const std::exception &_e) {
+ const std::exception &_e,
+ const message_buffer_ptr_t& _recv_buffer,
+ std::size_t _recv_buffer_size) {
boost::system::error_code ec;
std::stringstream its_message;
@@ -560,18 +585,18 @@ void tcp_client_endpoint_impl::handle_recv_buffer_exception( << " remote: " << get_address_port_remote()
<< " shutting down connection. Start of buffer: ";
- for (std::size_t i = 0; i < recv_buffer_size_ && i < 16; i++) {
+ for (std::size_t i = 0; i < _recv_buffer_size && i < 16; i++) {
its_message << std::setw(2) << std::setfill('0') << std::hex
- << (int) (recv_buffer_[i]) << " ";
+ << (int) ((*_recv_buffer)[i]) << " ";
}
its_message << " Last 16 Bytes captured: ";
- for (int i = 15; recv_buffer_size_ > 15 && i >= 0; i--) {
+ for (int i = 15; _recv_buffer_size > 15 && i >= 0; i--) {
its_message << std::setw(2) << std::setfill('0') << std::hex
- << (int) (recv_buffer_[i]) << " ";
+ << (int) ((*_recv_buffer)[i]) << " ";
}
VSOMEIP_ERROR << its_message.str();
- recv_buffer_.clear();
+ _recv_buffer->clear();
{
std::lock_guard<std::mutex> its_lock(mutex_);
sending_blocked_ = true;
@@ -601,7 +626,7 @@ void tcp_client_endpoint_impl::print_status() { {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
local = get_address_port_local();
- its_receive_buffer_capacity = recv_buffer_.capacity();
+ its_receive_buffer_capacity = recv_buffer_->capacity();
}
VSOMEIP_INFO << "status tce: " << local << " -> "
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp index a478e69..6bc0bf5 100644 --- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp @@ -82,7 +82,14 @@ void udp_client_endpoint_impl::restart() { std::lock_guard<std::mutex> its_lock(mutex_);
queue_.clear();
}
+ std::string local;
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ local = get_address_port_local();
+ }
shutdown_and_close_socket(false);
+ VSOMEIP_WARNING << "uce::restart: local: " << local
+ << " remote: " << get_address_port_remote();
start_connect_timer();
}
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp index 0e8ce5d..baf6660 100644 --- a/implementation/routing/src/routing_manager_base.cpp +++ b/implementation/routing/src/routing_manager_base.cpp @@ -384,30 +384,36 @@ void routing_manager_base::register_event(client_t _client, service_t _service, void routing_manager_base::unregister_event(client_t _client, service_t _service, instance_t _instance, event_t _event, bool _is_provided) { (void)_client; - std::lock_guard<std::mutex> its_lock(events_mutex_); - auto found_service = events_.find(_service); - if (found_service != events_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_event = found_instance->second.find(_event); - if (found_event != found_instance->second.end()) { - auto its_event = found_event->second; - its_event->remove_ref(_client, _is_provided); - if (!its_event->has_ref()) { - auto its_eventgroups = its_event->get_eventgroups(); - for (auto eg : its_eventgroups) { - std::shared_ptr<eventgroupinfo> its_eventgroup_info - = find_eventgroup(_service, _instance, eg); - if (its_eventgroup_info) { - its_eventgroup_info->remove_event(its_event); - if (0 == its_eventgroup_info->get_events().size()) { - remove_eventgroup_info(_service, _instance, eg); - } - } + std::shared_ptr<event> its_unrefed_event; + { + std::lock_guard<std::mutex> its_lock(events_mutex_); + auto found_service = events_.find(_service); + if (found_service != events_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + auto found_event = found_instance->second.find(_event); + if (found_event != found_instance->second.end()) { + auto its_event = found_event->second; + its_event->remove_ref(_client, _is_provided); + if (!its_event->has_ref()) { + its_unrefed_event = its_event; + found_instance->second.erase(found_event); + } else if (_is_provided) { + its_event->set_provided(false); } - found_instance->second.erase(_event); - } else if (_is_provided) { - its_event->set_provided(false); + } + } + } + } + if (its_unrefed_event) { + auto its_eventgroups = its_unrefed_event->get_eventgroups(); + for (auto eg : its_eventgroups) { + std::shared_ptr<eventgroupinfo> its_eventgroup_info + = find_eventgroup(_service, _instance, eg); + if (its_eventgroup_info) { + its_eventgroup_info->remove_event(its_unrefed_event); + if (0 == its_eventgroup_info->get_events().size()) { + remove_eventgroup_info(_service, _instance, eg); } } } diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 15f1201..0c443fd 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -2418,59 +2418,80 @@ void routing_manager_impl::expire_services(const boost::asio::ip::address &_addr } void routing_manager_impl::expire_subscriptions(const boost::asio::ip::address &_address) { - std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); - for (auto &its_service : eventgroups_) { - for (auto &its_instance : its_service.second) { - const client_t its_hosting_client = find_local_client( - its_service.first, its_instance.first); - const bool service_offered_by_host = (its_hosting_client - == host_->get_client()); - auto target = find_local(its_hosting_client); - - for (auto &its_eventgroup : its_instance.second) { - std::set<std::shared_ptr<endpoint_definition>> its_invalid_endpoints; - for (auto &its_target : its_eventgroup.second->get_targets()) { - if (its_target.endpoint_->get_address() == _address) - its_invalid_endpoints.insert(its_target.endpoint_); - } - - for (auto &its_endpoint : its_invalid_endpoints) { - its_eventgroup.second->remove_target(its_endpoint); - client_t its_client = find_client(its_service.first, - its_instance.first, its_eventgroup.second, - its_endpoint); - clear_remote_subscriber(its_service.first, - its_instance.first, its_client, its_endpoint); - - if (its_eventgroup.second->get_targets().size() == 0) { - std::set<std::shared_ptr<event> > its_events - = its_eventgroup.second->get_events(); - for (auto e : its_events) { - if (e->is_shadow()) { - e->unset_payload(); - } - } + struct subscriptions_info { + service_t service_id_; + instance_t instance_id_; + eventgroup_t eventgroup_id_; + std::shared_ptr<endpoint_definition> invalid_endpoint_; + client_t client_; + std::set<std::shared_ptr<event>> events_; + }; + std::vector<struct subscriptions_info> subscriptions_to_expire_; + { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); + for (auto &its_service : eventgroups_) { + for (auto &its_instance : its_service.second) { + for (auto &its_eventgroup : its_instance.second) { + std::set<std::shared_ptr<endpoint_definition>> its_invalid_endpoints; + for (auto &its_target : its_eventgroup.second->get_targets()) { + if (its_target.endpoint_->get_address() == _address) + its_invalid_endpoints.insert(its_target.endpoint_); } - if (target) { - stub_->send_unsubscribe(target, its_client, its_service.first, - its_instance.first, its_eventgroup.first, ANY_EVENT, true); + for (auto &its_endpoint : its_invalid_endpoints) { + its_eventgroup.second->remove_target(its_endpoint); + client_t its_client = find_client(its_service.first, + its_instance.first, its_eventgroup.second, + its_endpoint); + clear_remote_subscriber(its_service.first, + its_instance.first, its_client, its_endpoint); + + std::set<std::shared_ptr<event> > its_events; + if (its_eventgroup.second->get_targets().size() == 0) { + its_events = its_eventgroup.second->get_events(); + } + subscriptions_to_expire_.push_back({its_service.first, + its_instance.first, + its_eventgroup.first, + its_endpoint, + its_client, + its_events}); } - if (service_offered_by_host) { - host_->on_subscription(its_service.first, - its_instance.first, its_eventgroup.first, - its_client, false, - [](const bool _subscription_accepted){ - (void)_subscription_accepted; - }); + if(its_eventgroup.second->is_multicast() && its_invalid_endpoints.size() && + 0 == its_eventgroup.second->get_unreliable_target_count() ) { + //clear multicast targets if no subscriber is left for multicast eventgroup + its_eventgroup.second->clear_multicast_targets(); } } - if(its_eventgroup.second->is_multicast() && its_invalid_endpoints.size() && - 0 == its_eventgroup.second->get_unreliable_target_count() ) { - //clear multicast targets if no subscriber is left for multicast eventgroup - its_eventgroup.second->clear_multicast_targets(); + } + } + } + + for (const auto &s : subscriptions_to_expire_) { + if (s.invalid_endpoint_) { + for (const auto e: s.events_) { + if (e->is_shadow()) { + e->unset_payload(); } } + const client_t its_hosting_client = find_local_client( + s.service_id_, s.instance_id_); + const bool service_offered_by_host = (its_hosting_client + == host_->get_client()); + std::shared_ptr<endpoint> target = find_local(its_hosting_client); + + if (target) { + stub_->send_unsubscribe(target, s.client_, s.service_id_, + s.instance_id_, s.eventgroup_id_, ANY_EVENT, true); + } + if (service_offered_by_host) { + host_->on_subscription(s.service_id_, + s.instance_id_, s.eventgroup_id_, + s.client_, false, + [](const bool _subscription_accepted){ + (void)_subscription_accepted; + }); + } } } } @@ -3330,81 +3351,99 @@ void routing_manager_impl::clear_remote_subscriber( std::chrono::steady_clock::time_point routing_manager_impl::expire_subscriptions() { - std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); + struct subscriptions_info { + service_t service_id_; + instance_t instance_id_; + eventgroup_t eventgroup_id_; + std::shared_ptr<endpoint_definition> invalid_endpoint_; + client_t client_; + std::set<std::shared_ptr<event>> events_; + }; + std::vector<struct subscriptions_info> subscriptions_to_expire_; std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); std::chrono::steady_clock::time_point next_expiration = std::chrono::steady_clock::now() + std::chrono::hours(24); + { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); - for (auto &its_service : eventgroups_) { - for (auto &its_instance : its_service.second) { - const client_t its_hosting_client = find_local_client( - its_service.first, its_instance.first); - for (auto &its_eventgroup : its_instance.second) { - std::set<std::shared_ptr<endpoint_definition>> its_expired_endpoints; - for (auto &its_target : its_eventgroup.second->get_targets()) { - if (its_target.expiration_ < now) { - its_expired_endpoints.insert(its_target.endpoint_); - } else if (its_target.expiration_ < next_expiration) { - next_expiration = its_target.expiration_; - } - } - std::shared_ptr<endpoint> target; - bool service_offered_by_host(false); - if (its_expired_endpoints.size()) { - target = find_local(its_hosting_client); - service_offered_by_host = (its_hosting_client == host_->get_client()); - } - - for (auto its_endpoint : its_expired_endpoints) { - its_eventgroup.second->remove_target(its_endpoint); - - client_t its_client - = find_client(its_service.first, its_instance.first, - its_eventgroup.second, its_endpoint); - clear_remote_subscriber(its_service.first, its_instance.first, - its_client, its_endpoint); - - if (its_eventgroup.second->get_targets().size() == 0) { - std::set<std::shared_ptr<event> > its_events - = its_eventgroup.second->get_events(); - for (auto e : its_events) { - if (e->is_shadow()) { - e->unset_payload(); - } + for (auto &its_service : eventgroups_) { + for (auto &its_instance : its_service.second) { + for (auto &its_eventgroup : its_instance.second) { + std::set<std::shared_ptr<endpoint_definition>> its_expired_endpoints; + for (auto &its_target : its_eventgroup.second->get_targets()) { + if (its_target.expiration_ < now) { + its_expired_endpoints.insert(its_target.endpoint_); + } else if (its_target.expiration_ < next_expiration) { + next_expiration = its_target.expiration_; } } - if (target) { - stub_->send_unsubscribe(target, its_client, its_service.first, - its_instance.first, its_eventgroup.first, ANY_EVENT, true); + for (auto its_endpoint : its_expired_endpoints) { + its_eventgroup.second->remove_target(its_endpoint); + + client_t its_client + = find_client(its_service.first, its_instance.first, + its_eventgroup.second, its_endpoint); + clear_remote_subscriber(its_service.first, its_instance.first, + its_client, its_endpoint); + + std::set<std::shared_ptr<event> > its_events; + if (its_eventgroup.second->get_targets().size() == 0) { + its_events = its_eventgroup.second->get_events(); + } + subscriptions_to_expire_.push_back({its_service.first, + its_instance.first, + its_eventgroup.first, + its_endpoint, + its_client, + its_events}); } - if (service_offered_by_host) { - host_->on_subscription(its_service.first, - its_instance.first, its_eventgroup.first, - its_client, false, - [](const bool _subscription_accepted){ - (void)_subscription_accepted; - }); + if(its_eventgroup.second->is_multicast() && its_expired_endpoints.size() && + 0 == its_eventgroup.second->get_unreliable_target_count() ) { + //clear multicast targets if no unreliable subscriber is left for multicast eventgroup + its_eventgroup.second->clear_multicast_targets(); } - - VSOMEIP_INFO << "Expired subscription (" - << std::hex << its_service.first << "." - << its_instance .first << "." - << its_eventgroup.first << " from " - << its_endpoint->get_address() << ":" - << std::dec << its_endpoint->get_port() - << "(" << std::hex << its_client << ")"; - } - if(its_eventgroup.second->is_multicast() && its_expired_endpoints.size() && - 0 == its_eventgroup.second->get_unreliable_target_count() ) { - //clear multicast targets if no unreliable subscriber is left for multicast eventgroup - its_eventgroup.second->clear_multicast_targets(); } } } } + for (const auto &s : subscriptions_to_expire_) { + if (s.invalid_endpoint_) { + for (const auto e: s.events_) { + if (e->is_shadow()) { + e->unset_payload(); + } + } + const client_t its_hosting_client = find_local_client(s.service_id_, + s.instance_id_); + const bool service_offered_by_host = (its_hosting_client + == host_->get_client()); + std::shared_ptr<endpoint> target = find_local(its_hosting_client); + + if (target) { + stub_->send_unsubscribe(target, s.client_, s.service_id_, + s.instance_id_, s.eventgroup_id_, ANY_EVENT, true); + } + if (service_offered_by_host) { + host_->on_subscription(s.service_id_, + s.instance_id_, s.eventgroup_id_, + s.client_, false, + [](const bool _subscription_accepted){ + (void)_subscription_accepted; + }); + } + + VSOMEIP_INFO << "Expired subscription (" + << std::hex << s.service_id_ << "." + << s.instance_id_ << "." + << s.eventgroup_id_ << " from " + << s.invalid_endpoint_->get_address() << ":" + << std::dec << s.invalid_endpoint_->get_port() + << "(" << std::hex << s.client_ << ")"; + } + } return next_expiration; } diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index 8dd8515..a15ae2d 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -1998,7 +1998,12 @@ void routing_manager_proxy::handle_client_error(client_t _client) { should_reconnect = is_started_; } if (should_reconnect) { - reconnect(known_clients_); + std::unordered_set<client_t> its_known_clients; + { + std::lock_guard<std::mutex> its_lock(known_clients_mutex_); + its_known_clients = known_clients_; + } + reconnect(its_known_clients); } } } diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp index 8d22c7c..d97be8e 100644 --- a/implementation/runtime/src/application_impl.cpp +++ b/implementation/runtime/src/application_impl.cpp @@ -1008,7 +1008,7 @@ void application_impl::deliver_subscription_state(service_t _service, instance_t } else { auto its_any_event = found_eventgroup->second.find(ANY_EVENT); if (its_any_event != found_eventgroup->second.end()) { - if (!_error || (_error && found_event->second.second)) { + if (!_error || (_error && its_any_event->second.second)) { handlers.push_back(its_any_event->second.first); } } @@ -1027,7 +1027,7 @@ void application_impl::deliver_subscription_state(service_t _service, instance_t } else { auto its_any_event = found_eventgroup->second.find(ANY_EVENT); if (its_any_event != found_eventgroup->second.end()) { - if (!_error || (_error && found_event->second.second)) { + if (!_error || (_error && its_any_event->second.second)) { handlers.push_back(its_any_event->second.first); } } @@ -1049,7 +1049,7 @@ void application_impl::deliver_subscription_state(service_t _service, instance_t } else { auto its_any_event = found_eventgroup->second.find(ANY_EVENT); if (its_any_event != found_eventgroup->second.end()) { - if (!_error || (_error && found_event->second.second)) { + if (!_error || (_error && its_any_event->second.second)) { handlers.push_back(its_any_event->second.first); } } @@ -1068,7 +1068,7 @@ void application_impl::deliver_subscription_state(service_t _service, instance_t } else { auto its_any_event = found_eventgroup->second.find(ANY_EVENT); if (its_any_event != found_eventgroup->second.end()) { - if (!_error || (_error && found_event->second.second)) { + if (!_error || (_error && its_any_event->second.second)) { handlers.push_back(its_any_event->second.first); } } |