diff options
Diffstat (limited to 'implementation/endpoints/src/tcp_server_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/tcp_server_endpoint_impl.cpp | 171 |
1 files changed, 102 insertions, 69 deletions
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp index 37db3f5..f23e9be 100644 --- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp @@ -1,5 +1,5 @@ -// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2021 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -26,7 +26,7 @@ tcp_server_endpoint_impl::tcp_server_endpoint_impl( const std::shared_ptr<endpoint_host>& _endpoint_host, const std::shared_ptr<routing_host>& _routing_host, const endpoint_type& _local, - boost::asio::io_service &_io, + boost::asio::io_context &_io, const std::shared_ptr<configuration>& _configuration) : tcp_server_endpoint_base_impl(_endpoint_host, _routing_host, _local, _io, _configuration->get_max_message_size_reliable(_local.address().to_string(), _local.port()), @@ -41,11 +41,16 @@ tcp_server_endpoint_impl::tcp_server_endpoint_impl( boost::system::error_code ec; acceptor_.open(_local.protocol(), ec); - boost::asio::detail::throw_error(ec, "acceptor open"); + if (ec) + VSOMEIP_ERROR << __func__ + << ": open failed (" << ec.message() << ")"; + acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec); - boost::asio::detail::throw_error(ec, "acceptor set_option"); + if (ec) + VSOMEIP_ERROR << __func__ + << ": set reuse address option failed (" << ec.message() << ")"; -#ifndef _WIN32 +#if defined(__linux__) || defined(ANDROID) // If specified, bind to device std::string its_device(configuration_->get_device()); if (its_device != "") { @@ -57,9 +62,14 @@ tcp_server_endpoint_impl::tcp_server_endpoint_impl( #endif acceptor_.bind(_local, ec); - boost::asio::detail::throw_error(ec, "acceptor bind"); + if (ec) + VSOMEIP_ERROR << __func__ + << ": bind failed (" << ec.message() << ")"; + acceptor_.listen(boost::asio::socket_base::max_connections, ec); - boost::asio::detail::throw_error(ec, "acceptor listen"); + if (ec) + VSOMEIP_ERROR << __func__ + << ": listen failed (" << ec.message() << ")"; } tcp_server_endpoint_impl::~tcp_server_endpoint_impl() { @@ -76,7 +86,7 @@ void tcp_server_endpoint_impl::start() { std::dynamic_pointer_cast<tcp_server_endpoint_impl>( shared_from_this()), max_message_size_, buffer_shrink_threshold_, has_enabled_magic_cookies_, - service_, send_timeout_); + io_, send_timeout_); { std::unique_lock<std::mutex> its_socket_lock(new_connection->get_socket_lock()); @@ -121,44 +131,47 @@ bool tcp_server_endpoint_impl::send_error( bool ret(false); std::lock_guard<std::mutex> its_lock(mutex_); const endpoint_type its_target(_target->get_address(), _target->get_port()); - const queue_iterator_type target_queue_iterator(find_or_create_queue_unlocked(its_target)); - auto& its_qpair = target_queue_iterator->second; - const bool queue_size_zero_on_entry(its_qpair.second.empty()); + const auto its_target_iterator(find_or_create_target_unlocked(its_target)); + auto &its_data = its_target_iterator->second; + const bool queue_size_zero_on_entry(its_data.queue_.empty()); if (check_message_size(nullptr, _size, its_target) == endpoint_impl::cms_ret_e::MSG_OK && - check_queue_limit(_data, _size, its_qpair.first)) { - its_qpair.second.emplace_back( - std::make_shared<message_buffer_t>(_data, _data + _size)); - its_qpair.first += _size; + check_queue_limit(_data, _size, its_data.queue_size_)) { + its_data.queue_.emplace_back( + std::make_pair(std::make_shared<message_buffer_t>(_data, _data + _size), 0)); + its_data.queue_size_ += _size; if (queue_size_zero_on_entry) { // no writing in progress - send_queued(target_queue_iterator); + send_queued(its_target_iterator); } ret = true; } return ret; } -void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iterator) { +bool tcp_server_endpoint_impl::send_queued(const target_data_iterator_type _it) { + + bool must_erase(false); connection::ptr its_connection; { std::lock_guard<std::mutex> its_lock(connections_mutex_); - auto connection_iterator = connections_.find(_queue_iterator->first); + auto connection_iterator = connections_.find(_it->first); if (connection_iterator != connections_.end()) { its_connection = connection_iterator->second; } else { VSOMEIP_INFO << "Didn't find connection: " - << _queue_iterator->first.address().to_string() << ":" << std::dec - << static_cast<std::uint16_t>(_queue_iterator->first.port()) + << _it->first.address().to_string() << ":" << std::dec + << static_cast<std::uint16_t>(_it->first.port()) << " dropping outstanding messages (" << std::dec - << _queue_iterator->second.second.size() << ")."; + << _it->second.queue_.size() << ")."; - if (_queue_iterator->second.second.size()) { + if (_it->second.queue_.size()) { std::set<service_t> its_services; // check all outstanding messages of this connection // whether stop handlers need to be called - for (const auto &its_buffer : _queue_iterator->second.second) { + for (const auto &its_q : _it->second.queue_) { + auto its_buffer(its_q.first); if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) { service_t its_service = VSOMEIP_BYTES_TO_WORD( (*its_buffer)[VSOMEIP_SERVICE_POS_MIN], @@ -176,7 +189,7 @@ void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iter << its_service; auto handler = found_cbk->second; auto ptr = this->shared_from_this(); - service_.post([ptr, handler, its_service](){ + io_.post([ptr, handler, its_service](){ handler(ptr, its_service); }); prepare_stop_handlers_.erase(found_cbk); @@ -184,12 +197,14 @@ void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iter } } - queues_.erase(_queue_iterator->first); + must_erase = true; } } if (its_connection) { - its_connection->send_queued(_queue_iterator); + its_connection->send_queued(_it); } + + return (must_erase); } void tcp_server_endpoint_impl::get_configured_times_from_endpoint( @@ -202,7 +217,7 @@ void tcp_server_endpoint_impl::get_configured_times_from_endpoint( _debouncing, _maximum_retention); } -bool tcp_server_endpoint_impl::is_established(const std::shared_ptr<endpoint_definition>& _endpoint) { +bool tcp_server_endpoint_impl::is_established_to(const std::shared_ptr<endpoint_definition>& _endpoint) { bool is_connected = false; endpoint_type endpoint(_endpoint->get_address(), _endpoint->get_port()); { @@ -274,7 +289,7 @@ void tcp_server_endpoint_impl::accept_cbk(const connection::ptr& _connection, << _error.message() << " (" << std::dec << _error.value() << ") Will try to accept again in 1000ms"; std::shared_ptr<boost::asio::steady_timer> its_timer = - std::make_shared<boost::asio::steady_timer>(service_, + std::make_shared<boost::asio::steady_timer>(io_, std::chrono::milliseconds(1000)); auto its_ep = std::dynamic_pointer_cast<tcp_server_endpoint_impl>( shared_from_this()); @@ -288,6 +303,7 @@ void tcp_server_endpoint_impl::accept_cbk(const connection::ptr& _connection, } std::uint16_t tcp_server_endpoint_impl::get_local_port() const { + return local_port_; } @@ -308,9 +324,9 @@ tcp_server_endpoint_impl::connection::connection( std::uint32_t _recv_buffer_size_initial, std::uint32_t _buffer_shrink_threshold, bool _magic_cookies_enabled, - boost::asio::io_service &_io_service, + boost::asio::io_context &_io, std::chrono::milliseconds _send_timeout) : - socket_(_io_service), + socket_(_io), server_(_server), max_message_size_(_max_message_size), recv_buffer_size_initial_(_recv_buffer_size_initial), @@ -326,20 +342,33 @@ tcp_server_endpoint_impl::connection::connection( send_timeout_warning_(_send_timeout / 2) { } +tcp_server_endpoint_impl::connection::~connection() { + + auto its_server(server_.lock()); + if (its_server) { + auto its_routing_host(its_server->routing_host_.lock()); + if (its_routing_host) { + its_routing_host->remove_subscriptions( + its_server->local_port_, + remote_address_, remote_port_); + } + } +} + tcp_server_endpoint_impl::connection::ptr tcp_server_endpoint_impl::connection::create( const std::weak_ptr<tcp_server_endpoint_impl>& _server, std::uint32_t _max_message_size, std::uint32_t _buffer_shrink_threshold, bool _magic_cookies_enabled, - boost::asio::io_service & _io_service, + boost::asio::io_context &_io, std::chrono::milliseconds _send_timeout) { const std::uint32_t its_initial_receveive_buffer_size = VSOMEIP_SOMEIP_HEADER_SIZE + 8 + MAGIC_COOKIE_SIZE + 8; return ptr(new connection(_server, _max_message_size, its_initial_receveive_buffer_size, _buffer_shrink_threshold, _magic_cookies_enabled, - _io_service, _send_timeout)); + _io, _send_timeout)); } tcp_server_endpoint_impl::socket_type & @@ -360,7 +389,13 @@ void tcp_server_endpoint_impl::connection::receive() { std::lock_guard<std::mutex> its_lock(socket_mutex_); if(socket_.is_open()) { const std::size_t its_capacity(recv_buffer_.capacity()); - size_t buffer_size = its_capacity - recv_buffer_size_; + if (recv_buffer_size_ > its_capacity) { + VSOMEIP_ERROR << __func__ << "Received buffer size is greater than the buffer capacity!" + << " recv_buffer_size_: " << recv_buffer_size_ + << " its_capacity: " << its_capacity; + return; + } + size_t left_buffer_size = its_capacity - recv_buffer_size_; try { if (missing_capacity_) { if (missing_capacity_ > MESSAGE_SIZE_UNLIMITED) { @@ -369,6 +404,7 @@ void tcp_server_endpoint_impl::connection::receive() { } const std::size_t its_required_capacity(recv_buffer_size_ + missing_capacity_); if (its_capacity < its_required_capacity) { + // Make the resize to its_required_capacity recv_buffer_.reserve(its_required_capacity); recv_buffer_.resize(its_required_capacity, 0x0); if (recv_buffer_.size() > 1048576) { @@ -378,14 +414,16 @@ void tcp_server_endpoint_impl::connection::receive() { << " remote: " << get_address_port_remote(); } } - buffer_size = missing_capacity_; + left_buffer_size = missing_capacity_; missing_capacity_ = 0; } else if (buffer_shrink_threshold_ && shrink_count_ > buffer_shrink_threshold_ && recv_buffer_size_ == 0) { + // In this case, make the resize to recv_buffer_size_initial_ recv_buffer_.resize(recv_buffer_size_initial_, 0x0); recv_buffer_.shrink_to_fit(); - buffer_size = recv_buffer_size_initial_; + // And set buffer_size to recv_buffer_size_initial_, the same of our resize + left_buffer_size = recv_buffer_size_initial_; shrink_count_ = 0; } } catch (const std::exception &e) { @@ -393,7 +431,7 @@ void tcp_server_endpoint_impl::connection::receive() { // don't start receiving again return; } - socket_.async_receive(boost::asio::buffer(&recv_buffer_[recv_buffer_size_], buffer_size), + socket_.async_receive(boost::asio::buffer(&recv_buffer_[recv_buffer_size_], left_buffer_size), std::bind(&tcp_server_endpoint_impl::connection::receive_cbk, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); @@ -410,14 +448,15 @@ void tcp_server_endpoint_impl::connection::stop() { } void tcp_server_endpoint_impl::connection::send_queued( - const queue_iterator_type _queue_iterator) { + const target_data_iterator_type _it) { + std::shared_ptr<tcp_server_endpoint_impl> its_server(server_.lock()); if (!its_server) { VSOMEIP_TRACE << "tcp_server_endpoint_impl::connection::send_queued " " couldn't lock server_"; return; } - message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front(); + message_buffer_ptr_t its_buffer = _it->second.queue_.front().first; const service_t its_service = VSOMEIP_BYTES_TO_WORD( (*its_buffer)[VSOMEIP_SERVICE_POS_MIN], (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]); @@ -437,7 +476,7 @@ void tcp_server_endpoint_impl::connection::send_queued( now - last_cookie_sent_) > std::chrono::milliseconds(10000)) { if (send_magic_cookie(its_buffer)) { last_cookie_sent_ = now; - _queue_iterator->second.first += sizeof(SERVICE_COOKIE); + _it->second.queue_size_ += sizeof(SERVICE_COOKIE); } } } @@ -459,15 +498,16 @@ void tcp_server_endpoint_impl::connection::send_queued( std::chrono::steady_clock::now()), std::bind(&tcp_server_endpoint_base_impl::send_cbk, its_server, - _queue_iterator, + _it, std::placeholders::_1, std::placeholders::_2)); } } void tcp_server_endpoint_impl::connection::send_queued_sync( - const queue_iterator_type _queue_iterator) { - message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front(); + const target_data_iterator_type _it) { + + message_buffer_ptr_t its_buffer = _it->second.queue_.front().first; if (magic_cookies_enabled_) { const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); @@ -475,7 +515,7 @@ void tcp_server_endpoint_impl::connection::send_queued_sync( now - last_cookie_sent_) > std::chrono::milliseconds(10000)) { if (send_magic_cookie(its_buffer)) { last_cookie_sent_ = now; - _queue_iterator->second.first += sizeof(SERVICE_COOKIE); + _it->second.queue_size_ += sizeof(SERVICE_COOKIE); } } } @@ -587,36 +627,29 @@ void tcp_server_endpoint_impl::connection::receive_cbk( recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MIN], recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MAX]); if (its_client != MAGIC_COOKIE_CLIENT) { - const service_t its_service = VSOMEIP_BYTES_TO_WORD( - recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MIN], - recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MAX]); - const method_t its_method = VSOMEIP_BYTES_TO_WORD( - recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MIN], - recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MAX]); const session_t its_session = VSOMEIP_BYTES_TO_WORD( recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MIN], recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MAX]); - - std::lock_guard<std::mutex> its_requests_guard(its_server->requests_mutex_); - its_server->requests_[its_client] - [std::make_tuple(its_service, its_method, its_session)] = remote_; + its_server->clients_mutex_.lock(); + its_server->clients_[its_client][its_session] = remote_; + its_server->clients_mutex_.unlock(); } } if (!magic_cookies_enabled_) { its_host->on_message(&recv_buffer_[its_iteration_gap], current_message_size, its_server.get(), - boost::asio::ip::address(), + false, VSOMEIP_ROUTING_CLIENT, - std::make_pair(ANY_UID, ANY_GID), + nullptr, remote_address_, remote_port_); } else { // Only call on_message without a magic cookie in front of the buffer! if (!is_magic_cookie(its_iteration_gap)) { its_host->on_message(&recv_buffer_[its_iteration_gap], current_message_size, its_server.get(), - boost::asio::ip::address(), + false, VSOMEIP_ROUTING_CLIENT, - std::make_pair(ANY_UID, ANY_GID), + nullptr, remote_address_, remote_port_); } } @@ -677,9 +710,9 @@ void tcp_server_endpoint_impl::connection::receive_cbk( // ensure to send back a error message w/ wrong protocol version its_host->on_message(&recv_buffer_[its_iteration_gap], VSOMEIP_SOMEIP_HEADER_SIZE + 8, its_server.get(), - boost::asio::ip::address(), + false, VSOMEIP_ROUTING_CLIENT, - std::make_pair(ANY_UID, ANY_GID), + nullptr, remote_address_, remote_port_); } else if (!utility::is_valid_message_type(static_cast<message_type_e>( recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS]))) { @@ -814,7 +847,7 @@ void tcp_server_endpoint_impl::connection::set_remote_info( remote_port_ = _remote.port(); } -const std::string tcp_server_endpoint_impl::connection::get_address_port_remote() const { +std::string tcp_server_endpoint_impl::connection::get_address_port_remote() const { std::string its_address_port; its_address_port.reserve(21); boost::system::error_code ec; @@ -824,7 +857,7 @@ const std::string tcp_server_endpoint_impl::connection::get_address_port_remote( return its_address_port; } -const std::string tcp_server_endpoint_impl::connection::get_address_port_local() const { +std::string tcp_server_endpoint_impl::connection::get_address_port_local() const { std::string its_address_port; its_address_port.reserve(21); boost::system::error_code ec; @@ -954,7 +987,7 @@ void tcp_server_endpoint_impl::print_status() { VSOMEIP_INFO << "status tse: " << std::dec << local_port_ << " connections: " << std::dec << its_connections.size() - << " queues: " << std::dec << queues_.size(); + << " targets: " << std::dec << targets_.size(); for (const auto &c : its_connections) { std::size_t its_data_size(0); std::size_t its_queue_size(0); @@ -963,10 +996,10 @@ void tcp_server_endpoint_impl::print_status() { std::unique_lock<std::mutex> c_s_lock(c.second->get_socket_lock()); its_recv_size = c.second->get_recv_buffer_capacity(); } - auto found_queue = queues_.find(c.first); - if (found_queue != queues_.end()) { - its_queue_size = found_queue->second.second.size(); - its_data_size = found_queue->second.first; + auto found_queue = targets_.find(c.first); + if (found_queue != targets_.end()) { + its_queue_size = found_queue->second.queue_.size(); + its_data_size = found_queue->second.queue_size_; } VSOMEIP_INFO << "status tse: client: " << c.second->get_address_port_remote() @@ -977,10 +1010,10 @@ void tcp_server_endpoint_impl::print_status() { } std::string tcp_server_endpoint_impl::get_remote_information( - const queue_iterator_type _queue_iterator) const { + const target_data_iterator_type _it) const { boost::system::error_code ec; - return _queue_iterator->first.address().to_string(ec) + ":" - + std::to_string(_queue_iterator->first.port()); + return _it->first.address().to_string(ec) + ":" + + std::to_string(_it->first.port()); } std::string tcp_server_endpoint_impl::get_remote_information( |