summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/endpoint_manager_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/endpoint_manager_impl.cpp')
-rw-r--r--implementation/endpoints/src/endpoint_manager_impl.cpp190
1 files changed, 133 insertions, 57 deletions
diff --git a/implementation/endpoints/src/endpoint_manager_impl.cpp b/implementation/endpoints/src/endpoint_manager_impl.cpp
index a82a173..dbb2107 100644
--- a/implementation/endpoints/src/endpoint_manager_impl.cpp
+++ b/implementation/endpoints/src/endpoint_manager_impl.cpp
@@ -328,8 +328,10 @@ bool endpoint_manager_impl::remove_server_endpoint(uint16_t _port, bool _reliabl
return ret;
}
-void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_t _instance,
- bool _reliable) {
+void
+endpoint_manager_impl::clear_client_endpoints(
+ service_t _service, instance_t _instance, bool _reliable) {
+
std::shared_ptr<endpoint> endpoint_to_delete;
bool other_services_reachable_through_endpoint(false);
{
@@ -371,8 +373,12 @@ void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_
}
if (!other_services_reachable_through_endpoint) {
- std::uint16_t its_port(0);
+ partition_id_t its_partition;
boost::asio::ip::address its_address;
+ std::uint16_t its_port(0);
+
+ its_partition = configuration_->get_partition_id(_service, _instance);
+
if (_reliable) {
std::shared_ptr<tcp_client_endpoint_impl> ep =
std::dynamic_pointer_cast<tcp_client_endpoint_impl>(endpoint_to_delete);
@@ -392,15 +398,21 @@ void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_
if (found_ip != client_endpoints_by_ip_.end()) {
const auto found_port = found_ip->second.find(its_port);
if (found_port != found_ip->second.end()) {
- const auto found_reliable = found_port->second.find(_reliable);
+ auto found_reliable = found_port->second.find(_reliable);
if (found_reliable != found_port->second.end()) {
- if (found_reliable->second == endpoint_to_delete) {
- found_port->second.erase(_reliable);
- // delete if necessary
- if (!found_port->second.size()) {
- found_ip->second.erase(found_port);
- if (!found_ip->second.size()) {
- client_endpoints_by_ip_.erase(found_ip);
+ const auto found_partition = found_reliable->second.find(its_partition);
+ if (found_partition != found_reliable->second.end()) {
+ if (found_partition->second == endpoint_to_delete) {
+ found_reliable->second.erase(its_partition);
+ // delete if necessary
+ if (0 == found_reliable->second.size()) {
+ found_port->second.erase(_reliable);
+ if (0 == found_port->second.size()) {
+ found_ip->second.erase(found_port);
+ if (0 == found_ip->second.size()) {
+ client_endpoints_by_ip_.erase(found_ip);
+ }
+ }
}
}
}
@@ -455,8 +467,14 @@ void endpoint_manager_impl::find_or_create_multicast_endpoint(
std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
service_instances_multicast_[_service][_sender] = _instance;
}
- dynamic_cast<udp_server_endpoint_impl*>(its_endpoint.get())->join_unlocked(
- _address.to_string());
+
+ auto its_udp_server_endpoint
+ = std::dynamic_pointer_cast<udp_server_endpoint_impl>(its_endpoint);
+ if (_port != configuration_->get_sd_port()) {
+ its_udp_server_endpoint->join(_address.to_string());
+ } else {
+ its_udp_server_endpoint->join_unlocked(_address.to_string());
+ }
} else {
VSOMEIP_ERROR <<"Could not find/create multicast endpoint!";
}
@@ -540,11 +558,13 @@ void endpoint_manager_impl::print_status() const {
VSOMEIP_INFO << "status start remote client endpoints:";
std::uint32_t num_remote_client_endpoints(0);
// normal endpoints
- for (const auto &a : client_endpoints_by_ip) {
- for (const auto& p : a.second) {
- for (const auto& ru : p.second) {
- ru.second->print_status();
- num_remote_client_endpoints++;
+ for (const auto &its_address : client_endpoints_by_ip) {
+ for (const auto &its_port : its_address.second) {
+ for (const auto &its_reliability : its_port.second) {
+ for (const auto &its_partition : its_reliability.second) {
+ its_partition.second->print_status();
+ num_remote_client_endpoints++;
+ }
}
}
}
@@ -800,6 +820,37 @@ void endpoint_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
}
}
+bool endpoint_manager_impl::on_bind_error(std::shared_ptr<endpoint> _endpoint, std::uint16_t _remote_port) {
+ std::lock_guard<std::recursive_mutex> its_ep_lock(endpoint_mutex_);
+ for (auto &its_service : remote_services_) {
+ for (auto &its_instance : its_service.second) {
+ const bool is_reliable = _endpoint->is_reliable();
+ auto found_endpoint = its_instance.second.find(is_reliable);
+ if (found_endpoint != its_instance.second.end()) {
+ if (found_endpoint->second == _endpoint) {
+ // get a new client port using service / instance / remote port
+ uint16_t its_old_local_port = _endpoint->get_local_port();
+ uint16_t its_new_local_port(ILLEGAL_PORT);
+
+ std::unique_lock<std::mutex> its_lock(used_client_ports_mutex_);
+ if (configuration_->get_client_port(its_service.first,
+ its_instance.first,
+ _remote_port,
+ is_reliable,
+ used_client_ports_,
+ its_new_local_port)) {
+ _endpoint->set_local_port(its_new_local_port);
+ its_lock.unlock();
+ release_port(its_old_local_port, _endpoint->is_reliable());
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+}
+
void endpoint_manager_impl::on_error(
const byte_t *_data, length_t _length, endpoint* const _receiver,
const boost::asio::ip::address &_remote_address,
@@ -820,8 +871,10 @@ void endpoint_manager_impl::release_port(uint16_t _port, bool _reliable) {
used_client_ports_[_reliable].erase(_port);
}
-std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client(
+std::shared_ptr<endpoint>
+endpoint_manager_impl::find_remote_client(
service_t _service, instance_t _instance, bool _reliable) {
+
std::shared_ptr<endpoint> its_endpoint;
auto found_service = remote_services_.find(_service);
if (found_service != remote_services_.end()) {
@@ -837,35 +890,46 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client(
return its_endpoint;
}
- // If another service is hosted on the same server_endpoint
- // reuse the existing client_endpoint.
+ // Endpoint did not yet exist. Get the partition id to check
+ // whether the client endpoint for the partition does exist.
+ partition_id_t its_partition_id
+ = configuration_->get_partition_id(_service, _instance);
+
+ // If another service within the same partition is hosted on the
+ // same server_endpoint reuse the existing client_endpoint.
auto found_service_info = remote_service_info_.find(_service);
- if(found_service_info != remote_service_info_.end()) {
+ if (found_service_info != remote_service_info_.end()) {
auto found_instance = found_service_info->second.find(_instance);
- if(found_instance != found_service_info->second.end()) {
+ if (found_instance != found_service_info->second.end()) {
auto found_reliable = found_instance->second.find(_reliable);
- if(found_reliable != found_instance->second.end()) {
- std::shared_ptr<endpoint_definition> its_ep_def =
- found_reliable->second;
+ if (found_reliable != found_instance->second.end()) {
+ std::shared_ptr<endpoint_definition> its_ep_def
+ = found_reliable->second;
auto found_address = client_endpoints_by_ip_.find(
its_ep_def->get_address());
- if(found_address != client_endpoints_by_ip_.end()) {
+ if (found_address != client_endpoints_by_ip_.end()) {
auto found_port = found_address->second.find(
its_ep_def->get_remote_port());
- if(found_port != found_address->second.end()) {
- auto found_reliable2 = found_port->second.find(
- _reliable);
- if(found_reliable2 != found_port->second.end()) {
- its_endpoint = found_reliable2->second;
- // store the endpoint under this service/instance id
- // as well - needed for later cleanup
- remote_services_[_service][_instance][_reliable] =
- its_endpoint;
- service_instances_[_service][its_endpoint.get()] = _instance;
- // add endpoint to serviceinfo object
- auto found_service_info = rm_->find_service(_service,_instance);
- if (found_service_info) {
- found_service_info->set_endpoint(its_endpoint, _reliable);
+ if (found_port != found_address->second.end()) {
+ auto found_reliable2
+ = found_port->second.find(_reliable);
+ if (found_reliable2 != found_port->second.end()) {
+ auto found_partition
+ = found_reliable2->second.find(its_partition_id);
+ if (found_partition != found_reliable2->second.end()) {
+ its_endpoint = found_partition->second;
+
+ // store the endpoint under this service/instance id
+ // as well - needed for later cleanup
+ remote_services_[_service][_instance][_reliable]
+ = its_endpoint;
+ service_instances_[_service][its_endpoint.get()] = _instance;
+
+ // add endpoint to serviceinfo object
+ auto found_service_info = rm_->find_service(_service,_instance);
+ if (found_service_info) {
+ found_service_info->set_endpoint(its_endpoint, _reliable);
+ }
}
}
}
@@ -873,7 +937,8 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client(
}
}
}
- return its_endpoint;
+
+ return (its_endpoint);
}
std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client(
@@ -910,6 +975,8 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client(
}
if (its_endpoint) {
+ partition_id_t its_partition
+ = configuration_->get_partition_id(_service, _instance);
used_client_ports_[_reliable].insert(its_local_port);
its_lock.unlock();
service_instances_[_service][its_endpoint.get()] = _instance;
@@ -917,12 +984,19 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client(
client_endpoints_by_ip_[its_endpoint_def->get_address()]
[its_endpoint_def->get_port()]
- [_reliable] = its_endpoint;
+ [_reliable]
+ [its_partition]= its_endpoint;
// Set the basic route to the service in the service info
auto found_service_info = rm_->find_service(_service, _instance);
if (found_service_info) {
found_service_info->set_endpoint(its_endpoint, _reliable);
}
+ boost::system::error_code ec;
+ VSOMEIP_INFO << "endpoint_manager_impl::create_remote_client: "
+ << its_endpoint_def->get_address().to_string(ec)
+ << ":" << std::dec << its_endpoint_def->get_port()
+ << " reliable: " << _reliable
+ << " using local port: " << std::dec << its_local_port;
}
}
}
@@ -983,18 +1057,20 @@ endpoint_manager_impl::log_client_states() const {
its_client_endpoints = client_endpoints_by_ip_;
}
- for (const auto& its_address : its_client_endpoints) {
- for (const auto& its_port : its_address.second) {
- for (const auto& its_reliability : its_port.second) {
- size_t its_queue_size = its_reliability.second->get_queue_size();
- if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE)
- its_client_queue_sizes.push_back(
- std::make_pair(
- std::make_tuple(
- its_address.first,
- its_port.first,
- its_reliability.first),
- its_queue_size));
+ for (const auto &its_address : its_client_endpoints) {
+ for (const auto &its_port : its_address.second) {
+ for (const auto &its_reliability : its_port.second) {
+ for (const auto &its_partition : its_reliability.second) {
+ size_t its_queue_size = its_partition.second->get_queue_size();
+ if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE)
+ its_client_queue_sizes.push_back(
+ std::make_pair(
+ std::make_tuple(
+ its_address.first,
+ its_port.first,
+ its_reliability.first),
+ its_queue_size));
+ }
}
}
}
@@ -1040,8 +1116,8 @@ endpoint_manager_impl::log_server_states() const {
its_server_endpoints = server_endpoints_;
}
- for (const auto& its_port : its_server_endpoints) {
- for (const auto& its_reliability : its_port.second) {
+ for (const auto &its_port : its_server_endpoints) {
+ for (const auto &its_reliability : its_port.second) {
size_t its_queue_size = its_reliability.second->get_queue_size();
if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE)
its_client_queue_sizes.push_back(