summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/udp_client_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/udp_client_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/udp_client_endpoint_impl.cpp263
1 files changed, 221 insertions, 42 deletions
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
index dc7a7bf..3b9a212 100644
--- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
@@ -14,6 +14,8 @@
#include "../../routing/include/routing_host.hpp"
#include "../include/udp_client_endpoint_impl.hpp"
#include "../../utility/include/utility.hpp"
+#include "../../utility/include/byteorder.hpp"
+
namespace vsomeip_v3 {
@@ -61,27 +63,50 @@ void udp_client_endpoint_impl::connect() {
socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't enable "
- << "SO_REUSEADDR: " << its_error.message() << " remote:"
- << get_address_port_remote();
+ << "SO_REUSEADDR: " << its_error.message()
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
}
+
socket_->set_option(boost::asio::socket_base::receive_buffer_size(
udp_receive_buffer_size_), its_error);
if (its_error) {
VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't set "
- << "SO_RCVBUF: " << its_error.message() << " to: "
- << std::dec << udp_receive_buffer_size_ << " remote:"
- << get_address_port_remote();
- } else {
- boost::asio::socket_base::receive_buffer_size its_option;
- socket_->get_option(its_option, its_error);
- if (its_error) {
- VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't get "
- << "SO_RCVBUF: " << its_error.message() << " remote:"
- << get_address_port_remote();
- } else {
- VSOMEIP_INFO << "udp_client_endpoint_impl::connect: SO_RCVBUF is: "
- << std::dec << its_option.value();
+ << "SO_RCVBUF: " << its_error.message()
+ << " to: " << std::dec << udp_receive_buffer_size_
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+
+ boost::asio::socket_base::receive_buffer_size its_option;
+ socket_->get_option(its_option, its_error);
+ #ifdef __linux__
+ // If regular setting of the buffer size did not work, try to force
+ // (requires CAP_NET_ADMIN to be successful)
+ if (its_option.value() < 0
+ || its_option.value() < udp_receive_buffer_size_) {
+ its_error.assign(setsockopt(socket_->native_handle(),
+ SOL_SOCKET, SO_RCVBUFFORCE,
+ &udp_receive_buffer_size_, sizeof(udp_receive_buffer_size_)),
+ boost::system::generic_category());
+ if (!its_error) {
+ VSOMEIP_INFO << "udp_client_endpoint_impl::connect: "
+ << "SO_RCVBUFFORCE successful!";
}
+ socket_->get_option(its_option, its_error);
+ }
+ #endif
+ if (its_error) {
+ VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't get "
+ << "SO_RCVBUF: " << its_error.message()
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ } else {
+ VSOMEIP_INFO << "udp_client_endpoint_impl::connect: SO_RCVBUF is: "
+ << std::dec << its_option.value()
+ << " (" << udp_receive_buffer_size_ << ")"
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
}
#ifndef _WIN32
@@ -89,26 +114,53 @@ void udp_client_endpoint_impl::connect() {
std::string its_device(configuration_->get_device());
if (its_device != "") {
if (setsockopt(socket_->native_handle(),
- SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) {
+ SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
VSOMEIP_WARNING << "UDP Client: Could not bind to device \"" << its_device << "\"";
}
}
#endif
- // Bind address and, optionally, port.
- boost::system::error_code its_bind_error;
- socket_->bind(local_, its_bind_error);
- if(its_bind_error) {
- VSOMEIP_WARNING << "udp_client_endpoint::connect: "
- "Error binding socket: " << its_bind_error.message()
- << " remote:" << get_address_port_remote();
- try {
- // don't connect on bind error to avoid using a random port
- strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
- shared_from_this(), its_bind_error));
- } catch (const std::exception &e) {
- VSOMEIP_ERROR << "udp_client_endpoint_impl::connect: "
- << e.what() << " remote:" << get_address_port_remote();
+ // In case a client endpoint port was configured,
+ // bind to it before connecting
+ if (local_.port() != ILLEGAL_PORT) {
+ boost::system::error_code its_bind_error;
+ socket_->bind(local_, its_bind_error);
+ if(its_bind_error) {
+ VSOMEIP_WARNING << "udp_client_endpoint::connect: "
+ "Error binding socket: " << its_bind_error.message()
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+
+ std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
+ if (its_host) {
+ // set new client port depending on service / instance / remote port
+ if (!its_host->on_bind_error(shared_from_this(), remote_port_)) {
+ VSOMEIP_WARNING << "udp_client_endpoint::connect: "
+ "Failed to set new local port for uce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ } else {
+ local_.port(local_port_);
+ VSOMEIP_INFO << "udp_client_endpoint::connect: "
+ "Using new new local port for uce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+ }
+
+
+ try {
+ // don't connect on bind error to avoid using a random port
+ strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
+ shared_from_this(), its_bind_error));
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "udp_client_endpoint_impl::connect: "
+ << e.what() << " remote:" << get_address_port_remote();
+ }
+ return;
}
return;
}
@@ -158,32 +210,26 @@ void udp_client_endpoint_impl::restart(bool _force) {
start_connect_timer();
}
-void udp_client_endpoint_impl::send_queued() {
- message_buffer_ptr_t its_buffer;
- if(queue_.size()) {
- its_buffer = queue_.front();
- } else {
- return;
- }
+void udp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
#if 0
std::stringstream msg;
msg << "ucei<" << remote_.address() << ":"
<< std::dec << remote_.port() << ">::sq: ";
- for (std::size_t i = 0; i < its_buffer->size(); i++)
+ for (std::size_t i = 0; i < _buffer->size(); i++)
msg << std::hex << std::setw(2) << std::setfill('0')
- << (int)(*its_buffer)[i] << " ";
+ << (int)(*_buffer)[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
socket_->async_send(
- boost::asio::buffer(*its_buffer),
+ boost::asio::buffer(*_buffer),
std::bind(
&udp_client_endpoint_base_impl::send_cbk,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2,
- its_buffer
+ _buffer
)
);
}
@@ -354,7 +400,12 @@ void udp_client_endpoint_impl::receive_cbk(
receive();
} else {
if (_error == boost::asio::error::connection_refused) {
- shutdown_and_close_socket(false);
+ VSOMEIP_WARNING << "uce::receive_cbk: local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote()
+ << " error: " << _error.message();
+ std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock();
+ its_ep_host->on_disconnect(shared_from_this());
+ restart(false);
} else {
receive();
}
@@ -415,6 +466,134 @@ std::string udp_client_endpoint_impl::get_remote_information() const {
+ std::to_string(remote_.port());
}
+void udp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, std::size_t _bytes,
+ const message_buffer_ptr_t &_sent_msg) {
+ (void)_bytes;
+ if (!_error) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ if (queue_.size() > 0) {
+ queue_size_ -= queue_.front()->size();
+ queue_.pop_front();
+ auto its_buffer = get_front();
+ if (its_buffer)
+ send_queued(its_buffer);
+ }
+ } else if (_error == boost::asio::error::broken_pipe) {
+ state_ = cei_state_e::CLOSED;
+ bool stopping(false);
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ stopping = sending_blocked_;
+ if (stopping) {
+ queue_.clear();
+ queue_size_ = 0;
+ } else {
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
+ its_service = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_WARNING << "uce::send_cbk received error: "
+ << _error.message() << " (" << std::dec
+ << _error.value() << ") " << get_remote_information()
+ << " " << std::dec << queue_.size()
+ << " " << std::dec << queue_size_ << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
+ }
+ }
+ if (!stopping) {
+ print_status();
+ }
+ was_not_connected_ = true;
+ shutdown_and_close_socket(true);
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
+ } else if (_error == boost::asio::error::not_connected
+ || _error == boost::asio::error::bad_descriptor
+ || _error == boost::asio::error::no_permission) {
+ state_ = cei_state_e::CLOSED;
+ if (_error == boost::asio::error::no_permission) {
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information();
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ queue_.clear();
+ queue_size_ = 0;
+ }
+ was_not_connected_ = true;
+ shutdown_and_close_socket(true);
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
+ } else if (_error == boost::asio::error::operation_aborted) {
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message();
+ // endpoint was stopped
+ sending_blocked_ = true;
+ shutdown_and_close_socket(false);
+ } else if (_error == boost::system::errc::destination_address_required) {
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information();
+ was_not_connected_ = true;
+ } else {
+ if (state_ == cei_state_e::CONNECTING) {
+ VSOMEIP_WARNING << "uce::send_cbk endpoint is already restarting:"
+ << get_remote_information();
+ } else {
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket(false);
+ std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
+ if (its_host) {
+ its_host->on_disconnect(shared_from_this());
+ }
+ restart(true);
+ }
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
+ its_service = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information() << " "
+ << " " << std::dec << queue_.size()
+ << " " << std::dec << queue_size_ << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
+ print_status();
+ }
+}
+
bool udp_client_endpoint_impl::tp_segmentation_enabled(service_t _service,
method_t _method) const {
return configuration_->tp_segment_messages_client_to_service(_service,