summaryrefslogtreecommitdiff
path: root/implementation/routing/src/routing_manager_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp303
1 files changed, 169 insertions, 134 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index c1daec7..00108a2 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -107,6 +107,7 @@ void routing_manager_impl::start() {
host_->on_state(state_type_e::ST_REGISTERED);
if (configuration_->log_version()) {
+ std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
version_log_timer_.expires_from_now(
std::chrono::seconds(0));
version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk,
@@ -121,7 +122,10 @@ void routing_manager_impl::start() {
}
void routing_manager_impl::stop() {
- version_log_timer_.cancel();
+ {
+ std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
+ version_log_timer_.cancel();
+ }
#ifndef WIN32
if (netlink_connector_) {
netlink_connector_->stop();
@@ -425,10 +429,12 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
}
}
}
- std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
- eventgroup_data_t subscription = { _service, _instance, _eventgroup, _major,
- _subscription_type};
- pending_subscriptions_.insert(subscription);
+ {
+ std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
+ eventgroup_data_t subscription = { _service, _instance, _eventgroup, _major,
+ _subscription_type};
+ pending_subscriptions_.insert(subscription);
+ }
} else {
VSOMEIP_ERROR<< "SOME/IP eventgroups require SD to be enabled!";
}
@@ -1621,11 +1627,14 @@ void routing_manager_impl::remove_local(client_t _client) {
instance_t routing_manager_impl::find_instance(service_t _service,
endpoint * _endpoint) {
instance_t its_instance(0xFFFF);
- auto found_service = service_instances_.find(_service);
- if (found_service != service_instances_.end()) {
- auto found_endpoint = found_service->second.find(_endpoint);
- if (found_endpoint != found_service->second.end()) {
- its_instance = found_endpoint->second;
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
+ auto found_service = service_instances_.find(_service);
+ if (found_service != service_instances_.end()) {
+ auto found_endpoint = found_service->second.find(_endpoint);
+ if (found_endpoint != found_service->second.end()) {
+ its_instance = found_endpoint->second;
+ }
}
}
return (its_instance);
@@ -1849,48 +1858,51 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info(
bool is_reliable_known(false);
bool is_unreliable_known(false);
- auto found_service = remote_service_info_.find(_service);
- if (found_service != remote_service_info_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- std::shared_ptr<endpoint_definition> its_definition;
- if (_reliable_port != ILLEGAL_PORT) {
- auto found_reliable = found_instance->second.find(true);
- if (found_reliable != found_instance->second.end()) {
- its_definition = found_reliable->second;
- if (its_definition->get_address() == _reliable_address
- && its_definition->get_port() == _reliable_port) {
- is_reliable_known = true;
- } else {
- VSOMEIP_WARNING << "Reliable service endpoint has changed: ["
- << std::hex << std::setfill('0') << std::setw(4) << _service << "."
- << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
- << std::dec << static_cast<std::uint32_t>(_major) << "."
- << std::dec << _minor << "] old: "
- << its_definition->get_address().to_string() << ":"
- << its_definition->get_port() << " new: "
- << _reliable_address.to_string() << ":"
- << _reliable_port;
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
+ auto found_service = remote_service_info_.find(_service);
+ if (found_service != remote_service_info_.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ std::shared_ptr<endpoint_definition> its_definition;
+ if (_reliable_port != ILLEGAL_PORT) {
+ auto found_reliable = found_instance->second.find(true);
+ if (found_reliable != found_instance->second.end()) {
+ its_definition = found_reliable->second;
+ if (its_definition->get_address() == _reliable_address
+ && its_definition->get_port() == _reliable_port) {
+ is_reliable_known = true;
+ } else {
+ VSOMEIP_WARNING << "Reliable service endpoint has changed: ["
+ << std::hex << std::setfill('0') << std::setw(4) << _service << "."
+ << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
+ << std::dec << static_cast<std::uint32_t>(_major) << "."
+ << std::dec << _minor << "] old: "
+ << its_definition->get_address().to_string() << ":"
+ << its_definition->get_port() << " new: "
+ << _reliable_address.to_string() << ":"
+ << _reliable_port;
+ }
}
}
- }
- if (_unreliable_port != ILLEGAL_PORT) {
- auto found_unreliable = found_instance->second.find(false);
- if (found_unreliable != found_instance->second.end()) {
- its_definition = found_unreliable->second;
- if (its_definition->get_address() == _unreliable_address
- && its_definition->get_port() == _unreliable_port) {
- is_unreliable_known = true;
- } else {
- VSOMEIP_WARNING << "Unreliable service endpoint has changed: ["
- << std::hex << std::setfill('0') << std::setw(4) << _service << "."
- << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
- << std::dec << static_cast<std::uint32_t>(_major) << "."
- << std::dec << _minor << "] old: "
- << its_definition->get_address().to_string() << ":"
- << its_definition->get_port() << " new: "
- << _unreliable_address.to_string() << ":"
- << _unreliable_port;
+ if (_unreliable_port != ILLEGAL_PORT) {
+ auto found_unreliable = found_instance->second.find(false);
+ if (found_unreliable != found_instance->second.end()) {
+ its_definition = found_unreliable->second;
+ if (its_definition->get_address() == _unreliable_address
+ && its_definition->get_port() == _unreliable_port) {
+ is_unreliable_known = true;
+ } else {
+ VSOMEIP_WARNING << "Unreliable service endpoint has changed: ["
+ << std::hex << std::setfill('0') << std::setw(4) << _service << "."
+ << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
+ << std::dec << static_cast<std::uint32_t>(_major) << "."
+ << std::dec << _minor << "] old: "
+ << its_definition->get_address().to_string() << ":"
+ << its_definition->get_port() << " new: "
+ << _unreliable_address.to_string() << ":"
+ << _unreliable_port;
+ }
}
}
}
@@ -1901,7 +1913,10 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info(
if (_reliable_port != ILLEGAL_PORT && !is_reliable_known) {
std::shared_ptr<endpoint_definition> endpoint_def
= endpoint_definition::get(_reliable_address, _reliable_port, true);
- remote_service_info_[_service][_instance][true] = endpoint_def;
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
+ remote_service_info_[_service][_instance][true] = endpoint_def;
+ }
// check if service was requested and establish TCP connection if necessary
{
@@ -1933,14 +1948,15 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info(
}
}
}
-
- std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_);
- auto found_service2 = specific_endpoint_clients_.find(_service);
- if (found_service2 != specific_endpoint_clients_.end()) {
- auto found_instance = found_service2->second.find(_instance);
- if (found_instance != found_service2->second.end()) {
- for (const client_t& c : found_instance->second) {
- find_or_create_remote_client(_service, _instance, true, c);
+ {
+ std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_);
+ auto found_service2 = specific_endpoint_clients_.find(_service);
+ if (found_service2 != specific_endpoint_clients_.end()) {
+ auto found_instance = found_service2->second.find(_instance);
+ if (found_instance != found_service2->second.end()) {
+ for (const client_t& c : found_instance->second) {
+ find_or_create_remote_client(_service, _instance, true, c);
+ }
}
}
}
@@ -1984,7 +2000,10 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info(
if (_unreliable_port != ILLEGAL_PORT && !is_unreliable_known) {
std::shared_ptr<endpoint_definition> endpoint_def
= endpoint_definition::get(_unreliable_address, _unreliable_port, false);
- remote_service_info_[_service][_instance][false] = endpoint_def;
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
+ remote_service_info_[_service][_instance][false] = endpoint_def;
+ }
if (!is_reliable_known) {
on_availability(_service, _instance, true, _major, _minor);
stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, _major, _minor);
@@ -2066,7 +2085,7 @@ std::chrono::milliseconds routing_manager_impl::update_routing_info(std::chrono:
for (auto &s : get_services()) {
for (auto &i : s.second) {
- if (find_local_client(s.first, i.first) != VSOMEIP_ROUTING_CLIENT) {
+ if (i.second->is_local()) {
continue; //don't expire local services
}
ttl_t its_ttl = i.second->get_ttl();
@@ -2403,7 +2422,10 @@ void routing_manager_impl::on_subscribe_ack(service_t _service,
std::shared_ptr<endpoint> its_endpoint
= find_or_create_server_endpoint(_port, false, is_someip);
if (its_endpoint) {
- service_instances_[_service][its_endpoint.get()] = _instance;
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
+ service_instances_[_service][its_endpoint.get()] = _instance;
+ }
its_endpoint->join(_address.to_string());
} else {
VSOMEIP_ERROR<<"Could not find/create multicast endpoint!";
@@ -2458,101 +2480,109 @@ void routing_manager_impl::on_subscribe_nack(client_t _client,
bool routing_manager_impl::deliver_specific_endpoint_message(service_t _service,
instance_t _instance, const byte_t *_data, length_t _size, endpoint *_receiver) {
+ client_t its_client(0x0);
+
// Try to deliver specific endpoint message (for selective subscribers)
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- auto found_servic = remote_services_.find(_service);
- if (found_servic != remote_services_.end()) {
- auto found_instance = found_servic->second.find(_instance);
- if (found_instance != found_servic->second.end()) {
- for (auto client_entry : found_instance->second) {
- client_t client = client_entry.first;
- if (!client) {
- continue;
- }
- auto found_reliability = client_entry.second.find(_receiver->is_reliable());
- if (found_reliability != client_entry.second.end()) {
- auto found_enpoint = found_reliability->second;
- if (found_enpoint.get() == _receiver) {
- if (client != get_client()) {
- auto local_endpoint = find_local(client);
- if (local_endpoint) {
- send_local(local_endpoint, client, _data, _size, _instance, true,
- _receiver->is_reliable(), VSOMEIP_SEND);
- }
- } else {
- deliver_message(_data, _size, _instance, _receiver->is_reliable());
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
+ auto found_servic = remote_services_.find(_service);
+ if (found_servic != remote_services_.end()) {
+ auto found_instance = found_servic->second.find(_instance);
+ if (found_instance != found_servic->second.end()) {
+ for (auto client_entry : found_instance->second) {
+ if (!client_entry.first) {
+ continue;
+ }
+ auto found_reliability = client_entry.second.find(_receiver->is_reliable());
+ if (found_reliability != client_entry.second.end()) {
+ auto found_enpoint = found_reliability->second;
+ if (found_enpoint.get() == _receiver) {
+ its_client = client_entry.first;
+ break;
}
- return true;
}
}
}
}
}
+ if (its_client) {
+ if (its_client != get_client()) {
+ auto local_endpoint = find_local(its_client);
+ if (local_endpoint) {
+ send_local(local_endpoint, its_client, _data, _size, _instance, true,
+ _receiver->is_reliable(), VSOMEIP_SEND);
+ }
+ } else {
+ deliver_message(_data, _size, _instance, _receiver->is_reliable());
+ }
+ return true;
+ }
return false;
}
void routing_manager_impl::clear_client_endpoints(service_t _service, instance_t _instance,
bool _reliable) {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- std::shared_ptr<endpoint> deleted_endpoint;
- // Clear client endpoints for remote services (generic and specific ones)
- if (remote_services_.find(_service) != remote_services_.end()) {
- if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) {
- auto endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT][_reliable];
- if (endpoint) {
- service_instances_[_service].erase(endpoint.get());
- deleted_endpoint = endpoint;
- }
- remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].erase(_reliable);
- auto found_endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].find(
- !_reliable);
- if (found_endpoint == remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].end()) {
- remote_services_[_service][_instance].erase(VSOMEIP_ROUTING_CLIENT);
- }
- }
- }
+ std::unordered_set<client_t> its_specific_endpoint_clients;
{
std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_);
auto found_service = specific_endpoint_clients_.find(_service);
if(found_service != specific_endpoint_clients_.end()){
auto found_instance = found_service->second.find(_instance);
if(found_instance != found_service->second.end()) {
- for (const client_t& client : found_instance->second) {
- if (remote_services_.find(_service) != remote_services_.end()) {
- if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) {
- auto endpoint = remote_services_[_service][_instance][client][_reliable];
- if (endpoint) {
- service_instances_[_service].erase(endpoint.get());
- endpoint->stop();
- }
- remote_services_[_service][_instance][client].erase(_reliable);
- auto found_endpoint = remote_services_[_service][_instance][client].find(!_reliable);
- if (found_endpoint == remote_services_[_service][_instance][client].end()) {
- remote_services_[_service][_instance].erase(client);
- }
- }
+ its_specific_endpoint_clients = found_instance->second;
+ }
+ }
+ }
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
+ std::shared_ptr<endpoint> deleted_endpoint;
+ // Clear client endpoints for remote services (generic and specific ones)
+ if (remote_services_.find(_service) != remote_services_.end()) {
+ if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) {
+ auto endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT][_reliable];
+ if (endpoint) {
+ service_instances_[_service].erase(endpoint.get());
+ deleted_endpoint = endpoint;
+ }
+ remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].erase(_reliable);
+ auto found_endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].find(
+ !_reliable);
+ if (found_endpoint == remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].end()) {
+ remote_services_[_service][_instance].erase(VSOMEIP_ROUTING_CLIENT);
+ }
+ // erase specific client endpoints
+ for (const client_t &client : its_specific_endpoint_clients) {
+ auto endpoint = remote_services_[_service][_instance][client][_reliable];
+ if (endpoint) {
+ service_instances_[_service].erase(endpoint.get());
+ endpoint->stop();
+ }
+ remote_services_[_service][_instance][client].erase(_reliable);
+ auto found_endpoint = remote_services_[_service][_instance][client].find(!_reliable);
+ if (found_endpoint == remote_services_[_service][_instance][client].end()) {
+ remote_services_[_service][_instance].erase(client);
}
}
}
}
- }
- if (remote_services_.find(_service) != remote_services_.end()) {
- if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) {
- if (!remote_services_[_service][_instance].size()) {
- remote_services_[_service].erase(_instance);
- if (0 >= remote_services_[_service].size()) {
- remote_services_.erase(_service);
+ if (remote_services_.find(_service) != remote_services_.end()) {
+ if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) {
+ if (!remote_services_[_service][_instance].size()) {
+ remote_services_[_service].erase(_instance);
+ if (0 >= remote_services_[_service].size()) {
+ remote_services_.erase(_service);
+ }
}
}
}
- }
- if (!service_instances_[_service].size()) {
- service_instances_.erase(_service);
- }
- if(deleted_endpoint) {
- stop_and_delete_client_endpoint(deleted_endpoint);
+ if (!service_instances_[_service].size()) {
+ service_instances_.erase(_service);
+ }
+ if(deleted_endpoint) {
+ stop_and_delete_client_endpoint(deleted_endpoint);
+ }
}
}
@@ -2828,6 +2858,7 @@ bool routing_manager_impl::send_identify_message(client_t _client,
bool routing_manager_impl::supports_selective(service_t _service, instance_t _instance) {
bool supports_selective(false);
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
auto its_service = remote_service_info_.find(_service);
if (its_service != remote_service_info_.end()) {
auto its_instance = its_service->second.find(_instance);
@@ -2973,10 +3004,13 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const
#endif
VSOMEIP_INFO << "vSomeIP " << VSOMEIP_VERSION;
- version_log_timer_.expires_from_now(
- std::chrono::seconds(configuration_->get_log_version_interval()));
- version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk,
- this, std::placeholders::_1));
+ {
+ std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
+ version_log_timer_.expires_from_now(
+ std::chrono::seconds(configuration_->get_log_version_interval()));
+ version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk,
+ this, std::placeholders::_1));
+ }
}
}
@@ -3008,6 +3042,7 @@ void routing_manager_impl::watchdog_cbk(boost::system::error_code const &_error)
void routing_manager_impl::clear_remote_service_info(service_t _service, instance_t _instance, bool _reliable) {
// Clear remote_service_info_
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
if (remote_service_info_.find(_service) != remote_service_info_.end()) {
if (remote_service_info_[_service].find(_instance) != remote_service_info_[_service].end()) {
remote_service_info_[_service][_instance].erase(_reliable);