summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/tcp_client_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/tcp_client_endpoint_impl.cpp247
1 files changed, 147 insertions, 100 deletions
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
index 3debcc7..f88d2a2 100644
--- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
@@ -50,6 +50,7 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl(
tcp_restart_aborts_max_(configuration_->get_max_tcp_restart_aborts()),
tcp_connect_time_max_(configuration_->get_max_tcp_connect_time()),
aborted_restart_count_(0),
+ is_sending_(false),
sent_timer_(_io) {
is_supporting_magic_cookies_ = true;
@@ -67,64 +68,71 @@ bool tcp_client_endpoint_impl::is_local() const {
}
void tcp_client_endpoint_impl::start() {
- connect();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
}
void tcp_client_endpoint_impl::restart(bool _force) {
- if (!_force && state_ == cei_state_e::CONNECTING) {
- std::chrono::steady_clock::time_point its_current
- = std::chrono::steady_clock::now();
- long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
- its_current - connect_timepoint_).count();
- if (aborted_restart_count_ < tcp_restart_aborts_max_
- && its_connect_duration < tcp_connect_time_max_) {
- aborted_restart_count_++;
- return;
- } else {
- VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts ["
- << tcp_restart_aborts_max_ << "] reached! its_connect_duration: "
- << its_connect_duration;
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ auto restart_func = [self, _force] {
+ if (!_force && self->state_ == cei_state_e::CONNECTING) {
+ std::chrono::steady_clock::time_point its_current
+ = std::chrono::steady_clock::now();
+ long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+ its_current - self->connect_timepoint_).count();
+ if (self->aborted_restart_count_ < self->tcp_restart_aborts_max_
+ && its_connect_duration < self->tcp_connect_time_max_) {
+ self->aborted_restart_count_++;
+ return;
+ } else {
+ VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts ["
+ << self->tcp_restart_aborts_max_ << "] reached! its_connect_duration: "
+ << its_connect_duration;
+ }
}
- }
- state_ = cei_state_e::CONNECTING;
- std::string address_port_local;
- {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- address_port_local = get_address_port_local();
- shutdown_and_close_socket_unlocked(true);
- recv_buffer_ = std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0);
- }
- was_not_connected_ = true;
- reconnect_counter_ = 0;
- {
- std::lock_guard<std::mutex> its_lock(mutex_);
- for (const auto&m : queue_) {
- const service_t its_service = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_SERVICE_POS_MIN],
- (*m)[VSOMEIP_SERVICE_POS_MAX]);
- const method_t its_method = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_METHOD_POS_MIN],
- (*m)[VSOMEIP_METHOD_POS_MAX]);
- const client_t its_client = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_CLIENT_POS_MIN],
- (*m)[VSOMEIP_CLIENT_POS_MAX]);
- const session_t its_session = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_SESSION_POS_MIN],
- (*m)[VSOMEIP_SESSION_POS_MAX]);
- VSOMEIP_WARNING << "tce::restart: dropping message: "
- << "remote:" << get_address_port_remote() << " ("
- << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
- << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"
- << " size: " << std::dec << m->size();
+ self->state_ = cei_state_e::CONNECTING;
+ std::string address_port_local;
+ {
+ std::lock_guard<std::mutex> its_lock(self->socket_mutex_);
+ address_port_local = self->get_address_port_local();
+ self->shutdown_and_close_socket_unlocked(true);
+ self->recv_buffer_ = std::make_shared<message_buffer_t>(self->recv_buffer_size_initial_, 0);
}
- queue_.clear();
- queue_size_ = 0;
- }
- VSOMEIP_WARNING << "tce::restart: local: " << address_port_local
- << " remote: " << get_address_port_remote();
- start_connect_timer();
+ self->was_not_connected_ = true;
+ self->reconnect_counter_ = 0;
+ {
+ std::lock_guard<std::mutex> its_lock(self->mutex_);
+ for (const auto&m : self->queue_) {
+ const service_t its_service = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_SERVICE_POS_MIN],
+ (*m)[VSOMEIP_SERVICE_POS_MAX]);
+ const method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_METHOD_POS_MIN],
+ (*m)[VSOMEIP_METHOD_POS_MAX]);
+ const client_t its_client = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_CLIENT_POS_MIN],
+ (*m)[VSOMEIP_CLIENT_POS_MAX]);
+ const session_t its_session = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_SESSION_POS_MIN],
+ (*m)[VSOMEIP_SESSION_POS_MAX]);
+ VSOMEIP_WARNING << "tce::restart: dropping message: "
+ << "remote:" << self->get_address_port_remote() << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"
+ << " size: " << std::dec << m->size();
+ }
+ self->queue_.clear();
+ self->queue_size_ = 0;
+ }
+ VSOMEIP_WARNING << "tce::restart: local: " << address_port_local
+ << " remote: " << self->get_address_port_remote();
+ self->start_connect_timer();
+ };
+ // bind to strand_ to avoid socket closure if
+ // parallel socket operation is currently active
+ strand_.dispatch(restart_func);
}
void tcp_client_endpoint_impl::connect() {
@@ -169,26 +177,54 @@ void tcp_client_endpoint_impl::connect() {
std::string its_device(configuration_->get_device());
if (its_device != "") {
if (setsockopt(socket_->native_handle(),
- SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) {
+ SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
VSOMEIP_WARNING << "TCP Client: Could not bind to device \"" << its_device << "\"";
}
}
#endif
- // Bind address and, optionally, port.
- boost::system::error_code its_bind_error;
- socket_->bind(local_, its_bind_error);
- if(its_bind_error) {
- VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
- "Error binding socket: " << its_bind_error.message()
- << " remote:" << get_address_port_remote();
- try {
- // don't connect on bind error to avoid using a random port
- strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
- shared_from_this(), its_bind_error));
- } catch (const std::exception &e) {
- VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: "
- << e.what() << " remote:" << get_address_port_remote();
+ // In case a client endpoint port was configured,
+ // bind to it before connecting
+ if (local_.port() != ILLEGAL_PORT) {
+ boost::system::error_code its_bind_error;
+ socket_->bind(local_, its_bind_error);
+ if(its_bind_error) {
+ VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
+ "Error binding socket: " << its_bind_error.message()
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+
+ std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
+ if (its_host) {
+ // set new client port depending on service / instance / remote port
+ if (!its_host->on_bind_error(shared_from_this(), remote_port_)) {
+ VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
+ "Failed to set new local port for tce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ } else {
+ local_.port(local_port_);
+ VSOMEIP_INFO << "tcp_client_endpoint::connect: "
+ "Using new new local port for tce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+ }
+ try {
+ // don't connect on bind error to avoid using a random port
+ strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
+ shared_from_this(), its_bind_error));
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: "
+ << e.what()
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+ return;
}
return;
}
@@ -220,7 +256,10 @@ void tcp_client_endpoint_impl::receive() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
its_recv_buffer = recv_buffer_;
}
- receive(its_recv_buffer, 0, 0);
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch([self, &its_recv_buffer](){
+ self->receive(its_recv_buffer, 0, 0);
+ });
}
void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
@@ -277,32 +316,26 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
}
}
-void tcp_client_endpoint_impl::send_queued() {
- message_buffer_ptr_t its_buffer;
- if(queue_.size()) {
- its_buffer = queue_.front();
- } else {
- return;
- }
+void tcp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
- (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
+ (*_buffer)[VSOMEIP_SERVICE_POS_MIN],
+ (*_buffer)[VSOMEIP_SERVICE_POS_MAX]);
const method_t its_method = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_METHOD_POS_MIN],
- (*its_buffer)[VSOMEIP_METHOD_POS_MAX]);
+ (*_buffer)[VSOMEIP_METHOD_POS_MIN],
+ (*_buffer)[VSOMEIP_METHOD_POS_MAX]);
const client_t its_client = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_CLIENT_POS_MIN],
- (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]);
+ (*_buffer)[VSOMEIP_CLIENT_POS_MIN],
+ (*_buffer)[VSOMEIP_CLIENT_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SESSION_POS_MIN],
- (*its_buffer)[VSOMEIP_SESSION_POS_MAX]);
+ (*_buffer)[VSOMEIP_SESSION_POS_MIN],
+ (*_buffer)[VSOMEIP_SESSION_POS_MAX]);
if (has_enabled_magic_cookies_) {
const std::chrono::steady_clock::time_point now =
std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_cookie_sent_) > std::chrono::milliseconds(10000)) {
- send_magic_cookie(its_buffer);
+ send_magic_cookie(_buffer);
last_cookie_sent_ = now;
}
}
@@ -312,9 +345,9 @@ void tcp_client_endpoint_impl::send_queued() {
std::stringstream msg;
msg << "tcei<" << remote_.address() << ":"
<< std::dec << remote_.port() << ">::sq: ";
- for (std::size_t i = 0; i < its_buffer->size(); i++)
+ for (std::size_t i = 0; i < _buffer->size(); i++)
msg << std::hex << std::setw(2) << std::setfill('0')
- << (int)(*its_buffer)[i] << " ";
+ << (int)(*_buffer)[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
{
@@ -326,21 +359,23 @@ void tcp_client_endpoint_impl::send_queued() {
}
boost::asio::async_write(
*socket_,
- boost::asio::buffer(*its_buffer),
- std::bind(&tcp_client_endpoint_impl::write_completion_condition,
- std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
- std::placeholders::_1,
- std::placeholders::_2,
- its_buffer->size(),
- its_service, its_method, its_client, its_session,
- std::chrono::steady_clock::now()),
+ boost::asio::buffer(*_buffer),
std::bind(
+ &tcp_client_endpoint_impl::write_completion_condition,
+ std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ _buffer->size(),
+ its_service, its_method, its_client, its_session,
+ std::chrono::steady_clock::now()),
+ strand_.wrap(
+ std::bind(
&tcp_client_endpoint_base_impl::send_cbk,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2,
- its_buffer
- )
+ _buffer
+ ))
);
}
}
@@ -675,7 +710,10 @@ void tcp_client_endpoint_impl::receive_cbk(
}
}
its_lock.unlock();
- receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){
+ self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ });
} else {
VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
<< _error.message() << "(" << std::dec << _error.value()
@@ -700,7 +738,10 @@ void tcp_client_endpoint_impl::receive_cbk(
}
} else {
its_lock.unlock();
- receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){
+ self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ });
}
}
}
@@ -838,7 +879,13 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error,
if (queue_.size() > 0) {
queue_size_ -= queue_.front()->size();
queue_.pop_front();
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch(
+ [self, its_buffer]() { self->send_queued(its_buffer);}
+ );
+ }
}
} else if (_error == boost::system::errc::destination_address_required) {
VSOMEIP_WARNING << "tce::send_cbk received error: " << _error.message()