diff options
Diffstat (limited to 'implementation/endpoints/src/tcp_client_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/tcp_client_endpoint_impl.cpp | 247 |
1 files changed, 147 insertions, 100 deletions
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp index 3debcc7..f88d2a2 100644 --- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp @@ -50,6 +50,7 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl( tcp_restart_aborts_max_(configuration_->get_max_tcp_restart_aborts()), tcp_connect_time_max_(configuration_->get_max_tcp_connect_time()), aborted_restart_count_(0), + is_sending_(false), sent_timer_(_io) { is_supporting_magic_cookies_ = true; @@ -67,64 +68,71 @@ bool tcp_client_endpoint_impl::is_local() const { } void tcp_client_endpoint_impl::start() { - connect(); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); } void tcp_client_endpoint_impl::restart(bool _force) { - if (!_force && state_ == cei_state_e::CONNECTING) { - std::chrono::steady_clock::time_point its_current - = std::chrono::steady_clock::now(); - long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>( - its_current - connect_timepoint_).count(); - if (aborted_restart_count_ < tcp_restart_aborts_max_ - && its_connect_duration < tcp_connect_time_max_) { - aborted_restart_count_++; - return; - } else { - VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts [" - << tcp_restart_aborts_max_ << "] reached! its_connect_duration: " - << its_connect_duration; + auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()); + auto restart_func = [self, _force] { + if (!_force && self->state_ == cei_state_e::CONNECTING) { + std::chrono::steady_clock::time_point its_current + = std::chrono::steady_clock::now(); + long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>( + its_current - self->connect_timepoint_).count(); + if (self->aborted_restart_count_ < self->tcp_restart_aborts_max_ + && its_connect_duration < self->tcp_connect_time_max_) { + self->aborted_restart_count_++; + return; + } else { + VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts [" + << self->tcp_restart_aborts_max_ << "] reached! its_connect_duration: " + << its_connect_duration; + } } - } - state_ = cei_state_e::CONNECTING; - std::string address_port_local; - { - std::lock_guard<std::mutex> its_lock(socket_mutex_); - address_port_local = get_address_port_local(); - shutdown_and_close_socket_unlocked(true); - recv_buffer_ = std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0); - } - was_not_connected_ = true; - reconnect_counter_ = 0; - { - std::lock_guard<std::mutex> its_lock(mutex_); - for (const auto&m : queue_) { - const service_t its_service = VSOMEIP_BYTES_TO_WORD( - (*m)[VSOMEIP_SERVICE_POS_MIN], - (*m)[VSOMEIP_SERVICE_POS_MAX]); - const method_t its_method = VSOMEIP_BYTES_TO_WORD( - (*m)[VSOMEIP_METHOD_POS_MIN], - (*m)[VSOMEIP_METHOD_POS_MAX]); - const client_t its_client = VSOMEIP_BYTES_TO_WORD( - (*m)[VSOMEIP_CLIENT_POS_MIN], - (*m)[VSOMEIP_CLIENT_POS_MAX]); - const session_t its_session = VSOMEIP_BYTES_TO_WORD( - (*m)[VSOMEIP_SESSION_POS_MIN], - (*m)[VSOMEIP_SESSION_POS_MAX]); - VSOMEIP_WARNING << "tce::restart: dropping message: " - << "remote:" << get_address_port_remote() << " (" - << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" - << std::hex << std::setw(4) << std::setfill('0') << its_service << "." - << std::hex << std::setw(4) << std::setfill('0') << its_method << "." - << std::hex << std::setw(4) << std::setfill('0') << its_session << "]" - << " size: " << std::dec << m->size(); + self->state_ = cei_state_e::CONNECTING; + std::string address_port_local; + { + std::lock_guard<std::mutex> its_lock(self->socket_mutex_); + address_port_local = self->get_address_port_local(); + self->shutdown_and_close_socket_unlocked(true); + self->recv_buffer_ = std::make_shared<message_buffer_t>(self->recv_buffer_size_initial_, 0); } - queue_.clear(); - queue_size_ = 0; - } - VSOMEIP_WARNING << "tce::restart: local: " << address_port_local - << " remote: " << get_address_port_remote(); - start_connect_timer(); + self->was_not_connected_ = true; + self->reconnect_counter_ = 0; + { + std::lock_guard<std::mutex> its_lock(self->mutex_); + for (const auto&m : self->queue_) { + const service_t its_service = VSOMEIP_BYTES_TO_WORD( + (*m)[VSOMEIP_SERVICE_POS_MIN], + (*m)[VSOMEIP_SERVICE_POS_MAX]); + const method_t its_method = VSOMEIP_BYTES_TO_WORD( + (*m)[VSOMEIP_METHOD_POS_MIN], + (*m)[VSOMEIP_METHOD_POS_MAX]); + const client_t its_client = VSOMEIP_BYTES_TO_WORD( + (*m)[VSOMEIP_CLIENT_POS_MIN], + (*m)[VSOMEIP_CLIENT_POS_MAX]); + const session_t its_session = VSOMEIP_BYTES_TO_WORD( + (*m)[VSOMEIP_SESSION_POS_MIN], + (*m)[VSOMEIP_SESSION_POS_MAX]); + VSOMEIP_WARNING << "tce::restart: dropping message: " + << "remote:" << self->get_address_port_remote() << " (" + << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << "." + << std::hex << std::setw(4) << std::setfill('0') << its_session << "]" + << " size: " << std::dec << m->size(); + } + self->queue_.clear(); + self->queue_size_ = 0; + } + VSOMEIP_WARNING << "tce::restart: local: " << address_port_local + << " remote: " << self->get_address_port_remote(); + self->start_connect_timer(); + }; + // bind to strand_ to avoid socket closure if + // parallel socket operation is currently active + strand_.dispatch(restart_func); } void tcp_client_endpoint_impl::connect() { @@ -169,26 +177,54 @@ void tcp_client_endpoint_impl::connect() { std::string its_device(configuration_->get_device()); if (its_device != "") { if (setsockopt(socket_->native_handle(), - SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) { + SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) { VSOMEIP_WARNING << "TCP Client: Could not bind to device \"" << its_device << "\""; } } #endif - // Bind address and, optionally, port. - boost::system::error_code its_bind_error; - socket_->bind(local_, its_bind_error); - if(its_bind_error) { - VSOMEIP_WARNING << "tcp_client_endpoint::connect: " - "Error binding socket: " << its_bind_error.message() - << " remote:" << get_address_port_remote(); - try { - // don't connect on bind error to avoid using a random port - strand_.post(std::bind(&client_endpoint_impl::connect_cbk, - shared_from_this(), its_bind_error)); - } catch (const std::exception &e) { - VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: " - << e.what() << " remote:" << get_address_port_remote(); + // In case a client endpoint port was configured, + // bind to it before connecting + if (local_.port() != ILLEGAL_PORT) { + boost::system::error_code its_bind_error; + socket_->bind(local_, its_bind_error); + if(its_bind_error) { + VSOMEIP_WARNING << "tcp_client_endpoint::connect: " + "Error binding socket: " << its_bind_error.message() + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + + std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock(); + if (its_host) { + // set new client port depending on service / instance / remote port + if (!its_host->on_bind_error(shared_from_this(), remote_port_)) { + VSOMEIP_WARNING << "tcp_client_endpoint::connect: " + "Failed to set new local port for tce: " + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } else { + local_.port(local_port_); + VSOMEIP_INFO << "tcp_client_endpoint::connect: " + "Using new new local port for tce: " + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } + } + try { + // don't connect on bind error to avoid using a random port + strand_.post(std::bind(&client_endpoint_impl::connect_cbk, + shared_from_this(), its_bind_error)); + } catch (const std::exception &e) { + VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: " + << e.what() + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } + return; } return; } @@ -220,7 +256,10 @@ void tcp_client_endpoint_impl::receive() { std::lock_guard<std::mutex> its_lock(socket_mutex_); its_recv_buffer = recv_buffer_; } - receive(its_recv_buffer, 0, 0); + auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()); + strand_.dispatch([self, &its_recv_buffer](){ + self->receive(its_recv_buffer, 0, 0); + }); } void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer, @@ -277,32 +316,26 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer, } } -void tcp_client_endpoint_impl::send_queued() { - message_buffer_ptr_t its_buffer; - if(queue_.size()) { - its_buffer = queue_.front(); - } else { - return; - } +void tcp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) { const service_t its_service = VSOMEIP_BYTES_TO_WORD( - (*its_buffer)[VSOMEIP_SERVICE_POS_MIN], - (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]); + (*_buffer)[VSOMEIP_SERVICE_POS_MIN], + (*_buffer)[VSOMEIP_SERVICE_POS_MAX]); const method_t its_method = VSOMEIP_BYTES_TO_WORD( - (*its_buffer)[VSOMEIP_METHOD_POS_MIN], - (*its_buffer)[VSOMEIP_METHOD_POS_MAX]); + (*_buffer)[VSOMEIP_METHOD_POS_MIN], + (*_buffer)[VSOMEIP_METHOD_POS_MAX]); const client_t its_client = VSOMEIP_BYTES_TO_WORD( - (*its_buffer)[VSOMEIP_CLIENT_POS_MIN], - (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]); + (*_buffer)[VSOMEIP_CLIENT_POS_MIN], + (*_buffer)[VSOMEIP_CLIENT_POS_MAX]); const session_t its_session = VSOMEIP_BYTES_TO_WORD( - (*its_buffer)[VSOMEIP_SESSION_POS_MIN], - (*its_buffer)[VSOMEIP_SESSION_POS_MAX]); + (*_buffer)[VSOMEIP_SESSION_POS_MIN], + (*_buffer)[VSOMEIP_SESSION_POS_MAX]); if (has_enabled_magic_cookies_) { const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); if (std::chrono::duration_cast<std::chrono::milliseconds>( now - last_cookie_sent_) > std::chrono::milliseconds(10000)) { - send_magic_cookie(its_buffer); + send_magic_cookie(_buffer); last_cookie_sent_ = now; } } @@ -312,9 +345,9 @@ void tcp_client_endpoint_impl::send_queued() { std::stringstream msg; msg << "tcei<" << remote_.address() << ":" << std::dec << remote_.port() << ">::sq: "; - for (std::size_t i = 0; i < its_buffer->size(); i++) + for (std::size_t i = 0; i < _buffer->size(); i++) msg << std::hex << std::setw(2) << std::setfill('0') - << (int)(*its_buffer)[i] << " "; + << (int)(*_buffer)[i] << " "; VSOMEIP_INFO << msg.str(); #endif { @@ -326,21 +359,23 @@ void tcp_client_endpoint_impl::send_queued() { } boost::asio::async_write( *socket_, - boost::asio::buffer(*its_buffer), - std::bind(&tcp_client_endpoint_impl::write_completion_condition, - std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()), - std::placeholders::_1, - std::placeholders::_2, - its_buffer->size(), - its_service, its_method, its_client, its_session, - std::chrono::steady_clock::now()), + boost::asio::buffer(*_buffer), std::bind( + &tcp_client_endpoint_impl::write_completion_condition, + std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()), + std::placeholders::_1, + std::placeholders::_2, + _buffer->size(), + its_service, its_method, its_client, its_session, + std::chrono::steady_clock::now()), + strand_.wrap( + std::bind( &tcp_client_endpoint_base_impl::send_cbk, shared_from_this(), std::placeholders::_1, std::placeholders::_2, - its_buffer - ) + _buffer + )) ); } } @@ -675,7 +710,10 @@ void tcp_client_endpoint_impl::receive_cbk( } } its_lock.unlock(); - receive(_recv_buffer, _recv_buffer_size, its_missing_capacity); + auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()); + strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){ + self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity); + }); } else { VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: " << _error.message() << "(" << std::dec << _error.value() @@ -700,7 +738,10 @@ void tcp_client_endpoint_impl::receive_cbk( } } else { its_lock.unlock(); - receive(_recv_buffer, _recv_buffer_size, its_missing_capacity); + auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()); + strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){ + self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity); + }); } } } @@ -838,7 +879,13 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, if (queue_.size() > 0) { queue_size_ -= queue_.front()->size(); queue_.pop_front(); - send_queued(); + auto its_buffer = get_front(); + if (its_buffer) { + auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()); + strand_.dispatch( + [self, its_buffer]() { self->send_queued(its_buffer);} + ); + } } } else if (_error == boost::system::errc::destination_address_required) { VSOMEIP_WARNING << "tce::send_cbk received error: " << _error.message() |