diff options
Diffstat (limited to 'implementation/endpoints/src/tcp_server_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/tcp_server_endpoint_impl.cpp | 69 |
1 files changed, 42 insertions, 27 deletions
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp index b7b1d0a..1cd2b5b 100644 --- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp @@ -408,6 +408,11 @@ void tcp_server_endpoint_impl::connection::send_queued( { std::lock_guard<std::mutex> its_lock(socket_mutex_); + { + std::lock_guard<std::mutex> its_sent_lock(its_server->sent_mutex_); + its_server->is_sending_ = true; + } + boost::asio::async_write(socket_, boost::asio::buffer(*its_buffer), std::bind(&tcp_server_endpoint_impl::connection::write_completion_condition, shared_from_this(), @@ -652,19 +657,15 @@ void tcp_server_endpoint_impl::connection::receive_cbk( << " remote: " << get_address_port_remote() << ". Closing connection due to missing/broken data TCP stream."; } - { - std::lock_guard<std::mutex> its_lock(its_server->connections_mutex_); - stop(); - } - its_server->remove_connection(this); + wait_until_sent(boost::asio::error::operation_aborted); return; } else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED && current_message_size > max_message_size_) { - std::lock_guard<std::mutex> its_lock(socket_mutex_); recv_buffer_size_ = 0; recv_buffer_.resize(recv_buffer_size_initial_, 0x0); recv_buffer_.shrink_to_fit(); if (magic_cookies_enabled_) { + std::lock_guard<std::mutex> its_lock(socket_mutex_); VSOMEIP_ERROR << "Received a TCP message which exceeds " << "maximum message size (" << std::dec << current_message_size @@ -674,19 +675,18 @@ void tcp_server_endpoint_impl::connection::receive_cbk( << get_address_port_local() << " remote: " << get_address_port_remote(); } else { - VSOMEIP_ERROR << "Received a TCP message which exceeds " - << "maximum message size (" - << std::dec << current_message_size - << " > " << std::dec << max_message_size_ - << ") Magic cookies are disabled: " - << "Connection will be closed! local: " - << get_address_port_local() << " remote: " - << get_address_port_remote(); { - std::lock_guard<std::mutex> its_lock(its_server->connections_mutex_); - stop(); + std::lock_guard<std::mutex> its_lock(socket_mutex_); + VSOMEIP_ERROR << "Received a TCP message which exceeds " + << "maximum message size (" + << std::dec << current_message_size + << " > " << std::dec << max_message_size_ + << ") Magic cookies are disabled: " + << "Connection will be closed! local: " + << get_address_port_local() << " remote: " + << get_address_port_remote(); } - its_server->remove_connection(this); + wait_until_sent(boost::asio::error::operation_aborted); return; } } else if (current_message_size > recv_buffer_size_) { @@ -720,11 +720,7 @@ void tcp_server_endpoint_impl::connection::receive_cbk( << " remote: " << get_address_port_remote() << ". Closing connection due to missing/broken data TCP stream."; } - { - std::lock_guard<std::mutex> its_lock(its_server->connections_mutex_); - stop(); - } - its_server->remove_connection(this); + wait_until_sent(boost::asio::error::operation_aborted); return; } } @@ -752,11 +748,7 @@ void tcp_server_endpoint_impl::connection::receive_cbk( << " local: " << get_address_port_local() << " remote: " << get_address_port_remote(); } - { - std::lock_guard<std::mutex> its_lock(its_server->connections_mutex_); - stop(); - } - its_server->remove_connection(this); + wait_until_sent(boost::asio::error::operation_aborted); } } @@ -962,4 +954,27 @@ bool tcp_server_endpoint_impl::tp_segmentation_enabled(service_t _service, return false; } +void tcp_server_endpoint_impl::connection::wait_until_sent(const boost::system::error_code &_error) { + std::shared_ptr<tcp_server_endpoint_impl> its_server(server_.lock()); + std::unique_lock<std::mutex> its_sent_lock(its_server->sent_mutex_); + if (!its_server->is_sending_ || !_error) { + its_sent_lock.unlock(); + if (!_error) + VSOMEIP_WARNING << __func__ + << ": Maximum wait time for send operation exceeded for tse."; + { + std::lock_guard<std::mutex> its_lock(its_server->connections_mutex_); + stop(); + } + its_server->remove_connection(this); + } else { + std::chrono::milliseconds its_timeout(VSOMEIP_MAX_TCP_SENT_WAIT_TIME); + boost::system::error_code ec; + its_server->sent_timer_.expires_from_now(its_timeout, ec); + its_server->sent_timer_.async_wait(std::bind(&tcp_server_endpoint_impl::connection::wait_until_sent, + std::dynamic_pointer_cast<tcp_server_endpoint_impl::connection>(shared_from_this()), + std::placeholders::_1)); + } +} + } // namespace vsomeip_v3 |