diff options
Diffstat (limited to 'implementation/endpoints/src/tcp_client_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/tcp_client_endpoint_impl.cpp | 70 |
1 files changed, 57 insertions, 13 deletions
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp index 2c56521..2a86244 100644 --- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp @@ -49,7 +49,9 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl( send_timeout_warning_(send_timeout_ / 2), tcp_restart_aborts_max_(configuration_->get_max_tcp_restart_aborts()), tcp_connect_time_max_(configuration_->get_max_tcp_connect_time()), - aborted_restart_count_(0) { + aborted_restart_count_(0), + sent_timer_(_io) { + is_supporting_magic_cookies_ = true; } @@ -320,6 +322,10 @@ void tcp_client_endpoint_impl::send_queued() { { std::lock_guard<std::mutex> its_lock(socket_mutex_); if (socket_->is_open()) { + { + std::lock_guard<std::mutex> its_sent_lock(sent_mutex_); + is_sending_ = true; + } boost::asio::async_write( *socket_, boost::asio::buffer(*its_buffer), @@ -584,9 +590,10 @@ void tcp_client_endpoint_impl::receive_cbk( state_ = cei_state_e::CONNECTING; shutdown_and_close_socket_unlocked(false); its_lock.unlock(); - std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock(); - its_ep_host->on_disconnect(shared_from_this()); - restart(true); + + // wait_until_sent interprets "no error" as timeout. + // Therefore call it with an error. + wait_until_sent(boost::asio::error::operation_aborted); return; } else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED && current_message_size > max_message_size_) { @@ -612,9 +619,10 @@ void tcp_client_endpoint_impl::receive_cbk( state_ = cei_state_e::CONNECTING; shutdown_and_close_socket_unlocked(false); its_lock.unlock(); - std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock(); - its_ep_host->on_disconnect(shared_from_this()); - restart(true); + + // wait_until_sent interprets "no error" as timeout. + // Therefore call it with an error. + wait_until_sent(boost::asio::error::operation_aborted); return; } } else if (current_message_size > _recv_buffer_size) { @@ -649,9 +657,10 @@ void tcp_client_endpoint_impl::receive_cbk( state_ = cei_state_e::CONNECTING; shutdown_and_close_socket_unlocked(false); its_lock.unlock(); - std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock(); - its_ep_host->on_disconnect(shared_from_this()); - restart(true); + + // wait_until_sent interprets "no error" as timeout. + // Therefore call it with an error. + wait_until_sent(boost::asio::error::operation_aborted); return; } } @@ -686,9 +695,10 @@ void tcp_client_endpoint_impl::receive_cbk( state_ = cei_state_e::CONNECTING; shutdown_and_close_socket_unlocked(false); its_lock.unlock(); - std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock(); - its_ep_host->on_disconnect(shared_from_this()); - restart(true); + + // wait_until_sent interprets "no error" as timeout. + // Therefore call it with an error. + wait_until_sent(boost::asio::error::operation_aborted); } } else { its_lock.unlock(); @@ -813,6 +823,18 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, std::size_t _bytes, const message_buffer_ptr_t& _sent_msg) { (void)_bytes; + + { + // Signal that the current send operation has finished. + // Note: Waiting is always done after having closed the socket. + // Therefore, no new send operation will be scheduled. + std::lock_guard<std::mutex> its_sent_lock(sent_mutex_); + is_sending_ = false; + + boost::system::error_code ec; + sent_timer_.cancel(ec); + } + if (!_error) { std::lock_guard<std::mutex> its_lock(mutex_); if (queue_.size() > 0) { @@ -886,4 +908,26 @@ void tcp_client_endpoint_impl::max_allowed_reconnects_reached() { return; } +void tcp_client_endpoint_impl::wait_until_sent(const boost::system::error_code &_error) { + + std::unique_lock<std::mutex> its_sent_lock(sent_mutex_); + if (!is_sending_ || !_error) { + its_sent_lock.unlock(); + if (!_error) + VSOMEIP_WARNING << __func__ + << ": Maximum wait time for send operation exceeded."; + + std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock(); + its_ep_host->on_disconnect(shared_from_this()); + restart(true); + } else { + std::chrono::milliseconds its_timeout(VSOMEIP_MAX_TCP_SENT_WAIT_TIME); + boost::system::error_code ec; + sent_timer_.expires_from_now(its_timeout, ec); + sent_timer_.async_wait(std::bind(&tcp_client_endpoint_impl::wait_until_sent, + std::dynamic_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()), + std::placeholders::_1)); + } +} + } // namespace vsomeip_v3 |