summaryrefslogtreecommitdiff
path: root/implementation/routing/src/routing_manager_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp187
1 files changed, 125 insertions, 62 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index 00108a2..e78f400 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -92,7 +92,7 @@ void routing_manager_impl::init() {
}
void routing_manager_impl::start() {
-#ifndef WIN32
+#ifndef _WIN32
netlink_connector_ = std::make_shared<netlink_connector>(host_->get_io(),
configuration_->get_unicast_address());
netlink_connector_->register_net_if_changes_handler(
@@ -115,9 +115,12 @@ void routing_manager_impl::start() {
}
#ifndef WITHOUT_SYSTEMD
- watchdog_timer_.expires_from_now(std::chrono::seconds(0));
- watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk,
- this, std::placeholders::_1));
+ {
+ std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
+ watchdog_timer_.expires_from_now(std::chrono::seconds(0));
+ watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk,
+ this, std::placeholders::_1));
+ }
#endif
}
@@ -126,14 +129,17 @@ void routing_manager_impl::stop() {
std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
version_log_timer_.cancel();
}
-#ifndef WIN32
+#ifndef _WIN32
if (netlink_connector_) {
netlink_connector_->stop();
}
#endif
#ifndef WITHOUT_SYSTEMD
- watchdog_timer_.cancel();
+ {
+ std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
+ watchdog_timer_.cancel();
+ }
sd_notify(0, "STOPPING=1");
#endif
@@ -251,10 +257,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service,
auto its_info = find_service(_service, _instance);
if (!its_info) {
- {
- std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
- requested_services_[_client][_service][_instance].insert({ _major, _minor });
- }
+ requested_service_add(_client, _service, _instance, _major, _minor);
if (discovery_) {
if (!configuration_->is_local_service(_service, _instance)) {
// Non local service instance ~> tell SD to find it!
@@ -270,16 +273,14 @@ void routing_manager_impl::request_service(client_t _client, service_t _service,
}
} else {
if ((_major == its_info->get_major()
- || DEFAULT_MAJOR == its_info->get_major())
+ || DEFAULT_MAJOR == its_info->get_major()
+ || ANY_MAJOR == _major)
&& (_minor <= its_info->get_minor()
|| DEFAULT_MINOR == its_info->get_minor()
|| _minor == ANY_MINOR)) {
if(!its_info->is_local()) {
- {
- std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
- requested_services_[_client][_service][_instance].insert({ _major, _minor });
- its_info->add_client(_client);
- }
+ requested_service_add(_client, _service, _instance, _major, _minor);
+ its_info->add_client(_client);
find_or_create_remote_client(_service, _instance, true, VSOMEIP_ROUTING_CLIENT);
if (_use_exclusive_proxy) {
std::shared_ptr<endpoint> its_endpoint = its_info->get_endpoint(true);
@@ -310,20 +311,7 @@ void routing_manager_impl::release_service(client_t _client, service_t _service,
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
routing_manager_base::release_service(_client, _service, _instance);
-
- {
- std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
- auto its_client = requested_services_.find(_client);
- if (its_client != requested_services_.end()) {
- auto its_service = its_client->second.find(_service);
- if (its_service != its_client->second.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- its_service->second.erase(_instance);
- }
- }
- }
- }
+ requested_service_remove(_client, _service, _instance);
std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
if(its_info && !its_info->is_local()) {
@@ -339,6 +327,7 @@ void routing_manager_impl::release_service(client_t _client, service_t _service,
clear_service_info(_service, _instance, false);
clear_identified_clients(_service, _instance);
clear_identifying_clients( _service, _instance);
+ unset_all_eventpayloads(_service, _instance);
} else {
remove_identified_client(_service, _instance, _client);
remove_identifying_client(_service, _instance, _client);
@@ -462,9 +451,15 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
_eventgroup);
if (found_eventgroup != found_instance->second.end()) {
found_eventgroup->second.erase(_client);
- if (0 == found_eventgroup->second.size()) {
+ if (!found_eventgroup->second.size()) {
last_subscriber_removed = true;
- eventgroup_clients_.erase(_eventgroup);
+ found_instance->second.erase(_eventgroup);
+ if (!found_service->second.size()) {
+ found_service->second.erase(_instance);
+ if (!found_service->second.size()) {
+ eventgroup_clients_.erase(_service);
+ }
+ }
}
}
}
@@ -487,6 +482,9 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
}
}
}
+ if (last_subscriber_removed) {
+ unset_all_eventpayloads(_service, _instance, _eventgroup);
+ }
if (subscriber == VSOMEIP_ROUTING_CLIENT && last_subscriber_removed) {
// for normal subscribers only unsubscribe via SD if last
// subscriber was removed
@@ -751,6 +749,27 @@ bool routing_manager_impl::send_to(const std::shared_ptr<endpoint_definition> &_
return false;
}
+void routing_manager_impl::register_event(
+ client_t _client, service_t _service, instance_t _instance,
+ event_t _event, const std::set<eventgroup_t> &_eventgroups,
+ bool _is_field, std::chrono::milliseconds _cycle,
+ bool _change_resets_cycle, epsilon_change_func_t _epsilon_change_func,
+ bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) {
+ auto its_event = find_event(_service, _instance, _event);
+ bool is_first(false);
+ if (its_event && !its_event->has_ref(_client, _is_provided)) {
+ is_first = true;
+ } else {
+ is_first = true;
+ }
+ if (is_first) {
+ routing_manager_base::register_event(_client, _service, _instance,
+ _event, _eventgroups, _is_field, _cycle, _change_resets_cycle,
+ _epsilon_change_func, _is_provided, _is_shadow,
+ _is_cache_placeholder);
+ }
+}
+
void routing_manager_impl::register_shadow_event(client_t _client,
service_t _service, instance_t _instance,
event_t _event, const std::set<eventgroup_t> &_eventgroups,
@@ -1549,7 +1568,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint(
its_endpoint->enable_magic_cookies();
}
} else {
-#ifndef WIN32
+#ifndef _WIN32
if (its_unicast.is_v4()) {
its_unicast = boost::asio::ip::address_v4::any();
} else if (its_unicast.is_v6()) {
@@ -1929,7 +1948,8 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info(
if (found_instance != found_service->second.end()) {
for (const auto &major_minor_pair : found_instance->second) {
if ((major_minor_pair.first == _major
- || _major == DEFAULT_MAJOR)
+ || _major == DEFAULT_MAJOR
+ || major_minor_pair.first == ANY_MAJOR)
&& (major_minor_pair.second <= _minor
|| _minor == DEFAULT_MINOR
|| major_minor_pair.second == ANY_MINOR)) {
@@ -1969,7 +1989,8 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info(
if (found_instance != found_service->second.end()) {
for (const auto &major_minor_pair : found_instance->second) {
if ((major_minor_pair.first == _major
- || _major == DEFAULT_MAJOR)
+ || _major == DEFAULT_MAJOR
+ || major_minor_pair.first == ANY_MAJOR)
&& (major_minor_pair.second <= _minor
|| _minor == DEFAULT_MINOR
|| major_minor_pair.second == ANY_MINOR)) {
@@ -2339,14 +2360,14 @@ void routing_manager_impl::on_subscribe(
get_client(_subscriber);
}
}
- stub_->send_subscribe(find_local(_service, _instance),
- client, _service, _instance, _eventgroup, its_eventgroup->get_major(), true);
// send initial events if we already have a cached field (is_set)
for (auto its_event : its_eventgroup->get_events()) {
if (its_event->is_field() && its_event->is_set()) {
its_event->notify_one(_subscriber, true); // TODO: use _flush parameter to send all event at once
}
}
+ stub_->send_subscribe(find_local(_service, _instance),
+ client, _service, _instance, _eventgroup, its_eventgroup->get_major(), true);
}
}
}
@@ -2397,25 +2418,27 @@ void routing_manager_impl::on_subscribe_ack(service_t _service,
instance_t _instance, const boost::asio::ip::address &_address,
uint16_t _port) {
- if (multicast_info.find(_service) != multicast_info.end()) {
- if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) {
- auto endpoint_def = multicast_info[_service][_instance];
- if (endpoint_def->get_address() == _address &&
- endpoint_def->get_port() == _port) {
-
- // Multicast info and endpoint already created before
- // This can happen when more than one client subscribe on the same instance!
- return;
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
+ if (multicast_info.find(_service) != multicast_info.end()) {
+ if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) {
+ auto endpoint_def = multicast_info[_service][_instance];
+ if (endpoint_def->get_address() == _address &&
+ endpoint_def->get_port() == _port) {
+
+ // Multicast info and endpoint already created before
+ // This can happen when more than one client subscribe on the same instance!
+ return;
+ }
}
}
- }
-
- // Save multicast info to be able to delete the endpoint
- // as soon as the instance stops offering its service
- std::shared_ptr<endpoint_definition> endpoint_def =
- endpoint_definition::get(_address, _port, false);
- multicast_info[_service][_instance] = endpoint_def;
+ // Save multicast info to be able to delete the endpoint
+ // as soon as the instance stops offering its service
+ std::shared_ptr<endpoint_definition> endpoint_def =
+ endpoint_definition::get(_address, _port, false);
+ multicast_info[_service][_instance] = endpoint_def;
+ }
bool is_someip = configuration_->is_someip(_service, _instance);
// Create multicast endpoint & join multicase group
@@ -2645,12 +2668,19 @@ void routing_manager_impl::clear_multicast_endpoints(service_t _service, instanc
if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) {
std::string address = multicast_info[_service][_instance]->get_address().to_string();
uint16_t port = multicast_info[_service][_instance]->get_port();
- auto multicast_endpoint = server_endpoints_[port][false];
- multicast_endpoint->leave(address);
- multicast_endpoint->stop();
- server_endpoints_[port].erase(false);
- if (server_endpoints_[port].find(true) == server_endpoints_[port].end()) {
- server_endpoints_.erase(port);
+ std::shared_ptr<endpoint> multicast_endpoint;
+ auto found_port = server_endpoints_.find(port);
+ if (found_port != server_endpoints_.end()) {
+ auto found_unreliable = found_port->second.find(false);
+ if (found_unreliable != found_port->second.end()) {
+ multicast_endpoint = found_unreliable->second;
+ multicast_endpoint->leave(address);
+ multicast_endpoint->stop();
+ server_endpoints_[port].erase(false);
+ }
+ if (found_port->second.find(true) == found_port->second.end()) {
+ server_endpoints_.erase(port);
+ }
}
multicast_info[_service].erase(_instance);
if (0 >= multicast_info[_service].size()) {
@@ -2659,7 +2689,7 @@ void routing_manager_impl::clear_multicast_endpoints(service_t _service, instanc
// Clear service_instances_ for multicase endpoint
if (1 >= service_instances_[_service].size()) {
service_instances_.erase(_service);
- } else {
+ } else if (multicast_endpoint) {
service_instances_[_service].erase(multicast_endpoint.get());
}
}
@@ -3032,6 +3062,7 @@ void routing_manager_impl::watchdog_cbk(boost::system::error_code const &_error)
}
if (has_interval) {
+ std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
watchdog_timer_.expires_from_now(std::chrono::microseconds(its_interval / 2));
watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk,
this, std::placeholders::_1));
@@ -3501,7 +3532,7 @@ void routing_manager_impl::on_net_if_state_changed(std::string _if, bool _availa
if (_available) {
VSOMEIP_INFO << "Network interface \"" << _if << "\" is up and running.";
start_ip_routing();
-#ifndef WIN32
+#ifndef _WIN32
if (netlink_connector_) {
netlink_connector_->unregister_net_if_changes_handler();
}
@@ -3526,4 +3557,36 @@ void routing_manager_impl::start_ip_routing() {
pending_sd_offers_.clear();
}
+void routing_manager_impl::requested_service_add(client_t _client,
+ service_t _service,
+ instance_t _instance,
+ major_version_t _major,
+ minor_version_t _minor) {
+ std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
+ requested_services_[_client][_service][_instance].insert({ _major, _minor });
+}
+
+void routing_manager_impl::requested_service_remove(client_t _client,
+ service_t _service,
+ instance_t _instance) {
+ std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
+ auto found_client = requested_services_.find(_client);
+ if (found_client != requested_services_.end()) {
+ auto found_service = found_client->second.find(_service);
+ if (found_service != found_client->second.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ // delete all requested major/minor versions
+ found_service->second.erase(_instance);
+ if (!found_service->second.size()) {
+ found_client->second.erase(_service);
+ if (!found_client->second.size()) {
+ requested_services_.erase(client_);
+ }
+ }
+ }
+ }
+ }
+}
+
} // namespace vsomeip