diff options
Diffstat (limited to 'implementation/endpoints/src/udp_client_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/udp_client_endpoint_impl.cpp | 263 |
1 files changed, 221 insertions, 42 deletions
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp index dc7a7bf..3b9a212 100644 --- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp @@ -14,6 +14,8 @@ #include "../../routing/include/routing_host.hpp" #include "../include/udp_client_endpoint_impl.hpp" #include "../../utility/include/utility.hpp" +#include "../../utility/include/byteorder.hpp" + namespace vsomeip_v3 { @@ -61,27 +63,50 @@ void udp_client_endpoint_impl::connect() { socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error); if (its_error) { VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't enable " - << "SO_REUSEADDR: " << its_error.message() << " remote:" - << get_address_port_remote(); + << "SO_REUSEADDR: " << its_error.message() + << " local port:" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); } + socket_->set_option(boost::asio::socket_base::receive_buffer_size( udp_receive_buffer_size_), its_error); if (its_error) { VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't set " - << "SO_RCVBUF: " << its_error.message() << " to: " - << std::dec << udp_receive_buffer_size_ << " remote:" - << get_address_port_remote(); - } else { - boost::asio::socket_base::receive_buffer_size its_option; - socket_->get_option(its_option, its_error); - if (its_error) { - VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't get " - << "SO_RCVBUF: " << its_error.message() << " remote:" - << get_address_port_remote(); - } else { - VSOMEIP_INFO << "udp_client_endpoint_impl::connect: SO_RCVBUF is: " - << std::dec << its_option.value(); + << "SO_RCVBUF: " << its_error.message() + << " to: " << std::dec << udp_receive_buffer_size_ + << " local port:" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } + + boost::asio::socket_base::receive_buffer_size its_option; + socket_->get_option(its_option, its_error); + #ifdef __linux__ + // If regular setting of the buffer size did not work, try to force + // (requires CAP_NET_ADMIN to be successful) + if (its_option.value() < 0 + || its_option.value() < udp_receive_buffer_size_) { + its_error.assign(setsockopt(socket_->native_handle(), + SOL_SOCKET, SO_RCVBUFFORCE, + &udp_receive_buffer_size_, sizeof(udp_receive_buffer_size_)), + boost::system::generic_category()); + if (!its_error) { + VSOMEIP_INFO << "udp_client_endpoint_impl::connect: " + << "SO_RCVBUFFORCE successful!"; } + socket_->get_option(its_option, its_error); + } + #endif + if (its_error) { + VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't get " + << "SO_RCVBUF: " << its_error.message() + << " local port:" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } else { + VSOMEIP_INFO << "udp_client_endpoint_impl::connect: SO_RCVBUF is: " + << std::dec << its_option.value() + << " (" << udp_receive_buffer_size_ << ")" + << " local port:" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); } #ifndef _WIN32 @@ -89,26 +114,53 @@ void udp_client_endpoint_impl::connect() { std::string its_device(configuration_->get_device()); if (its_device != "") { if (setsockopt(socket_->native_handle(), - SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) { + SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) { VSOMEIP_WARNING << "UDP Client: Could not bind to device \"" << its_device << "\""; } } #endif - // Bind address and, optionally, port. - boost::system::error_code its_bind_error; - socket_->bind(local_, its_bind_error); - if(its_bind_error) { - VSOMEIP_WARNING << "udp_client_endpoint::connect: " - "Error binding socket: " << its_bind_error.message() - << " remote:" << get_address_port_remote(); - try { - // don't connect on bind error to avoid using a random port - strand_.post(std::bind(&client_endpoint_impl::connect_cbk, - shared_from_this(), its_bind_error)); - } catch (const std::exception &e) { - VSOMEIP_ERROR << "udp_client_endpoint_impl::connect: " - << e.what() << " remote:" << get_address_port_remote(); + // In case a client endpoint port was configured, + // bind to it before connecting + if (local_.port() != ILLEGAL_PORT) { + boost::system::error_code its_bind_error; + socket_->bind(local_, its_bind_error); + if(its_bind_error) { + VSOMEIP_WARNING << "udp_client_endpoint::connect: " + "Error binding socket: " << its_bind_error.message() + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + + std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock(); + if (its_host) { + // set new client port depending on service / instance / remote port + if (!its_host->on_bind_error(shared_from_this(), remote_port_)) { + VSOMEIP_WARNING << "udp_client_endpoint::connect: " + "Failed to set new local port for uce: " + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } else { + local_.port(local_port_); + VSOMEIP_INFO << "udp_client_endpoint::connect: " + "Using new new local port for uce: " + << " local: " << local_.address().to_string() + << ":" << std::dec << local_.port() + << " remote:" << get_address_port_remote(); + } + } + + + try { + // don't connect on bind error to avoid using a random port + strand_.post(std::bind(&client_endpoint_impl::connect_cbk, + shared_from_this(), its_bind_error)); + } catch (const std::exception &e) { + VSOMEIP_ERROR << "udp_client_endpoint_impl::connect: " + << e.what() << " remote:" << get_address_port_remote(); + } + return; } return; } @@ -158,32 +210,26 @@ void udp_client_endpoint_impl::restart(bool _force) { start_connect_timer(); } -void udp_client_endpoint_impl::send_queued() { - message_buffer_ptr_t its_buffer; - if(queue_.size()) { - its_buffer = queue_.front(); - } else { - return; - } +void udp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) { #if 0 std::stringstream msg; msg << "ucei<" << remote_.address() << ":" << std::dec << remote_.port() << ">::sq: "; - for (std::size_t i = 0; i < its_buffer->size(); i++) + for (std::size_t i = 0; i < _buffer->size(); i++) msg << std::hex << std::setw(2) << std::setfill('0') - << (int)(*its_buffer)[i] << " "; + << (int)(*_buffer)[i] << " "; VSOMEIP_INFO << msg.str(); #endif { std::lock_guard<std::mutex> its_lock(socket_mutex_); socket_->async_send( - boost::asio::buffer(*its_buffer), + boost::asio::buffer(*_buffer), std::bind( &udp_client_endpoint_base_impl::send_cbk, shared_from_this(), std::placeholders::_1, std::placeholders::_2, - its_buffer + _buffer ) ); } @@ -354,7 +400,12 @@ void udp_client_endpoint_impl::receive_cbk( receive(); } else { if (_error == boost::asio::error::connection_refused) { - shutdown_and_close_socket(false); + VSOMEIP_WARNING << "uce::receive_cbk: local: " << get_address_port_local() + << " remote: " << get_address_port_remote() + << " error: " << _error.message(); + std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock(); + its_ep_host->on_disconnect(shared_from_this()); + restart(false); } else { receive(); } @@ -415,6 +466,134 @@ std::string udp_client_endpoint_impl::get_remote_information() const { + std::to_string(remote_.port()); } +void udp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, std::size_t _bytes, + const message_buffer_ptr_t &_sent_msg) { + (void)_bytes; + if (!_error) { + std::lock_guard<std::mutex> its_lock(mutex_); + if (queue_.size() > 0) { + queue_size_ -= queue_.front()->size(); + queue_.pop_front(); + auto its_buffer = get_front(); + if (its_buffer) + send_queued(its_buffer); + } + } else if (_error == boost::asio::error::broken_pipe) { + state_ = cei_state_e::CLOSED; + bool stopping(false); + { + std::lock_guard<std::mutex> its_lock(mutex_); + stopping = sending_blocked_; + if (stopping) { + queue_.clear(); + queue_size_ = 0; + } else { + service_t its_service(0); + method_t its_method(0); + client_t its_client(0); + session_t its_session(0); + if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) { + its_service = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN], + (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]); + its_method = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_METHOD_POS_MIN], + (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]); + its_client = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN], + (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]); + its_session = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_SESSION_POS_MIN], + (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]); + } + VSOMEIP_WARNING << "uce::send_cbk received error: " + << _error.message() << " (" << std::dec + << _error.value() << ") " << get_remote_information() + << " " << std::dec << queue_.size() + << " " << std::dec << queue_size_ << " (" + << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << "." + << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"; + } + } + if (!stopping) { + print_status(); + } + was_not_connected_ = true; + shutdown_and_close_socket(true); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); + } else if (_error == boost::asio::error::not_connected + || _error == boost::asio::error::bad_descriptor + || _error == boost::asio::error::no_permission) { + state_ = cei_state_e::CLOSED; + if (_error == boost::asio::error::no_permission) { + VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message() + << " (" << std::dec << _error.value() << ") " + << get_remote_information(); + std::lock_guard<std::mutex> its_lock(mutex_); + queue_.clear(); + queue_size_ = 0; + } + was_not_connected_ = true; + shutdown_and_close_socket(true); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); + } else if (_error == boost::asio::error::operation_aborted) { + VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message(); + // endpoint was stopped + sending_blocked_ = true; + shutdown_and_close_socket(false); + } else if (_error == boost::system::errc::destination_address_required) { + VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message() + << " (" << std::dec << _error.value() << ") " + << get_remote_information(); + was_not_connected_ = true; + } else { + if (state_ == cei_state_e::CONNECTING) { + VSOMEIP_WARNING << "uce::send_cbk endpoint is already restarting:" + << get_remote_information(); + } else { + state_ = cei_state_e::CONNECTING; + shutdown_and_close_socket(false); + std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock(); + if (its_host) { + its_host->on_disconnect(shared_from_this()); + } + restart(true); + } + service_t its_service(0); + method_t its_method(0); + client_t its_client(0); + session_t its_session(0); + if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) { + its_service = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN], + (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]); + its_method = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_METHOD_POS_MIN], + (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]); + its_client = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN], + (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]); + its_session = VSOMEIP_BYTES_TO_WORD( + (*_sent_msg)[VSOMEIP_SESSION_POS_MIN], + (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]); + } + VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message() + << " (" << std::dec << _error.value() << ") " + << get_remote_information() << " " + << " " << std::dec << queue_.size() + << " " << std::dec << queue_size_ << " (" + << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << its_service << "." + << std::hex << std::setw(4) << std::setfill('0') << its_method << "." + << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"; + print_status(); + } +} + bool udp_client_endpoint_impl::tp_segmentation_enabled(service_t _service, method_t _method) const { return configuration_->tp_segment_messages_client_to_service(_service, |