summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/tcp_client_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/tcp_client_endpoint_impl.cpp70
1 files changed, 57 insertions, 13 deletions
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
index 2c56521..2a86244 100644
--- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
@@ -49,7 +49,9 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl(
send_timeout_warning_(send_timeout_ / 2),
tcp_restart_aborts_max_(configuration_->get_max_tcp_restart_aborts()),
tcp_connect_time_max_(configuration_->get_max_tcp_connect_time()),
- aborted_restart_count_(0) {
+ aborted_restart_count_(0),
+ sent_timer_(_io) {
+
is_supporting_magic_cookies_ = true;
}
@@ -320,6 +322,10 @@ void tcp_client_endpoint_impl::send_queued() {
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
if (socket_->is_open()) {
+ {
+ std::lock_guard<std::mutex> its_sent_lock(sent_mutex_);
+ is_sending_ = true;
+ }
boost::asio::async_write(
*socket_,
boost::asio::buffer(*its_buffer),
@@ -584,9 +590,10 @@ void tcp_client_endpoint_impl::receive_cbk(
state_ = cei_state_e::CONNECTING;
shutdown_and_close_socket_unlocked(false);
its_lock.unlock();
- std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock();
- its_ep_host->on_disconnect(shared_from_this());
- restart(true);
+
+ // wait_until_sent interprets "no error" as timeout.
+ // Therefore call it with an error.
+ wait_until_sent(boost::asio::error::operation_aborted);
return;
} else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED &&
current_message_size > max_message_size_) {
@@ -612,9 +619,10 @@ void tcp_client_endpoint_impl::receive_cbk(
state_ = cei_state_e::CONNECTING;
shutdown_and_close_socket_unlocked(false);
its_lock.unlock();
- std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock();
- its_ep_host->on_disconnect(shared_from_this());
- restart(true);
+
+ // wait_until_sent interprets "no error" as timeout.
+ // Therefore call it with an error.
+ wait_until_sent(boost::asio::error::operation_aborted);
return;
}
} else if (current_message_size > _recv_buffer_size) {
@@ -649,9 +657,10 @@ void tcp_client_endpoint_impl::receive_cbk(
state_ = cei_state_e::CONNECTING;
shutdown_and_close_socket_unlocked(false);
its_lock.unlock();
- std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock();
- its_ep_host->on_disconnect(shared_from_this());
- restart(true);
+
+ // wait_until_sent interprets "no error" as timeout.
+ // Therefore call it with an error.
+ wait_until_sent(boost::asio::error::operation_aborted);
return;
}
}
@@ -686,9 +695,10 @@ void tcp_client_endpoint_impl::receive_cbk(
state_ = cei_state_e::CONNECTING;
shutdown_and_close_socket_unlocked(false);
its_lock.unlock();
- std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock();
- its_ep_host->on_disconnect(shared_from_this());
- restart(true);
+
+ // wait_until_sent interprets "no error" as timeout.
+ // Therefore call it with an error.
+ wait_until_sent(boost::asio::error::operation_aborted);
}
} else {
its_lock.unlock();
@@ -813,6 +823,18 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error,
std::size_t _bytes,
const message_buffer_ptr_t& _sent_msg) {
(void)_bytes;
+
+ {
+ // Signal that the current send operation has finished.
+ // Note: Waiting is always done after having closed the socket.
+ // Therefore, no new send operation will be scheduled.
+ std::lock_guard<std::mutex> its_sent_lock(sent_mutex_);
+ is_sending_ = false;
+
+ boost::system::error_code ec;
+ sent_timer_.cancel(ec);
+ }
+
if (!_error) {
std::lock_guard<std::mutex> its_lock(mutex_);
if (queue_.size() > 0) {
@@ -886,4 +908,26 @@ void tcp_client_endpoint_impl::max_allowed_reconnects_reached() {
return;
}
+void tcp_client_endpoint_impl::wait_until_sent(const boost::system::error_code &_error) {
+
+ std::unique_lock<std::mutex> its_sent_lock(sent_mutex_);
+ if (!is_sending_ || !_error) {
+ its_sent_lock.unlock();
+ if (!_error)
+ VSOMEIP_WARNING << __func__
+ << ": Maximum wait time for send operation exceeded.";
+
+ std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock();
+ its_ep_host->on_disconnect(shared_from_this());
+ restart(true);
+ } else {
+ std::chrono::milliseconds its_timeout(VSOMEIP_MAX_TCP_SENT_WAIT_TIME);
+ boost::system::error_code ec;
+ sent_timer_.expires_from_now(its_timeout, ec);
+ sent_timer_.async_wait(std::bind(&tcp_client_endpoint_impl::wait_until_sent,
+ std::dynamic_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
+ std::placeholders::_1));
+ }
+}
+
} // namespace vsomeip_v3