summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src')
-rw-r--r--implementation/endpoints/src/client_endpoint_impl.cpp54
-rw-r--r--implementation/endpoints/src/endpoint_manager_base.cpp9
-rw-r--r--implementation/endpoints/src/endpoint_manager_impl.cpp190
-rw-r--r--implementation/endpoints/src/local_client_endpoint_impl.cpp13
-rw-r--r--implementation/endpoints/src/local_server_endpoint_impl.cpp4
-rw-r--r--implementation/endpoints/src/server_endpoint_impl.cpp39
-rw-r--r--implementation/endpoints/src/tcp_client_endpoint_impl.cpp247
-rw-r--r--implementation/endpoints/src/tcp_server_endpoint_impl.cpp51
-rw-r--r--implementation/endpoints/src/udp_client_endpoint_impl.cpp263
-rw-r--r--implementation/endpoints/src/udp_server_endpoint_impl.cpp121
-rw-r--r--implementation/endpoints/src/virtual_server_endpoint_impl.cpp4
11 files changed, 721 insertions, 274 deletions
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp
index 9b31cc1..66b3138 100644
--- a/implementation/endpoints/src/client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/client_endpoint_impl.cpp
@@ -120,9 +120,24 @@ void client_endpoint_impl<Protocol>::stop() {
connect_timer_.cancel(ec);
}
connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
- shutdown_and_close_socket(false);
+
+ // bind to strand as stop() might be called from different thread
+ strand_.dispatch(std::bind(&client_endpoint_impl::shutdown_and_close_socket,
+ this->shared_from_this(),
+ false)
+ );
+}
+
+template<typename Protocol>
+message_buffer_ptr_t client_endpoint_impl<Protocol>::get_front() {
+ message_buffer_ptr_t its_buffer;
+ if (queue_.size())
+ its_buffer = queue_.front();
+
+ return (its_buffer);
}
+
template<typename Protocol>
bool client_endpoint_impl<Protocol>::send_to(
const std::shared_ptr<endpoint_definition> _target, const byte_t *_data,
@@ -317,7 +332,11 @@ void client_endpoint_impl<Protocol>::send_segments(
// respect minimal debounce time
wait_until_debounce_time_reached();
// ignore retention time and send immediately as the train is full anyway
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
+ this->shared_from_this(), its_buffer));
+ }
}
train_.last_departure_ = std::chrono::steady_clock::now();
}
@@ -397,8 +416,10 @@ void client_endpoint_impl<Protocol>::connect_cbk(
if (was_not_connected_) {
was_not_connected_ = false;
std::lock_guard<std::mutex> its_lock(mutex_);
- if (queue_.size() > 0) {
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
+ this->shared_from_this(), its_buffer));
VSOMEIP_WARNING << __func__ << ": resume sending to: "
<< get_remote_information();
}
@@ -415,7 +436,9 @@ template<typename Protocol>
void client_endpoint_impl<Protocol>::wait_connect_cbk(
boost::system::error_code const &_error) {
if (!_error && !client_endpoint_impl<Protocol>::sending_blocked_) {
- connect();
+ auto self = this->shared_from_this();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
}
}
@@ -429,7 +452,9 @@ void client_endpoint_impl<Protocol>::send_cbk(
if (queue_.size() > 0) {
queue_size_ -= queue_.front()->size();
queue_.pop_front();
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer)
+ send_queued(its_buffer);
}
} else if (_error == boost::asio::error::broken_pipe) {
state_ = cei_state_e::CLOSED;
@@ -475,7 +500,8 @@ void client_endpoint_impl<Protocol>::send_cbk(
}
was_not_connected_ = true;
shutdown_and_close_socket(true);
- connect();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
} else if (_error == boost::asio::error::not_connected
|| _error == boost::asio::error::bad_descriptor
|| _error == boost::asio::error::no_permission) {
@@ -490,7 +516,8 @@ void client_endpoint_impl<Protocol>::send_cbk(
}
was_not_connected_ = true;
shutdown_and_close_socket(true);
- connect();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
} else if (_error == boost::asio::error::operation_aborted) {
VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message();
// endpoint was stopped
@@ -585,6 +612,11 @@ std::uint16_t client_endpoint_impl<Protocol>::get_local_port() const {
}
template<typename Protocol>
+void client_endpoint_impl<Protocol>::set_local_port(uint16_t _port) {
+ local_port_ = _port;
+}
+
+template<typename Protocol>
void client_endpoint_impl<Protocol>::start_connect_timer() {
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
connect_timer_.expires_from_now(
@@ -665,7 +697,11 @@ void client_endpoint_impl<Protocol>::queue_train(bool _queue_size_zero_on_entry)
queue_size_ += train_.buffer_->size();
train_.buffer_ = std::make_shared<message_buffer_t>();
if (_queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
+ this->shared_from_this(), its_buffer));
+ }
}
}
diff --git a/implementation/endpoints/src/endpoint_manager_base.cpp b/implementation/endpoints/src/endpoint_manager_base.cpp
index a79ae48..cfddc87 100644
--- a/implementation/endpoints/src/endpoint_manager_base.cpp
+++ b/implementation/endpoints/src/endpoint_manager_base.cpp
@@ -123,6 +123,13 @@ void endpoint_manager_base::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
rm_->on_disconnect(_endpoint);
}
+bool endpoint_manager_base::on_bind_error(std::shared_ptr<endpoint> _endpoint, uint16_t _remote_port) {
+ (void)_endpoint;
+ (void)_remote_port;
+ return true;
+ // intentionally left blank
+}
+
void endpoint_manager_base::on_error(
const byte_t *_data, length_t _length, endpoint* const _receiver,
const boost::asio::ip::address &_remote_address,
@@ -158,7 +165,7 @@ endpoint_manager_base::log_client_states() const {
{
std::lock_guard<std::mutex> its_lock(local_endpoint_mutex_);
- for (const auto& e : local_endpoints_) {
+ for (const auto &e : local_endpoints_) {
size_t its_queue_size = e.second->get_queue_size();
if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE) {
its_client_queue_sizes.push_back(
diff --git a/implementation/endpoints/src/endpoint_manager_impl.cpp b/implementation/endpoints/src/endpoint_manager_impl.cpp
index a82a173..dbb2107 100644
--- a/implementation/endpoints/src/endpoint_manager_impl.cpp
+++ b/implementation/endpoints/src/endpoint_manager_impl.cpp
@@ -328,8 +328,10 @@ bool endpoint_manager_impl::remove_server_endpoint(uint16_t _port, bool _reliabl
return ret;
}
-void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_t _instance,
- bool _reliable) {
+void
+endpoint_manager_impl::clear_client_endpoints(
+ service_t _service, instance_t _instance, bool _reliable) {
+
std::shared_ptr<endpoint> endpoint_to_delete;
bool other_services_reachable_through_endpoint(false);
{
@@ -371,8 +373,12 @@ void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_
}
if (!other_services_reachable_through_endpoint) {
- std::uint16_t its_port(0);
+ partition_id_t its_partition;
boost::asio::ip::address its_address;
+ std::uint16_t its_port(0);
+
+ its_partition = configuration_->get_partition_id(_service, _instance);
+
if (_reliable) {
std::shared_ptr<tcp_client_endpoint_impl> ep =
std::dynamic_pointer_cast<tcp_client_endpoint_impl>(endpoint_to_delete);
@@ -392,15 +398,21 @@ void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_
if (found_ip != client_endpoints_by_ip_.end()) {
const auto found_port = found_ip->second.find(its_port);
if (found_port != found_ip->second.end()) {
- const auto found_reliable = found_port->second.find(_reliable);
+ auto found_reliable = found_port->second.find(_reliable);
if (found_reliable != found_port->second.end()) {
- if (found_reliable->second == endpoint_to_delete) {
- found_port->second.erase(_reliable);
- // delete if necessary
- if (!found_port->second.size()) {
- found_ip->second.erase(found_port);
- if (!found_ip->second.size()) {
- client_endpoints_by_ip_.erase(found_ip);
+ const auto found_partition = found_reliable->second.find(its_partition);
+ if (found_partition != found_reliable->second.end()) {
+ if (found_partition->second == endpoint_to_delete) {
+ found_reliable->second.erase(its_partition);
+ // delete if necessary
+ if (0 == found_reliable->second.size()) {
+ found_port->second.erase(_reliable);
+ if (0 == found_port->second.size()) {
+ found_ip->second.erase(found_port);
+ if (0 == found_ip->second.size()) {
+ client_endpoints_by_ip_.erase(found_ip);
+ }
+ }
}
}
}
@@ -455,8 +467,14 @@ void endpoint_manager_impl::find_or_create_multicast_endpoint(
std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
service_instances_multicast_[_service][_sender] = _instance;
}
- dynamic_cast<udp_server_endpoint_impl*>(its_endpoint.get())->join_unlocked(
- _address.to_string());
+
+ auto its_udp_server_endpoint
+ = std::dynamic_pointer_cast<udp_server_endpoint_impl>(its_endpoint);
+ if (_port != configuration_->get_sd_port()) {
+ its_udp_server_endpoint->join(_address.to_string());
+ } else {
+ its_udp_server_endpoint->join_unlocked(_address.to_string());
+ }
} else {
VSOMEIP_ERROR <<"Could not find/create multicast endpoint!";
}
@@ -540,11 +558,13 @@ void endpoint_manager_impl::print_status() const {
VSOMEIP_INFO << "status start remote client endpoints:";
std::uint32_t num_remote_client_endpoints(0);
// normal endpoints
- for (const auto &a : client_endpoints_by_ip) {
- for (const auto& p : a.second) {
- for (const auto& ru : p.second) {
- ru.second->print_status();
- num_remote_client_endpoints++;
+ for (const auto &its_address : client_endpoints_by_ip) {
+ for (const auto &its_port : its_address.second) {
+ for (const auto &its_reliability : its_port.second) {
+ for (const auto &its_partition : its_reliability.second) {
+ its_partition.second->print_status();
+ num_remote_client_endpoints++;
+ }
}
}
}
@@ -800,6 +820,37 @@ void endpoint_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
}
}
+bool endpoint_manager_impl::on_bind_error(std::shared_ptr<endpoint> _endpoint, std::uint16_t _remote_port) {
+ std::lock_guard<std::recursive_mutex> its_ep_lock(endpoint_mutex_);
+ for (auto &its_service : remote_services_) {
+ for (auto &its_instance : its_service.second) {
+ const bool is_reliable = _endpoint->is_reliable();
+ auto found_endpoint = its_instance.second.find(is_reliable);
+ if (found_endpoint != its_instance.second.end()) {
+ if (found_endpoint->second == _endpoint) {
+ // get a new client port using service / instance / remote port
+ uint16_t its_old_local_port = _endpoint->get_local_port();
+ uint16_t its_new_local_port(ILLEGAL_PORT);
+
+ std::unique_lock<std::mutex> its_lock(used_client_ports_mutex_);
+ if (configuration_->get_client_port(its_service.first,
+ its_instance.first,
+ _remote_port,
+ is_reliable,
+ used_client_ports_,
+ its_new_local_port)) {
+ _endpoint->set_local_port(its_new_local_port);
+ its_lock.unlock();
+ release_port(its_old_local_port, _endpoint->is_reliable());
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+}
+
void endpoint_manager_impl::on_error(
const byte_t *_data, length_t _length, endpoint* const _receiver,
const boost::asio::ip::address &_remote_address,
@@ -820,8 +871,10 @@ void endpoint_manager_impl::release_port(uint16_t _port, bool _reliable) {
used_client_ports_[_reliable].erase(_port);
}
-std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client(
+std::shared_ptr<endpoint>
+endpoint_manager_impl::find_remote_client(
service_t _service, instance_t _instance, bool _reliable) {
+
std::shared_ptr<endpoint> its_endpoint;
auto found_service = remote_services_.find(_service);
if (found_service != remote_services_.end()) {
@@ -837,35 +890,46 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client(
return its_endpoint;
}
- // If another service is hosted on the same server_endpoint
- // reuse the existing client_endpoint.
+ // Endpoint did not yet exist. Get the partition id to check
+ // whether the client endpoint for the partition does exist.
+ partition_id_t its_partition_id
+ = configuration_->get_partition_id(_service, _instance);
+
+ // If another service within the same partition is hosted on the
+ // same server_endpoint reuse the existing client_endpoint.
auto found_service_info = remote_service_info_.find(_service);
- if(found_service_info != remote_service_info_.end()) {
+ if (found_service_info != remote_service_info_.end()) {
auto found_instance = found_service_info->second.find(_instance);
- if(found_instance != found_service_info->second.end()) {
+ if (found_instance != found_service_info->second.end()) {
auto found_reliable = found_instance->second.find(_reliable);
- if(found_reliable != found_instance->second.end()) {
- std::shared_ptr<endpoint_definition> its_ep_def =
- found_reliable->second;
+ if (found_reliable != found_instance->second.end()) {
+ std::shared_ptr<endpoint_definition> its_ep_def
+ = found_reliable->second;
auto found_address = client_endpoints_by_ip_.find(
its_ep_def->get_address());
- if(found_address != client_endpoints_by_ip_.end()) {
+ if (found_address != client_endpoints_by_ip_.end()) {
auto found_port = found_address->second.find(
its_ep_def->get_remote_port());
- if(found_port != found_address->second.end()) {
- auto found_reliable2 = found_port->second.find(
- _reliable);
- if(found_reliable2 != found_port->second.end()) {
- its_endpoint = found_reliable2->second;
- // store the endpoint under this service/instance id
- // as well - needed for later cleanup
- remote_services_[_service][_instance][_reliable] =
- its_endpoint;
- service_instances_[_service][its_endpoint.get()] = _instance;
- // add endpoint to serviceinfo object
- auto found_service_info = rm_->find_service(_service,_instance);
- if (found_service_info) {
- found_service_info->set_endpoint(its_endpoint, _reliable);
+ if (found_port != found_address->second.end()) {
+ auto found_reliable2
+ = found_port->second.find(_reliable);
+ if (found_reliable2 != found_port->second.end()) {
+ auto found_partition
+ = found_reliable2->second.find(its_partition_id);
+ if (found_partition != found_reliable2->second.end()) {
+ its_endpoint = found_partition->second;
+
+ // store the endpoint under this service/instance id
+ // as well - needed for later cleanup
+ remote_services_[_service][_instance][_reliable]
+ = its_endpoint;
+ service_instances_[_service][its_endpoint.get()] = _instance;
+
+ // add endpoint to serviceinfo object
+ auto found_service_info = rm_->find_service(_service,_instance);
+ if (found_service_info) {
+ found_service_info->set_endpoint(its_endpoint, _reliable);
+ }
}
}
}
@@ -873,7 +937,8 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client(
}
}
}
- return its_endpoint;
+
+ return (its_endpoint);
}
std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client(
@@ -910,6 +975,8 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client(
}
if (its_endpoint) {
+ partition_id_t its_partition
+ = configuration_->get_partition_id(_service, _instance);
used_client_ports_[_reliable].insert(its_local_port);
its_lock.unlock();
service_instances_[_service][its_endpoint.get()] = _instance;
@@ -917,12 +984,19 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client(
client_endpoints_by_ip_[its_endpoint_def->get_address()]
[its_endpoint_def->get_port()]
- [_reliable] = its_endpoint;
+ [_reliable]
+ [its_partition]= its_endpoint;
// Set the basic route to the service in the service info
auto found_service_info = rm_->find_service(_service, _instance);
if (found_service_info) {
found_service_info->set_endpoint(its_endpoint, _reliable);
}
+ boost::system::error_code ec;
+ VSOMEIP_INFO << "endpoint_manager_impl::create_remote_client: "
+ << its_endpoint_def->get_address().to_string(ec)
+ << ":" << std::dec << its_endpoint_def->get_port()
+ << " reliable: " << _reliable
+ << " using local port: " << std::dec << its_local_port;
}
}
}
@@ -983,18 +1057,20 @@ endpoint_manager_impl::log_client_states() const {
its_client_endpoints = client_endpoints_by_ip_;
}
- for (const auto& its_address : its_client_endpoints) {
- for (const auto& its_port : its_address.second) {
- for (const auto& its_reliability : its_port.second) {
- size_t its_queue_size = its_reliability.second->get_queue_size();
- if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE)
- its_client_queue_sizes.push_back(
- std::make_pair(
- std::make_tuple(
- its_address.first,
- its_port.first,
- its_reliability.first),
- its_queue_size));
+ for (const auto &its_address : its_client_endpoints) {
+ for (const auto &its_port : its_address.second) {
+ for (const auto &its_reliability : its_port.second) {
+ for (const auto &its_partition : its_reliability.second) {
+ size_t its_queue_size = its_partition.second->get_queue_size();
+ if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE)
+ its_client_queue_sizes.push_back(
+ std::make_pair(
+ std::make_tuple(
+ its_address.first,
+ its_port.first,
+ its_reliability.first),
+ its_queue_size));
+ }
}
}
}
@@ -1040,8 +1116,8 @@ endpoint_manager_impl::log_server_states() const {
its_server_endpoints = server_endpoints_;
}
- for (const auto& its_port : its_server_endpoints) {
- for (const auto& its_reliability : its_port.second) {
+ for (const auto &its_port : its_server_endpoints) {
+ for (const auto &its_reliability : its_port.second) {
size_t its_queue_size = its_reliability.second->get_queue_size();
if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE)
its_client_queue_sizes.push_back(
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp
index 7e58e67..04c7787 100644
--- a/implementation/endpoints/src/local_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp
@@ -203,20 +203,13 @@ bool local_client_endpoint_impl::send(const uint8_t *_data, uint32_t _size) {
return ret;
}
-void local_client_endpoint_impl::send_queued() {
+void local_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
static const byte_t its_start_tag[] = { 0x67, 0x37, 0x6D, 0x07 };
static const byte_t its_end_tag[] = { 0x07, 0x6D, 0x37, 0x67 };
std::vector<boost::asio::const_buffer> bufs;
- message_buffer_ptr_t its_buffer;
- if(queue_.size()) {
- its_buffer = queue_.front();
- } else {
- return;
- }
-
bufs.push_back(boost::asio::buffer(its_start_tag));
- bufs.push_back(boost::asio::buffer(*its_buffer));
+ bufs.push_back(boost::asio::buffer(*_buffer));
bufs.push_back(boost::asio::buffer(its_end_tag));
{
@@ -231,7 +224,7 @@ void local_client_endpoint_impl::send_queued() {
>(shared_from_this()),
std::placeholders::_1,
std::placeholders::_2,
- its_buffer
+ _buffer
)
);
}
diff --git a/implementation/endpoints/src/local_server_endpoint_impl.cpp b/implementation/endpoints/src/local_server_endpoint_impl.cpp
index 1d412e4..ebf913f 100644
--- a/implementation/endpoints/src/local_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_server_endpoint_impl.cpp
@@ -946,6 +946,10 @@ std::uint16_t local_server_endpoint_impl::get_local_port() const {
return 0;
}
+void local_server_endpoint_impl::set_local_port(std::uint16_t _port) {
+ (void) _port;
+}
+
bool local_server_endpoint_impl::check_packetizer_space(
queue_iterator_type _queue_iterator, message_buffer_ptr_t* _packetizer,
std::uint32_t _size) {
diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp
index cfb5ea1..ddf6b25 100644
--- a/implementation/endpoints/src/server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/server_endpoint_impl.cpp
@@ -185,38 +185,45 @@ template<typename Protocol>bool server_endpoint_impl<Protocol>::send(const uint8
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
+ const method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
const client_t its_client = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SESSION_POS_MIN], _data[VSOMEIP_SESSION_POS_MAX]);
- clients_mutex_.lock();
- auto found_client = clients_.find(its_client);
- if (found_client != clients_.end()) {
- auto found_session = found_client->second.find(its_session);
- if (found_session != found_client->second.end()) {
- its_target = found_session->second;
+ requests_mutex_.lock();
+ auto found_client = requests_.find(its_client);
+ if (found_client != requests_.end()) {
+ auto its_request = std::make_tuple(its_service, its_method, its_session);
+ auto found_request = found_client->second.find(its_request);
+ if (found_request != found_client->second.end()) {
+ its_target = found_request->second;
is_valid_target = true;
- found_client->second.erase(its_session);
+ found_client->second.erase(found_request);
} else {
- VSOMEIP_WARNING << "server_endpoint::send: session_id 0x"
- << std::hex << its_session
- << " not found for client 0x" << its_client;
- const method_t its_method =
- VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
- _data[VSOMEIP_METHOD_POS_MAX]);
+ VSOMEIP_WARNING << "server_endpoint::send: request ["
+ << std::hex << std::setw(4) << std::setfill('0')
+ << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0')
+ << its_method << "/"
+ << std::hex << std::setw(4) << std::setfill('0')
+ << its_client << "."
+ << std::hex << std::setw(4) << std::setfill('0')
+ << its_session
+ << "] could not be found.";
if (its_service == VSOMEIP_SD_SERVICE
&& its_method == VSOMEIP_SD_METHOD) {
VSOMEIP_ERROR << "Clearing clients map as a request was "
"received on SD port";
- clients_.clear();
+ requests_.clear();
is_valid_target = get_default_target(its_service, its_target);
}
}
} else {
is_valid_target = get_default_target(its_service, its_target);
}
- clients_mutex_.unlock();
+ requests_mutex_.unlock();
if (is_valid_target) {
is_valid_target = send_intern(its_target, _data, _size);
@@ -757,7 +764,7 @@ size_t server_endpoint_impl<Protocol>::get_queue_size() const {
{
std::lock_guard<std::mutex> its_lock(mutex_);
- for (const auto& q : queues_) {
+ for (const auto &q : queues_) {
its_queue_size += q.second.second.size();
}
}
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
index 3debcc7..f88d2a2 100644
--- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
@@ -50,6 +50,7 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl(
tcp_restart_aborts_max_(configuration_->get_max_tcp_restart_aborts()),
tcp_connect_time_max_(configuration_->get_max_tcp_connect_time()),
aborted_restart_count_(0),
+ is_sending_(false),
sent_timer_(_io) {
is_supporting_magic_cookies_ = true;
@@ -67,64 +68,71 @@ bool tcp_client_endpoint_impl::is_local() const {
}
void tcp_client_endpoint_impl::start() {
- connect();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
}
void tcp_client_endpoint_impl::restart(bool _force) {
- if (!_force && state_ == cei_state_e::CONNECTING) {
- std::chrono::steady_clock::time_point its_current
- = std::chrono::steady_clock::now();
- long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
- its_current - connect_timepoint_).count();
- if (aborted_restart_count_ < tcp_restart_aborts_max_
- && its_connect_duration < tcp_connect_time_max_) {
- aborted_restart_count_++;
- return;
- } else {
- VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts ["
- << tcp_restart_aborts_max_ << "] reached! its_connect_duration: "
- << its_connect_duration;
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ auto restart_func = [self, _force] {
+ if (!_force && self->state_ == cei_state_e::CONNECTING) {
+ std::chrono::steady_clock::time_point its_current
+ = std::chrono::steady_clock::now();
+ long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+ its_current - self->connect_timepoint_).count();
+ if (self->aborted_restart_count_ < self->tcp_restart_aborts_max_
+ && its_connect_duration < self->tcp_connect_time_max_) {
+ self->aborted_restart_count_++;
+ return;
+ } else {
+ VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts ["
+ << self->tcp_restart_aborts_max_ << "] reached! its_connect_duration: "
+ << its_connect_duration;
+ }
}
- }
- state_ = cei_state_e::CONNECTING;
- std::string address_port_local;
- {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- address_port_local = get_address_port_local();
- shutdown_and_close_socket_unlocked(true);
- recv_buffer_ = std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0);
- }
- was_not_connected_ = true;
- reconnect_counter_ = 0;
- {
- std::lock_guard<std::mutex> its_lock(mutex_);
- for (const auto&m : queue_) {
- const service_t its_service = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_SERVICE_POS_MIN],
- (*m)[VSOMEIP_SERVICE_POS_MAX]);
- const method_t its_method = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_METHOD_POS_MIN],
- (*m)[VSOMEIP_METHOD_POS_MAX]);
- const client_t its_client = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_CLIENT_POS_MIN],
- (*m)[VSOMEIP_CLIENT_POS_MAX]);
- const session_t its_session = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_SESSION_POS_MIN],
- (*m)[VSOMEIP_SESSION_POS_MAX]);
- VSOMEIP_WARNING << "tce::restart: dropping message: "
- << "remote:" << get_address_port_remote() << " ("
- << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
- << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"
- << " size: " << std::dec << m->size();
+ self->state_ = cei_state_e::CONNECTING;
+ std::string address_port_local;
+ {
+ std::lock_guard<std::mutex> its_lock(self->socket_mutex_);
+ address_port_local = self->get_address_port_local();
+ self->shutdown_and_close_socket_unlocked(true);
+ self->recv_buffer_ = std::make_shared<message_buffer_t>(self->recv_buffer_size_initial_, 0);
}
- queue_.clear();
- queue_size_ = 0;
- }
- VSOMEIP_WARNING << "tce::restart: local: " << address_port_local
- << " remote: " << get_address_port_remote();
- start_connect_timer();
+ self->was_not_connected_ = true;
+ self->reconnect_counter_ = 0;
+ {
+ std::lock_guard<std::mutex> its_lock(self->mutex_);
+ for (const auto&m : self->queue_) {
+ const service_t its_service = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_SERVICE_POS_MIN],
+ (*m)[VSOMEIP_SERVICE_POS_MAX]);
+ const method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_METHOD_POS_MIN],
+ (*m)[VSOMEIP_METHOD_POS_MAX]);
+ const client_t its_client = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_CLIENT_POS_MIN],
+ (*m)[VSOMEIP_CLIENT_POS_MAX]);
+ const session_t its_session = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_SESSION_POS_MIN],
+ (*m)[VSOMEIP_SESSION_POS_MAX]);
+ VSOMEIP_WARNING << "tce::restart: dropping message: "
+ << "remote:" << self->get_address_port_remote() << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"
+ << " size: " << std::dec << m->size();
+ }
+ self->queue_.clear();
+ self->queue_size_ = 0;
+ }
+ VSOMEIP_WARNING << "tce::restart: local: " << address_port_local
+ << " remote: " << self->get_address_port_remote();
+ self->start_connect_timer();
+ };
+ // bind to strand_ to avoid socket closure if
+ // parallel socket operation is currently active
+ strand_.dispatch(restart_func);
}
void tcp_client_endpoint_impl::connect() {
@@ -169,26 +177,54 @@ void tcp_client_endpoint_impl::connect() {
std::string its_device(configuration_->get_device());
if (its_device != "") {
if (setsockopt(socket_->native_handle(),
- SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) {
+ SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
VSOMEIP_WARNING << "TCP Client: Could not bind to device \"" << its_device << "\"";
}
}
#endif
- // Bind address and, optionally, port.
- boost::system::error_code its_bind_error;
- socket_->bind(local_, its_bind_error);
- if(its_bind_error) {
- VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
- "Error binding socket: " << its_bind_error.message()
- << " remote:" << get_address_port_remote();
- try {
- // don't connect on bind error to avoid using a random port
- strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
- shared_from_this(), its_bind_error));
- } catch (const std::exception &e) {
- VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: "
- << e.what() << " remote:" << get_address_port_remote();
+ // In case a client endpoint port was configured,
+ // bind to it before connecting
+ if (local_.port() != ILLEGAL_PORT) {
+ boost::system::error_code its_bind_error;
+ socket_->bind(local_, its_bind_error);
+ if(its_bind_error) {
+ VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
+ "Error binding socket: " << its_bind_error.message()
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+
+ std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
+ if (its_host) {
+ // set new client port depending on service / instance / remote port
+ if (!its_host->on_bind_error(shared_from_this(), remote_port_)) {
+ VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
+ "Failed to set new local port for tce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ } else {
+ local_.port(local_port_);
+ VSOMEIP_INFO << "tcp_client_endpoint::connect: "
+ "Using new new local port for tce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+ }
+ try {
+ // don't connect on bind error to avoid using a random port
+ strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
+ shared_from_this(), its_bind_error));
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: "
+ << e.what()
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+ return;
}
return;
}
@@ -220,7 +256,10 @@ void tcp_client_endpoint_impl::receive() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
its_recv_buffer = recv_buffer_;
}
- receive(its_recv_buffer, 0, 0);
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch([self, &its_recv_buffer](){
+ self->receive(its_recv_buffer, 0, 0);
+ });
}
void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
@@ -277,32 +316,26 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
}
}
-void tcp_client_endpoint_impl::send_queued() {
- message_buffer_ptr_t its_buffer;
- if(queue_.size()) {
- its_buffer = queue_.front();
- } else {
- return;
- }
+void tcp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
- (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
+ (*_buffer)[VSOMEIP_SERVICE_POS_MIN],
+ (*_buffer)[VSOMEIP_SERVICE_POS_MAX]);
const method_t its_method = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_METHOD_POS_MIN],
- (*its_buffer)[VSOMEIP_METHOD_POS_MAX]);
+ (*_buffer)[VSOMEIP_METHOD_POS_MIN],
+ (*_buffer)[VSOMEIP_METHOD_POS_MAX]);
const client_t its_client = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_CLIENT_POS_MIN],
- (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]);
+ (*_buffer)[VSOMEIP_CLIENT_POS_MIN],
+ (*_buffer)[VSOMEIP_CLIENT_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SESSION_POS_MIN],
- (*its_buffer)[VSOMEIP_SESSION_POS_MAX]);
+ (*_buffer)[VSOMEIP_SESSION_POS_MIN],
+ (*_buffer)[VSOMEIP_SESSION_POS_MAX]);
if (has_enabled_magic_cookies_) {
const std::chrono::steady_clock::time_point now =
std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_cookie_sent_) > std::chrono::milliseconds(10000)) {
- send_magic_cookie(its_buffer);
+ send_magic_cookie(_buffer);
last_cookie_sent_ = now;
}
}
@@ -312,9 +345,9 @@ void tcp_client_endpoint_impl::send_queued() {
std::stringstream msg;
msg << "tcei<" << remote_.address() << ":"
<< std::dec << remote_.port() << ">::sq: ";
- for (std::size_t i = 0; i < its_buffer->size(); i++)
+ for (std::size_t i = 0; i < _buffer->size(); i++)
msg << std::hex << std::setw(2) << std::setfill('0')
- << (int)(*its_buffer)[i] << " ";
+ << (int)(*_buffer)[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
{
@@ -326,21 +359,23 @@ void tcp_client_endpoint_impl::send_queued() {
}
boost::asio::async_write(
*socket_,
- boost::asio::buffer(*its_buffer),
- std::bind(&tcp_client_endpoint_impl::write_completion_condition,
- std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
- std::placeholders::_1,
- std::placeholders::_2,
- its_buffer->size(),
- its_service, its_method, its_client, its_session,
- std::chrono::steady_clock::now()),
+ boost::asio::buffer(*_buffer),
std::bind(
+ &tcp_client_endpoint_impl::write_completion_condition,
+ std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ _buffer->size(),
+ its_service, its_method, its_client, its_session,
+ std::chrono::steady_clock::now()),
+ strand_.wrap(
+ std::bind(
&tcp_client_endpoint_base_impl::send_cbk,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2,
- its_buffer
- )
+ _buffer
+ ))
);
}
}
@@ -675,7 +710,10 @@ void tcp_client_endpoint_impl::receive_cbk(
}
}
its_lock.unlock();
- receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){
+ self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ });
} else {
VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
<< _error.message() << "(" << std::dec << _error.value()
@@ -700,7 +738,10 @@ void tcp_client_endpoint_impl::receive_cbk(
}
} else {
its_lock.unlock();
- receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){
+ self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ });
}
}
}
@@ -838,7 +879,13 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error,
if (queue_.size() > 0) {
queue_size_ -= queue_.front()->size();
queue_.pop_front();
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch(
+ [self, its_buffer]() { self->send_queued(its_buffer);}
+ );
+ }
}
} else if (_error == boost::system::errc::destination_address_required) {
VSOMEIP_WARNING << "tce::send_cbk received error: " << _error.message()
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
index 1cd2b5b..37db3f5 100644
--- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
@@ -50,7 +50,7 @@ tcp_server_endpoint_impl::tcp_server_endpoint_impl(
std::string its_device(configuration_->get_device());
if (its_device != "") {
if (setsockopt(acceptor_.native_handle(),
- SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) {
+ SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
VSOMEIP_WARNING << "TCP Server: Could not bind to device \"" << its_device << "\"";
}
}
@@ -152,6 +152,38 @@ void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iter
<< static_cast<std::uint16_t>(_queue_iterator->first.port())
<< " dropping outstanding messages (" << std::dec
<< _queue_iterator->second.second.size() << ").";
+
+ if (_queue_iterator->second.second.size()) {
+ std::set<service_t> its_services;
+
+ // check all outstanding messages of this connection
+ // whether stop handlers need to be called
+ for (const auto &its_buffer : _queue_iterator->second.second) {
+ if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) {
+ service_t its_service = VSOMEIP_BYTES_TO_WORD(
+ (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
+ (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
+ its_services.insert(its_service);
+ }
+ }
+
+ for (auto its_service : its_services) {
+ auto found_cbk = prepare_stop_handlers_.find(its_service);
+ if (found_cbk != prepare_stop_handlers_.end()) {
+ VSOMEIP_INFO << "Calling prepare stop handler "
+ << "for service: 0x"
+ << std::hex << std::setw(4) << std::setfill('0')
+ << its_service;
+ auto handler = found_cbk->second;
+ auto ptr = this->shared_from_this();
+ service_.post([ptr, handler, its_service](){
+ handler(ptr, its_service);
+ });
+ prepare_stop_handlers_.erase(found_cbk);
+ }
+ }
+ }
+
queues_.erase(_queue_iterator->first);
}
}
@@ -259,6 +291,10 @@ std::uint16_t tcp_server_endpoint_impl::get_local_port() const {
return local_port_;
}
+void tcp_server_endpoint_impl::set_local_port(std::uint16_t _port) {
+ (void)_port;
+}
+
bool tcp_server_endpoint_impl::is_reliable() const {
return true;
}
@@ -551,12 +587,19 @@ void tcp_server_endpoint_impl::connection::receive_cbk(
recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MIN],
recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MAX]);
if (its_client != MAGIC_COOKIE_CLIENT) {
+ const service_t its_service = VSOMEIP_BYTES_TO_WORD(
+ recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MIN],
+ recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MAX]);
+ const method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MIN],
+ recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MIN],
recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MAX]);
- its_server->clients_mutex_.lock();
- its_server->clients_[its_client][its_session] = remote_;
- its_server->clients_mutex_.unlock();
+
+ std::lock_guard<std::mutex> its_requests_guard(its_server->requests_mutex_);
+ its_server->requests_[its_client]
+ [std::make_tuple(its_service, its_method, its_session)] = remote_;
}
}
if (!magic_cookies_enabled_) {
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
index dc7a7bf..3b9a212 100644
--- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
@@ -14,6 +14,8 @@
#include "../../routing/include/routing_host.hpp"
#include "../include/udp_client_endpoint_impl.hpp"
#include "../../utility/include/utility.hpp"
+#include "../../utility/include/byteorder.hpp"
+
namespace vsomeip_v3 {
@@ -61,27 +63,50 @@ void udp_client_endpoint_impl::connect() {
socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't enable "
- << "SO_REUSEADDR: " << its_error.message() << " remote:"
- << get_address_port_remote();
+ << "SO_REUSEADDR: " << its_error.message()
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
}
+
socket_->set_option(boost::asio::socket_base::receive_buffer_size(
udp_receive_buffer_size_), its_error);
if (its_error) {
VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't set "
- << "SO_RCVBUF: " << its_error.message() << " to: "
- << std::dec << udp_receive_buffer_size_ << " remote:"
- << get_address_port_remote();
- } else {
- boost::asio::socket_base::receive_buffer_size its_option;
- socket_->get_option(its_option, its_error);
- if (its_error) {
- VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't get "
- << "SO_RCVBUF: " << its_error.message() << " remote:"
- << get_address_port_remote();
- } else {
- VSOMEIP_INFO << "udp_client_endpoint_impl::connect: SO_RCVBUF is: "
- << std::dec << its_option.value();
+ << "SO_RCVBUF: " << its_error.message()
+ << " to: " << std::dec << udp_receive_buffer_size_
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+
+ boost::asio::socket_base::receive_buffer_size its_option;
+ socket_->get_option(its_option, its_error);
+ #ifdef __linux__
+ // If regular setting of the buffer size did not work, try to force
+ // (requires CAP_NET_ADMIN to be successful)
+ if (its_option.value() < 0
+ || its_option.value() < udp_receive_buffer_size_) {
+ its_error.assign(setsockopt(socket_->native_handle(),
+ SOL_SOCKET, SO_RCVBUFFORCE,
+ &udp_receive_buffer_size_, sizeof(udp_receive_buffer_size_)),
+ boost::system::generic_category());
+ if (!its_error) {
+ VSOMEIP_INFO << "udp_client_endpoint_impl::connect: "
+ << "SO_RCVBUFFORCE successful!";
}
+ socket_->get_option(its_option, its_error);
+ }
+ #endif
+ if (its_error) {
+ VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't get "
+ << "SO_RCVBUF: " << its_error.message()
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ } else {
+ VSOMEIP_INFO << "udp_client_endpoint_impl::connect: SO_RCVBUF is: "
+ << std::dec << its_option.value()
+ << " (" << udp_receive_buffer_size_ << ")"
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
}
#ifndef _WIN32
@@ -89,26 +114,53 @@ void udp_client_endpoint_impl::connect() {
std::string its_device(configuration_->get_device());
if (its_device != "") {
if (setsockopt(socket_->native_handle(),
- SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) {
+ SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
VSOMEIP_WARNING << "UDP Client: Could not bind to device \"" << its_device << "\"";
}
}
#endif
- // Bind address and, optionally, port.
- boost::system::error_code its_bind_error;
- socket_->bind(local_, its_bind_error);
- if(its_bind_error) {
- VSOMEIP_WARNING << "udp_client_endpoint::connect: "
- "Error binding socket: " << its_bind_error.message()
- << " remote:" << get_address_port_remote();
- try {
- // don't connect on bind error to avoid using a random port
- strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
- shared_from_this(), its_bind_error));
- } catch (const std::exception &e) {
- VSOMEIP_ERROR << "udp_client_endpoint_impl::connect: "
- << e.what() << " remote:" << get_address_port_remote();
+ // In case a client endpoint port was configured,
+ // bind to it before connecting
+ if (local_.port() != ILLEGAL_PORT) {
+ boost::system::error_code its_bind_error;
+ socket_->bind(local_, its_bind_error);
+ if(its_bind_error) {
+ VSOMEIP_WARNING << "udp_client_endpoint::connect: "
+ "Error binding socket: " << its_bind_error.message()
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+
+ std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
+ if (its_host) {
+ // set new client port depending on service / instance / remote port
+ if (!its_host->on_bind_error(shared_from_this(), remote_port_)) {
+ VSOMEIP_WARNING << "udp_client_endpoint::connect: "
+ "Failed to set new local port for uce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ } else {
+ local_.port(local_port_);
+ VSOMEIP_INFO << "udp_client_endpoint::connect: "
+ "Using new new local port for uce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+ }
+
+
+ try {
+ // don't connect on bind error to avoid using a random port
+ strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
+ shared_from_this(), its_bind_error));
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "udp_client_endpoint_impl::connect: "
+ << e.what() << " remote:" << get_address_port_remote();
+ }
+ return;
}
return;
}
@@ -158,32 +210,26 @@ void udp_client_endpoint_impl::restart(bool _force) {
start_connect_timer();
}
-void udp_client_endpoint_impl::send_queued() {
- message_buffer_ptr_t its_buffer;
- if(queue_.size()) {
- its_buffer = queue_.front();
- } else {
- return;
- }
+void udp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
#if 0
std::stringstream msg;
msg << "ucei<" << remote_.address() << ":"
<< std::dec << remote_.port() << ">::sq: ";
- for (std::size_t i = 0; i < its_buffer->size(); i++)
+ for (std::size_t i = 0; i < _buffer->size(); i++)
msg << std::hex << std::setw(2) << std::setfill('0')
- << (int)(*its_buffer)[i] << " ";
+ << (int)(*_buffer)[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
socket_->async_send(
- boost::asio::buffer(*its_buffer),
+ boost::asio::buffer(*_buffer),
std::bind(
&udp_client_endpoint_base_impl::send_cbk,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2,
- its_buffer
+ _buffer
)
);
}
@@ -354,7 +400,12 @@ void udp_client_endpoint_impl::receive_cbk(
receive();
} else {
if (_error == boost::asio::error::connection_refused) {
- shutdown_and_close_socket(false);
+ VSOMEIP_WARNING << "uce::receive_cbk: local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote()
+ << " error: " << _error.message();
+ std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock();
+ its_ep_host->on_disconnect(shared_from_this());
+ restart(false);
} else {
receive();
}
@@ -415,6 +466,134 @@ std::string udp_client_endpoint_impl::get_remote_information() const {
+ std::to_string(remote_.port());
}
+void udp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, std::size_t _bytes,
+ const message_buffer_ptr_t &_sent_msg) {
+ (void)_bytes;
+ if (!_error) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ if (queue_.size() > 0) {
+ queue_size_ -= queue_.front()->size();
+ queue_.pop_front();
+ auto its_buffer = get_front();
+ if (its_buffer)
+ send_queued(its_buffer);
+ }
+ } else if (_error == boost::asio::error::broken_pipe) {
+ state_ = cei_state_e::CLOSED;
+ bool stopping(false);
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ stopping = sending_blocked_;
+ if (stopping) {
+ queue_.clear();
+ queue_size_ = 0;
+ } else {
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
+ its_service = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_WARNING << "uce::send_cbk received error: "
+ << _error.message() << " (" << std::dec
+ << _error.value() << ") " << get_remote_information()
+ << " " << std::dec << queue_.size()
+ << " " << std::dec << queue_size_ << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
+ }
+ }
+ if (!stopping) {
+ print_status();
+ }
+ was_not_connected_ = true;
+ shutdown_and_close_socket(true);
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
+ } else if (_error == boost::asio::error::not_connected
+ || _error == boost::asio::error::bad_descriptor
+ || _error == boost::asio::error::no_permission) {
+ state_ = cei_state_e::CLOSED;
+ if (_error == boost::asio::error::no_permission) {
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information();
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ queue_.clear();
+ queue_size_ = 0;
+ }
+ was_not_connected_ = true;
+ shutdown_and_close_socket(true);
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
+ } else if (_error == boost::asio::error::operation_aborted) {
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message();
+ // endpoint was stopped
+ sending_blocked_ = true;
+ shutdown_and_close_socket(false);
+ } else if (_error == boost::system::errc::destination_address_required) {
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information();
+ was_not_connected_ = true;
+ } else {
+ if (state_ == cei_state_e::CONNECTING) {
+ VSOMEIP_WARNING << "uce::send_cbk endpoint is already restarting:"
+ << get_remote_information();
+ } else {
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket(false);
+ std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
+ if (its_host) {
+ its_host->on_disconnect(shared_from_this());
+ }
+ restart(true);
+ }
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
+ its_service = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information() << " "
+ << " " << std::dec << queue_.size()
+ << " " << std::dec << queue_size_ << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
+ print_status();
+ }
+}
+
bool udp_client_endpoint_impl::tp_segmentation_enabled(service_t _service,
method_t _method) const {
return configuration_->tp_segment_messages_client_to_service(_service,
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
index 7aadf3f..bd44b48 100644
--- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
@@ -55,7 +55,7 @@ udp_server_endpoint_impl::udp_server_endpoint_impl(
std::string its_device(configuration_->get_device());
if (its_device != "") {
if (setsockopt(unicast_socket_.native_handle(),
- SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) {
+ SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
VSOMEIP_WARNING << "UDP Server: Could not bind to device \"" << its_device << "\"";
}
}
@@ -79,28 +79,46 @@ udp_server_endpoint_impl::udp_server_endpoint_impl(
unicast_socket_.set_option(option, ec);
boost::asio::detail::throw_error(ec, "broadcast option");
- const std::uint32_t its_udp_recv_buffer_size =
+ const int its_udp_recv_buffer_size =
configuration_->get_udp_receive_buffer_size();
unicast_socket_.set_option(boost::asio::socket_base::receive_buffer_size(
its_udp_recv_buffer_size), ec);
-
if (ec) {
- VSOMEIP_WARNING << "udp_server_endpoint_impl:: couldn't set "
+ VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't set "
<< "SO_RCVBUF: " << ec.message() << " to: " << std::dec
<< its_udp_recv_buffer_size << " local port: " << std::dec
<< local_port_;
- } else {
- boost::asio::socket_base::receive_buffer_size its_option;
- unicast_socket_.get_option(its_option, ec);
- if (ec) {
- VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get "
- << "SO_RCVBUF: " << ec.message() << " local port:"
- << std::dec << local_port_;
- } else {
- VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF is: "
- << std::dec << its_option.value();
+ }
+
+ boost::asio::socket_base::receive_buffer_size its_option;
+ unicast_socket_.get_option(its_option, ec);
+#ifdef __linux__
+ // If regular setting of the buffer size did not work, try to force
+ // (requires CAP_NET_ADMIN to be successful)
+ if (its_option.value() < 0
+ || its_option.value() < its_udp_recv_buffer_size) {
+ ec.assign(setsockopt(unicast_socket_.native_handle(),
+ SOL_SOCKET, SO_RCVBUFFORCE,
+ &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)),
+ boost::system::generic_category());
+ if (!ec) {
+ VSOMEIP_INFO << "udp_server_endpoint_impl: "
+ << "SO_RCVBUFFORCE successful.";
}
+ unicast_socket_.get_option(its_option, ec);
}
+#endif
+ if (ec) {
+ VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get "
+ << "SO_RCVBUF: " << ec.message() << " local port:"
+ << std::dec << local_port_;
+ } else {
+ VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF is: "
+ << std::dec << its_option.value()
+ << " (" << its_udp_recv_buffer_size << ") local port:"
+ << std::dec << local_port_;
+ }
+
#ifdef _WIN32
const char* optval("0001");
@@ -345,29 +363,44 @@ void udp_server_endpoint_impl::join_unlocked(const std::string &_address) {
multicast_socket_->bind(*multicast_local_, ec);
boost::asio::detail::throw_error(ec, "bind multicast");
- const std::uint32_t its_udp_recv_buffer_size =
+ const int its_udp_recv_buffer_size =
configuration_->get_udp_receive_buffer_size();
-
multicast_socket_->set_option(boost::asio::socket_base::receive_buffer_size(
its_udp_recv_buffer_size), ec);
-
if (ec) {
- VSOMEIP_WARNING << "udp_server_endpoint_impl:: couldn't set "
+ VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't set "
<< "SO_RCVBUF: " << ec.message() << " to: " << std::dec
<< its_udp_recv_buffer_size << " local port: " << std::dec
<< local_port_;
- } else {
- boost::asio::socket_base::receive_buffer_size its_option;
- multicast_socket_->get_option(its_option, ec);
+ }
- if (ec) {
- VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get "
- << "SO_RCVBUF: " << ec.message() << " local port:"
- << std::dec << local_port_;
- } else {
- VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF (Multicast) is: "
- << std::dec << its_option.value();
+ boost::asio::socket_base::receive_buffer_size its_option;
+ multicast_socket_->get_option(its_option, ec);
+ #ifdef __linux__
+ // If regular setting of the buffer size did not work, try to force
+ // (requires CAP_NET_ADMIN to be successful)
+ if (its_option.value() < 0
+ || its_option.value() < its_udp_recv_buffer_size) {
+ ec.assign(setsockopt(multicast_socket_->native_handle(),
+ SOL_SOCKET, SO_RCVBUFFORCE,
+ &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)),
+ boost::system::generic_category());
+ if (!ec) {
+ VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: "
+ << "SO_RCVBUFFORCE: successful.";
}
+ multicast_socket_->get_option(its_option, ec);
+ }
+ #endif
+ if (ec) {
+ VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't get "
+ << "SO_RCVBUF: " << ec.message() << " local port:"
+ << std::dec << local_port_;
+ } else {
+ VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: SO_RCVBUF is: "
+ << std::dec << its_option.value()
+ << " (" << its_udp_recv_buffer_size << ") local port:"
+ << std::dec << local_port_;
}
#ifdef _WIN32
@@ -413,7 +446,8 @@ void udp_server_endpoint_impl::join_unlocked(const std::string &_address) {
joined_group_ = true;
} catch (const std::exception &e) {
- VSOMEIP_ERROR << "udp_server_endpoint_impl::join" << ":" << e.what();
+ VSOMEIP_ERROR << "udp_server_endpoint_impl::join" << ":" << e.what()
+ << " address: " << _address;
}
};
@@ -467,7 +501,8 @@ void udp_server_endpoint_impl::leave_unlocked(const std::string &_address) {
}
}
catch (const std::exception &e) {
- VSOMEIP_ERROR << __func__ << ":" << e.what();
+ VSOMEIP_ERROR << __func__ << ":" << e.what()
+ << " address: " << _address;
}
}
@@ -500,6 +535,10 @@ std::uint16_t udp_server_endpoint_impl::get_local_port() const {
return local_port_;
}
+void udp_server_endpoint_impl::set_local_port(std::uint16_t _port) {
+ (void)_port;
+}
+
void udp_server_endpoint_impl::on_unicast_received(
boost::system::error_code const &_error,
std::size_t _bytes,
@@ -624,9 +663,13 @@ void udp_server_endpoint_impl::on_message_received(
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
_buffer[i + VSOMEIP_SESSION_POS_MIN],
_buffer[i + VSOMEIP_SESSION_POS_MAX]);
- clients_mutex_.lock();
- clients_[its_client][its_session] = _remote;
- clients_mutex_.unlock();
+ const method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ _buffer[i + VSOMEIP_METHOD_POS_MIN],
+ _buffer[i + VSOMEIP_METHOD_POS_MAX]);
+
+ std::lock_guard<std::mutex> its_requests_guard(requests_mutex_);
+ requests_[its_client]
+ [std::make_tuple(its_service, its_method, its_session)] = _remote;
}
} else if (its_service != VSOMEIP_SD_SERVICE
&& utility::is_notification(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])
@@ -656,11 +699,19 @@ void udp_server_endpoint_impl::on_message_received(
res.second[VSOMEIP_CLIENT_POS_MIN],
res.second[VSOMEIP_CLIENT_POS_MAX]);
if (its_client != MAGIC_COOKIE_CLIENT) {
+ const service_t its_service = VSOMEIP_BYTES_TO_WORD(
+ res.second[VSOMEIP_SERVICE_POS_MIN],
+ res.second[VSOMEIP_SERVICE_POS_MAX]);
+ const method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ res.second[VSOMEIP_METHOD_POS_MIN],
+ res.second[VSOMEIP_METHOD_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
res.second[VSOMEIP_SESSION_POS_MIN],
res.second[VSOMEIP_SESSION_POS_MAX]);
- std::lock_guard<std::mutex> its_client_lock(clients_mutex_);
- clients_[its_client][its_session] = _remote;
+
+ std::lock_guard<std::mutex> its_requests_guard(requests_mutex_);
+ requests_[its_client]
+ [std::make_tuple(its_service, its_method, its_session)] = _remote;
}
} else if (its_service != VSOMEIP_SD_SERVICE
&& utility::is_notification(res.second[VSOMEIP_MESSAGE_TYPE_POS])
diff --git a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
index c2c917f..5c8981c 100644
--- a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
@@ -112,6 +112,10 @@ std::uint16_t virtual_server_endpoint_impl::get_local_port() const {
return port_;
}
+void virtual_server_endpoint_impl::set_local_port(std::uint16_t _port) {
+ port_ = _port;
+}
+
std::uint16_t virtual_server_endpoint_impl::get_remote_port() const {
return ILLEGAL_PORT;
}