summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/tcp_server_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/tcp_server_endpoint_impl.cpp171
1 files changed, 102 insertions, 69 deletions
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
index 37db3f5..f23e9be 100644
--- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
@@ -1,5 +1,5 @@
-// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2021 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -26,7 +26,7 @@ tcp_server_endpoint_impl::tcp_server_endpoint_impl(
const std::shared_ptr<endpoint_host>& _endpoint_host,
const std::shared_ptr<routing_host>& _routing_host,
const endpoint_type& _local,
- boost::asio::io_service &_io,
+ boost::asio::io_context &_io,
const std::shared_ptr<configuration>& _configuration)
: tcp_server_endpoint_base_impl(_endpoint_host, _routing_host, _local, _io,
_configuration->get_max_message_size_reliable(_local.address().to_string(), _local.port()),
@@ -41,11 +41,16 @@ tcp_server_endpoint_impl::tcp_server_endpoint_impl(
boost::system::error_code ec;
acceptor_.open(_local.protocol(), ec);
- boost::asio::detail::throw_error(ec, "acceptor open");
+ if (ec)
+ VSOMEIP_ERROR << __func__
+ << ": open failed (" << ec.message() << ")";
+
acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
- boost::asio::detail::throw_error(ec, "acceptor set_option");
+ if (ec)
+ VSOMEIP_ERROR << __func__
+ << ": set reuse address option failed (" << ec.message() << ")";
-#ifndef _WIN32
+#if defined(__linux__) || defined(ANDROID)
// If specified, bind to device
std::string its_device(configuration_->get_device());
if (its_device != "") {
@@ -57,9 +62,14 @@ tcp_server_endpoint_impl::tcp_server_endpoint_impl(
#endif
acceptor_.bind(_local, ec);
- boost::asio::detail::throw_error(ec, "acceptor bind");
+ if (ec)
+ VSOMEIP_ERROR << __func__
+ << ": bind failed (" << ec.message() << ")";
+
acceptor_.listen(boost::asio::socket_base::max_connections, ec);
- boost::asio::detail::throw_error(ec, "acceptor listen");
+ if (ec)
+ VSOMEIP_ERROR << __func__
+ << ": listen failed (" << ec.message() << ")";
}
tcp_server_endpoint_impl::~tcp_server_endpoint_impl() {
@@ -76,7 +86,7 @@ void tcp_server_endpoint_impl::start() {
std::dynamic_pointer_cast<tcp_server_endpoint_impl>(
shared_from_this()), max_message_size_,
buffer_shrink_threshold_, has_enabled_magic_cookies_,
- service_, send_timeout_);
+ io_, send_timeout_);
{
std::unique_lock<std::mutex> its_socket_lock(new_connection->get_socket_lock());
@@ -121,44 +131,47 @@ bool tcp_server_endpoint_impl::send_error(
bool ret(false);
std::lock_guard<std::mutex> its_lock(mutex_);
const endpoint_type its_target(_target->get_address(), _target->get_port());
- const queue_iterator_type target_queue_iterator(find_or_create_queue_unlocked(its_target));
- auto& its_qpair = target_queue_iterator->second;
- const bool queue_size_zero_on_entry(its_qpair.second.empty());
+ const auto its_target_iterator(find_or_create_target_unlocked(its_target));
+ auto &its_data = its_target_iterator->second;
+ const bool queue_size_zero_on_entry(its_data.queue_.empty());
if (check_message_size(nullptr, _size, its_target) == endpoint_impl::cms_ret_e::MSG_OK &&
- check_queue_limit(_data, _size, its_qpair.first)) {
- its_qpair.second.emplace_back(
- std::make_shared<message_buffer_t>(_data, _data + _size));
- its_qpair.first += _size;
+ check_queue_limit(_data, _size, its_data.queue_size_)) {
+ its_data.queue_.emplace_back(
+ std::make_pair(std::make_shared<message_buffer_t>(_data, _data + _size), 0));
+ its_data.queue_size_ += _size;
if (queue_size_zero_on_entry) { // no writing in progress
- send_queued(target_queue_iterator);
+ send_queued(its_target_iterator);
}
ret = true;
}
return ret;
}
-void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iterator) {
+bool tcp_server_endpoint_impl::send_queued(const target_data_iterator_type _it) {
+
+ bool must_erase(false);
connection::ptr its_connection;
{
std::lock_guard<std::mutex> its_lock(connections_mutex_);
- auto connection_iterator = connections_.find(_queue_iterator->first);
+ auto connection_iterator = connections_.find(_it->first);
if (connection_iterator != connections_.end()) {
its_connection = connection_iterator->second;
} else {
VSOMEIP_INFO << "Didn't find connection: "
- << _queue_iterator->first.address().to_string() << ":" << std::dec
- << static_cast<std::uint16_t>(_queue_iterator->first.port())
+ << _it->first.address().to_string() << ":" << std::dec
+ << static_cast<std::uint16_t>(_it->first.port())
<< " dropping outstanding messages (" << std::dec
- << _queue_iterator->second.second.size() << ").";
+ << _it->second.queue_.size() << ").";
- if (_queue_iterator->second.second.size()) {
+ if (_it->second.queue_.size()) {
std::set<service_t> its_services;
// check all outstanding messages of this connection
// whether stop handlers need to be called
- for (const auto &its_buffer : _queue_iterator->second.second) {
+ for (const auto &its_q : _it->second.queue_) {
+ auto its_buffer(its_q.first);
if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) {
service_t its_service = VSOMEIP_BYTES_TO_WORD(
(*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
@@ -176,7 +189,7 @@ void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iter
<< its_service;
auto handler = found_cbk->second;
auto ptr = this->shared_from_this();
- service_.post([ptr, handler, its_service](){
+ io_.post([ptr, handler, its_service](){
handler(ptr, its_service);
});
prepare_stop_handlers_.erase(found_cbk);
@@ -184,12 +197,14 @@ void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iter
}
}
- queues_.erase(_queue_iterator->first);
+ must_erase = true;
}
}
if (its_connection) {
- its_connection->send_queued(_queue_iterator);
+ its_connection->send_queued(_it);
}
+
+ return (must_erase);
}
void tcp_server_endpoint_impl::get_configured_times_from_endpoint(
@@ -202,7 +217,7 @@ void tcp_server_endpoint_impl::get_configured_times_from_endpoint(
_debouncing, _maximum_retention);
}
-bool tcp_server_endpoint_impl::is_established(const std::shared_ptr<endpoint_definition>& _endpoint) {
+bool tcp_server_endpoint_impl::is_established_to(const std::shared_ptr<endpoint_definition>& _endpoint) {
bool is_connected = false;
endpoint_type endpoint(_endpoint->get_address(), _endpoint->get_port());
{
@@ -274,7 +289,7 @@ void tcp_server_endpoint_impl::accept_cbk(const connection::ptr& _connection,
<< _error.message() << " (" << std::dec << _error.value()
<< ") Will try to accept again in 1000ms";
std::shared_ptr<boost::asio::steady_timer> its_timer =
- std::make_shared<boost::asio::steady_timer>(service_,
+ std::make_shared<boost::asio::steady_timer>(io_,
std::chrono::milliseconds(1000));
auto its_ep = std::dynamic_pointer_cast<tcp_server_endpoint_impl>(
shared_from_this());
@@ -288,6 +303,7 @@ void tcp_server_endpoint_impl::accept_cbk(const connection::ptr& _connection,
}
std::uint16_t tcp_server_endpoint_impl::get_local_port() const {
+
return local_port_;
}
@@ -308,9 +324,9 @@ tcp_server_endpoint_impl::connection::connection(
std::uint32_t _recv_buffer_size_initial,
std::uint32_t _buffer_shrink_threshold,
bool _magic_cookies_enabled,
- boost::asio::io_service &_io_service,
+ boost::asio::io_context &_io,
std::chrono::milliseconds _send_timeout) :
- socket_(_io_service),
+ socket_(_io),
server_(_server),
max_message_size_(_max_message_size),
recv_buffer_size_initial_(_recv_buffer_size_initial),
@@ -326,20 +342,33 @@ tcp_server_endpoint_impl::connection::connection(
send_timeout_warning_(_send_timeout / 2) {
}
+tcp_server_endpoint_impl::connection::~connection() {
+
+ auto its_server(server_.lock());
+ if (its_server) {
+ auto its_routing_host(its_server->routing_host_.lock());
+ if (its_routing_host) {
+ its_routing_host->remove_subscriptions(
+ its_server->local_port_,
+ remote_address_, remote_port_);
+ }
+ }
+}
+
tcp_server_endpoint_impl::connection::ptr
tcp_server_endpoint_impl::connection::create(
const std::weak_ptr<tcp_server_endpoint_impl>& _server,
std::uint32_t _max_message_size,
std::uint32_t _buffer_shrink_threshold,
bool _magic_cookies_enabled,
- boost::asio::io_service & _io_service,
+ boost::asio::io_context &_io,
std::chrono::milliseconds _send_timeout) {
const std::uint32_t its_initial_receveive_buffer_size =
VSOMEIP_SOMEIP_HEADER_SIZE + 8 + MAGIC_COOKIE_SIZE + 8;
return ptr(new connection(_server, _max_message_size,
its_initial_receveive_buffer_size,
_buffer_shrink_threshold, _magic_cookies_enabled,
- _io_service, _send_timeout));
+ _io, _send_timeout));
}
tcp_server_endpoint_impl::socket_type &
@@ -360,7 +389,13 @@ void tcp_server_endpoint_impl::connection::receive() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
if(socket_.is_open()) {
const std::size_t its_capacity(recv_buffer_.capacity());
- size_t buffer_size = its_capacity - recv_buffer_size_;
+ if (recv_buffer_size_ > its_capacity) {
+ VSOMEIP_ERROR << __func__ << "Received buffer size is greater than the buffer capacity!"
+ << " recv_buffer_size_: " << recv_buffer_size_
+ << " its_capacity: " << its_capacity;
+ return;
+ }
+ size_t left_buffer_size = its_capacity - recv_buffer_size_;
try {
if (missing_capacity_) {
if (missing_capacity_ > MESSAGE_SIZE_UNLIMITED) {
@@ -369,6 +404,7 @@ void tcp_server_endpoint_impl::connection::receive() {
}
const std::size_t its_required_capacity(recv_buffer_size_ + missing_capacity_);
if (its_capacity < its_required_capacity) {
+ // Make the resize to its_required_capacity
recv_buffer_.reserve(its_required_capacity);
recv_buffer_.resize(its_required_capacity, 0x0);
if (recv_buffer_.size() > 1048576) {
@@ -378,14 +414,16 @@ void tcp_server_endpoint_impl::connection::receive() {
<< " remote: " << get_address_port_remote();
}
}
- buffer_size = missing_capacity_;
+ left_buffer_size = missing_capacity_;
missing_capacity_ = 0;
} else if (buffer_shrink_threshold_
&& shrink_count_ > buffer_shrink_threshold_
&& recv_buffer_size_ == 0) {
+ // In this case, make the resize to recv_buffer_size_initial_
recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
recv_buffer_.shrink_to_fit();
- buffer_size = recv_buffer_size_initial_;
+ // And set buffer_size to recv_buffer_size_initial_, the same of our resize
+ left_buffer_size = recv_buffer_size_initial_;
shrink_count_ = 0;
}
} catch (const std::exception &e) {
@@ -393,7 +431,7 @@ void tcp_server_endpoint_impl::connection::receive() {
// don't start receiving again
return;
}
- socket_.async_receive(boost::asio::buffer(&recv_buffer_[recv_buffer_size_], buffer_size),
+ socket_.async_receive(boost::asio::buffer(&recv_buffer_[recv_buffer_size_], left_buffer_size),
std::bind(&tcp_server_endpoint_impl::connection::receive_cbk,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2));
@@ -410,14 +448,15 @@ void tcp_server_endpoint_impl::connection::stop() {
}
void tcp_server_endpoint_impl::connection::send_queued(
- const queue_iterator_type _queue_iterator) {
+ const target_data_iterator_type _it) {
+
std::shared_ptr<tcp_server_endpoint_impl> its_server(server_.lock());
if (!its_server) {
VSOMEIP_TRACE << "tcp_server_endpoint_impl::connection::send_queued "
" couldn't lock server_";
return;
}
- message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front();
+ message_buffer_ptr_t its_buffer = _it->second.queue_.front().first;
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
(*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
(*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
@@ -437,7 +476,7 @@ void tcp_server_endpoint_impl::connection::send_queued(
now - last_cookie_sent_) > std::chrono::milliseconds(10000)) {
if (send_magic_cookie(its_buffer)) {
last_cookie_sent_ = now;
- _queue_iterator->second.first += sizeof(SERVICE_COOKIE);
+ _it->second.queue_size_ += sizeof(SERVICE_COOKIE);
}
}
}
@@ -459,15 +498,16 @@ void tcp_server_endpoint_impl::connection::send_queued(
std::chrono::steady_clock::now()),
std::bind(&tcp_server_endpoint_base_impl::send_cbk,
its_server,
- _queue_iterator,
+ _it,
std::placeholders::_1,
std::placeholders::_2));
}
}
void tcp_server_endpoint_impl::connection::send_queued_sync(
- const queue_iterator_type _queue_iterator) {
- message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front();
+ const target_data_iterator_type _it) {
+
+ message_buffer_ptr_t its_buffer = _it->second.queue_.front().first;
if (magic_cookies_enabled_) {
const std::chrono::steady_clock::time_point now =
std::chrono::steady_clock::now();
@@ -475,7 +515,7 @@ void tcp_server_endpoint_impl::connection::send_queued_sync(
now - last_cookie_sent_) > std::chrono::milliseconds(10000)) {
if (send_magic_cookie(its_buffer)) {
last_cookie_sent_ = now;
- _queue_iterator->second.first += sizeof(SERVICE_COOKIE);
+ _it->second.queue_size_ += sizeof(SERVICE_COOKIE);
}
}
}
@@ -587,36 +627,29 @@ void tcp_server_endpoint_impl::connection::receive_cbk(
recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MIN],
recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MAX]);
if (its_client != MAGIC_COOKIE_CLIENT) {
- const service_t its_service = VSOMEIP_BYTES_TO_WORD(
- recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MIN],
- recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MAX]);
- const method_t its_method = VSOMEIP_BYTES_TO_WORD(
- recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MIN],
- recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MIN],
recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MAX]);
-
- std::lock_guard<std::mutex> its_requests_guard(its_server->requests_mutex_);
- its_server->requests_[its_client]
- [std::make_tuple(its_service, its_method, its_session)] = remote_;
+ its_server->clients_mutex_.lock();
+ its_server->clients_[its_client][its_session] = remote_;
+ its_server->clients_mutex_.unlock();
}
}
if (!magic_cookies_enabled_) {
its_host->on_message(&recv_buffer_[its_iteration_gap],
current_message_size, its_server.get(),
- boost::asio::ip::address(),
+ false,
VSOMEIP_ROUTING_CLIENT,
- std::make_pair(ANY_UID, ANY_GID),
+ nullptr,
remote_address_, remote_port_);
} else {
// Only call on_message without a magic cookie in front of the buffer!
if (!is_magic_cookie(its_iteration_gap)) {
its_host->on_message(&recv_buffer_[its_iteration_gap],
current_message_size, its_server.get(),
- boost::asio::ip::address(),
+ false,
VSOMEIP_ROUTING_CLIENT,
- std::make_pair(ANY_UID, ANY_GID),
+ nullptr,
remote_address_, remote_port_);
}
}
@@ -677,9 +710,9 @@ void tcp_server_endpoint_impl::connection::receive_cbk(
// ensure to send back a error message w/ wrong protocol version
its_host->on_message(&recv_buffer_[its_iteration_gap],
VSOMEIP_SOMEIP_HEADER_SIZE + 8, its_server.get(),
- boost::asio::ip::address(),
+ false,
VSOMEIP_ROUTING_CLIENT,
- std::make_pair(ANY_UID, ANY_GID),
+ nullptr,
remote_address_, remote_port_);
} else if (!utility::is_valid_message_type(static_cast<message_type_e>(
recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS]))) {
@@ -814,7 +847,7 @@ void tcp_server_endpoint_impl::connection::set_remote_info(
remote_port_ = _remote.port();
}
-const std::string tcp_server_endpoint_impl::connection::get_address_port_remote() const {
+std::string tcp_server_endpoint_impl::connection::get_address_port_remote() const {
std::string its_address_port;
its_address_port.reserve(21);
boost::system::error_code ec;
@@ -824,7 +857,7 @@ const std::string tcp_server_endpoint_impl::connection::get_address_port_remote(
return its_address_port;
}
-const std::string tcp_server_endpoint_impl::connection::get_address_port_local() const {
+std::string tcp_server_endpoint_impl::connection::get_address_port_local() const {
std::string its_address_port;
its_address_port.reserve(21);
boost::system::error_code ec;
@@ -954,7 +987,7 @@ void tcp_server_endpoint_impl::print_status() {
VSOMEIP_INFO << "status tse: " << std::dec << local_port_
<< " connections: " << std::dec << its_connections.size()
- << " queues: " << std::dec << queues_.size();
+ << " targets: " << std::dec << targets_.size();
for (const auto &c : its_connections) {
std::size_t its_data_size(0);
std::size_t its_queue_size(0);
@@ -963,10 +996,10 @@ void tcp_server_endpoint_impl::print_status() {
std::unique_lock<std::mutex> c_s_lock(c.second->get_socket_lock());
its_recv_size = c.second->get_recv_buffer_capacity();
}
- auto found_queue = queues_.find(c.first);
- if (found_queue != queues_.end()) {
- its_queue_size = found_queue->second.second.size();
- its_data_size = found_queue->second.first;
+ auto found_queue = targets_.find(c.first);
+ if (found_queue != targets_.end()) {
+ its_queue_size = found_queue->second.queue_.size();
+ its_data_size = found_queue->second.queue_size_;
}
VSOMEIP_INFO << "status tse: client: "
<< c.second->get_address_port_remote()
@@ -977,10 +1010,10 @@ void tcp_server_endpoint_impl::print_status() {
}
std::string tcp_server_endpoint_impl::get_remote_information(
- const queue_iterator_type _queue_iterator) const {
+ const target_data_iterator_type _it) const {
boost::system::error_code ec;
- return _queue_iterator->first.address().to_string(ec) + ":"
- + std::to_string(_queue_iterator->first.port());
+ return _it->first.address().to_string(ec) + ":"
+ + std::to_string(_it->first.port());
}
std::string tcp_server_endpoint_impl::get_remote_information(